198 lines
8.8 KiB
Python
198 lines
8.8 KiB
Python
import re
|
|
import sys
|
|
import time
|
|
import html
|
|
import random
|
|
import asyncio
|
|
import logging
|
|
import yaml
|
|
import feedparser
|
|
from datetime import datetime, timedelta
|
|
from telethon import TelegramClient
|
|
from youtube_dl import YoutubeDL
|
|
from youtube_dl.extractor import youtube
|
|
from aiohttp import ClientSession
|
|
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
loadmode = False
|
|
if len(sys.argv) > 1:
|
|
loadmode = sys.argv[1] == 'loadmode'
|
|
|
|
config = yaml.safe_load(open('config.yaml'))
|
|
api_id = config['telegram']['api_id']
|
|
api_hash = config['telegram']['api_hash']
|
|
bot_token = config['telegram'].get('bot_token')
|
|
storage_chat_id = storage_message_id = None
|
|
if config.get('storage'):
|
|
storage_chat_id = config['storage'].get('storage_chat_id')
|
|
storage_message_id = config['storage'].get('storage_message_id')
|
|
notify_chat = config['config']['notify_chat']
|
|
wait_seconds = config['config']['wait_seconds']
|
|
time_offset = config['config'].get('time_offset', '00:00')
|
|
time_offset_neg = time_offset.startswith('-')
|
|
if time_offset_neg or time_offset.startswith('+'):
|
|
time_offset = time_offset[1:]
|
|
hours, minutes = time_offset.split(':')
|
|
time_offset_td = timedelta(hours=int(hours), minutes=int(minutes))
|
|
channels = config['config']['channels']
|
|
invidious_instances = config['config'].get('invidious_instances', [])
|
|
|
|
live_regex = re.compile(r'error: (?:ytnotifier:([0-9]+) )?(this live event will begin|premieres) in (.+)', re.I)
|
|
strip_date = re.compile(r' \d{4}-\d{2}-\d{2} \d{2}:\d{2}$')
|
|
ytdl = YoutubeDL({'skip_download': True, 'no_color': True})
|
|
ytdl.add_default_info_extractors()
|
|
client = TelegramClient('ytnotifier', api_id, api_hash)
|
|
client.parse_mode = 'html'
|
|
seen_videos = set()
|
|
seen_videos_lock = asyncio.Lock()
|
|
tmp_handled_videos = set()
|
|
async def save_seen_videos():
|
|
async with seen_videos_lock:
|
|
with open('ytnotifier.data', 'w') as file:
|
|
file.write('0\n')
|
|
for i in seen_videos:
|
|
file.write(f'{i}\n')
|
|
if storage_chat_id and storage_message_id:
|
|
await client.edit_message(storage_chat_id, storage_message_id, file='ytnotifier.data')
|
|
|
|
youtube._try_get = _try_get = youtube.try_get
|
|
def traverse_dict(src):
|
|
for (key, value) in src.items():
|
|
if key == 'scheduledStartTime':
|
|
return value
|
|
if isinstance(value, dict):
|
|
if value := traverse_dict(value):
|
|
return value
|
|
return None
|
|
|
|
def try_get(src, getter, expected_type=None):
|
|
if reason := src.get('reason'):
|
|
if isinstance(reason, str) and (reason.startswith('This live event will begin in ') or reason.startswith('Premieres in ')):
|
|
t = _try_get(src, traverse_dict, str)
|
|
if t:
|
|
src['reason'] = f'ytnotifier:{t} {reason}'
|
|
return _try_get(src, getter, expected_type)
|
|
youtube.try_get = try_get
|
|
|
|
async def _handle_video(video_id, video_title):
|
|
last_was_few_moments = False
|
|
too_many_attempts_count = 1
|
|
video_url = f'https://www.youtube.com/watch?v={video_id}'
|
|
notify_text = 'New video'
|
|
while True:
|
|
try:
|
|
video_data = await client.loop.run_in_executor(None, ytdl.extract_info, video_url)
|
|
except BaseException as e:
|
|
wait_time = 30
|
|
message = str(e)
|
|
if '429' in message or 'too many' in message.lower():
|
|
wait_time = too_many_attempts_count * 60 * 60
|
|
too_many_attempts_count += 1
|
|
elif match := live_regex.match(message.rstrip('.')):
|
|
end_schedule_time = match.group(1) or 0
|
|
is_premiere = match.group(2).lower() == 'premieres'
|
|
if is_premiere:
|
|
notify_text = 'Premiere started'
|
|
else:
|
|
notify_text = 'Live event started'
|
|
human_end_schedule_time = match.group(3)
|
|
if end_schedule_time := int(end_schedule_time):
|
|
human_end_schedule_time += ' ('
|
|
if time_offset_neg:
|
|
human_end_schedule_time += str(datetime.fromtimestamp(end_schedule_time) - time_offset_td)
|
|
else:
|
|
human_end_schedule_time += str(datetime.fromtimestamp(end_schedule_time) + time_offset_td)
|
|
human_end_schedule_time += f' {"-" if time_offset_neg else "+"}{time_offset})'
|
|
tmp_wait_time = end_schedule_time - time.time()
|
|
if tmp_wait_time > wait_time:
|
|
wait_time = tmp_wait_time
|
|
await client.send_message(notify_chat, f'<b>{"Premiere" if is_premiere else "Live event"} starting in {human_end_schedule_time}:</b> <a href="{video_url}">{html.escape(video_title)}</a>')
|
|
elif not last_was_few_moments:
|
|
await client.send_message(notify_chat, f'<b>{"Premiere" if is_premiere else "Live event"} starting in {human_end_schedule_time}:</b> <a href="{video_url}">{html.escape(video_title)}</a>')
|
|
elif not last_was_few_moments:
|
|
await client.send_message(notify_chat, f'<b>{"Premiere" if is_premiere else "Live event"} starting in {human_end_schedule_time}:</b> <a href="{video_url}">{html.escape(video_title)}</a>')
|
|
last_was_few_moments = 'moment' in human_end_schedule_time.lower()
|
|
await asyncio.sleep(wait_time)
|
|
else:
|
|
if video_data.get('is_live'):
|
|
notify_text = 'Live event started'
|
|
break
|
|
if tmp_video_title := video_data.get('title'):
|
|
video_title = strip_date.sub('', tmp_video_title)
|
|
await client.send_message(notify_chat, f'<b>{notify_text}:</b> <a href="{video_url}">{html.escape(video_title)}</a>')
|
|
async with seen_videos_lock:
|
|
seen_videos.add(video_id)
|
|
|
|
async def handle_video(video_id, video_title):
|
|
try:
|
|
while True:
|
|
try:
|
|
await _handle_video(video_id, video_title)
|
|
except BaseException:
|
|
logging.exception('Exception raised while notifying %s (%s)', video_id, video_title)
|
|
else:
|
|
await save_seen_videos()
|
|
break
|
|
finally:
|
|
tmp_handled_videos.discard(video_id)
|
|
|
|
async def get_video_list(session, channel_id):
|
|
shuffled_instances = invidious_instances.copy()
|
|
random.shuffle(shuffled_instances)
|
|
for i in shuffled_instances:
|
|
try:
|
|
async with session.get(f'{i}/api/v1/channels/{channel_id}/latest?fields=title,videoId&a={time.time()}', headers={'Cache-Control': 'no-store, max-age=0'}) as resp:
|
|
if resp.status != 200:
|
|
logging.error('Invidious instance %s returned %s', i, str(resp.status))
|
|
continue
|
|
return list(map(lambda i: (i['videoId'], i['title']), await resp.json()))
|
|
except BaseException:
|
|
logging.exception('Invidious instance %s raised exception', i)
|
|
async with session.get(f'https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}&a={time.time()}', headers={'Cache-Control': 'no-store, max-age=0'}) as resp:
|
|
d = feedparser.parse(await resp.text())
|
|
return list(map(lambda i: (i['yt_videoid'], i['title']), d['entries']))
|
|
|
|
async def main():
|
|
await client.start(bot_token=bot_token)
|
|
if storage_chat_id and storage_message_id:
|
|
try:
|
|
m = await client.get_messages(storage_chat_id, ids=storage_message_id)
|
|
await m.download_media('ytnotifier.data')
|
|
except BaseException:
|
|
logging.exception('Exception raised when downloading ytnotifier.data')
|
|
try:
|
|
with open('ytnotifier.data') as file:
|
|
version = file.readline().strip()
|
|
if version != '0':
|
|
logging.error('ytnotifier.data has incompatible version %s', version)
|
|
else:
|
|
while True:
|
|
video_id = file.readline().strip()
|
|
if not video_id:
|
|
break
|
|
seen_videos.add(video_id)
|
|
except BaseException:
|
|
logging.exception('Exception raised when parsing ytnotifier.data')
|
|
|
|
async with ClientSession() as session:
|
|
while True:
|
|
for i in channels:
|
|
logging.info('Checking %s', i)
|
|
for video_id, video_title in await get_video_list(session, i):
|
|
if video_id not in seen_videos and video_id not in tmp_handled_videos:
|
|
tmp_handled_videos.add(video_id)
|
|
if loadmode:
|
|
seen_videos.add(video_id)
|
|
else:
|
|
logging.info('Handling %s', video_id)
|
|
asyncio.create_task(handle_video(video_id, video_title))
|
|
await asyncio.sleep(random.randint(1, 10))
|
|
if loadmode:
|
|
await save_seen_videos()
|
|
break
|
|
await asyncio.sleep(wait_seconds)
|
|
|
|
if __name__ == '__main__':
|
|
client.loop.run_until_complete(main())
|