"""Poll YouTube channels and announce new videos, premieres and live events on Telegram.

Run with the single argument ``loadmode`` to record every currently listed
video as already seen (without sending notifications) and exit.
"""
import re
import sys
import time
import html
import random
import asyncio
import logging

import yaml
import feedparser
from datetime import datetime, timedelta
from telethon import TelegramClient
from youtube_dl import YoutubeDL
from youtube_dl.extractor import youtube
from aiohttp import ClientSession

logging.basicConfig(level=logging.INFO)

# In load mode the script only seeds the seen-video list and then exits.
loadmode = False
if len(sys.argv) > 1:
    loadmode = sys.argv[1] == 'loadmode'

# Use a context manager so the config file handle is closed deterministically.
with open('config.yaml') as config_file:
    config = yaml.safe_load(config_file)

api_id = config['telegram']['api_id']
api_hash = config['telegram']['api_hash']
bot_token = config['telegram'].get('bot_token')

# Optional Telegram message whose attachment mirrors ytnotifier.data.
storage_chat_id = storage_message_id = None
if config.get('storage'):
    storage_chat_id = config['storage'].get('storage_chat_id')
    storage_message_id = config['storage'].get('storage_message_id')

notify_chat = config['config']['notify_chat']
wait_seconds = config['config']['wait_seconds']

# time_offset is "[+|-]HH:MM"; the sign is split off and kept separately so
# the unsigned text can be echoed back in notifications.
time_offset = config['config'].get('time_offset', '00:00')
time_offset_neg = time_offset.startswith('-')
if time_offset_neg or time_offset.startswith('+'):
    time_offset = time_offset[1:]
hours, minutes = time_offset.split(':')
time_offset_td = timedelta(hours=int(hours), minutes=int(minutes))

channels = config['config']['channels']
invidious_instances = config['config'].get('invidious_instances', [])

# Matches the youtube-dl error text for not-yet-started streams/premieres.
# Group 1 is the optional start timestamp injected by try_get() below,
# group 2 the event kind, group 3 the human-readable remaining time.
live_regex = re.compile(r'error: (?:ytnotifier:([0-9]+) )?(this live event will begin|premieres) in (.+)', re.I)
# Strips a trailing " YYYY-MM-DD HH:MM" from a title.
strip_date = re.compile(r' \d{4}-\d{2}-\d{2} \d{2}:\d{2}$')

ytdl = YoutubeDL({'skip_download': True, 'no_color': True})
ytdl.add_default_info_extractors()

client = TelegramClient('ytnotifier', api_id, api_hash)
client.parse_mode = 'html'

seen_videos = set()
seen_videos_lock = asyncio.Lock()
# Videos that currently have a handler task (or were just seeded in load mode).
tmp_handled_videos = set()
# Strong references to running handler tasks: the event loop only keeps weak
# references, so an otherwise unreferenced task may be garbage-collected
# before it finishes.
background_tasks = set()


async def save_seen_videos():
    """Write the seen-video list to ytnotifier.data and mirror it to Telegram.

    The file starts with a format-version line ("0") followed by one video id
    per line. When a storage message is configured, that message is edited to
    carry the freshly written file.
    """
    async with seen_videos_lock:
        with open('ytnotifier.data', 'w') as file:
            file.write('0\n')
            file.writelines(f'{video_id}\n' for video_id in seen_videos)
        # Upload only after the file has been closed (and therefore flushed).
        if storage_chat_id and storage_message_id:
            await client.edit_message(storage_chat_id, storage_message_id, file='ytnotifier.data')


# Keep the original helper reachable; the patched version is installed below.
youtube._try_get = _try_get = youtube.try_get


def traverse_dict(src):
    """Return the first truthy 'scheduledStartTime' reachable through nested dicts.

    A 'scheduledStartTime' key found directly in a dict is returned as-is;
    values found in nested dicts are only returned when truthy. Lists are not
    descended into. Returns None when nothing is found.
    """
    for key, value in src.items():
        if key == 'scheduledStartTime':
            return value
        if isinstance(value, dict):
            if found := traverse_dict(value):
                return found
    return None


def try_get(src, getter, expected_type=None):
    """Wrap youtube_dl's try_get to embed the scheduled start time in 'reason'.

    When ``src`` is a dict whose 'reason' says the live event or premiere has
    not started yet, the reason is rewritten in place to
    ``ytnotifier:<scheduledStartTime> <reason>`` so the timestamp shows up in
    the resulting error message (see live_regex). The call is then delegated
    to the original try_get unchanged.
    """
    # Only dicts have .get(); any other src (list, None, ...) is passed
    # straight to the original helper instead of raising AttributeError here.
    if isinstance(src, dict):
        reason = src.get('reason')
        if isinstance(reason, str) and reason.startswith(('This live event will begin in ', 'Premieres in ')):
            start_time = _try_get(src, traverse_dict, str)
            if start_time:
                src['reason'] = f'ytnotifier:{start_time} {reason}'
    return _try_get(src, getter, expected_type)


youtube.try_get = try_get


async def _handle_video(video_id, video_title):
    """Wait until a video is extractable, then notify and mark it as seen.

    Extraction is retried in a loop: rate-limit errors back off by a growing
    number of hours, "not started yet" errors announce the schedule and sleep
    until the scheduled start, and any other extraction error is retried
    after 30 seconds.
    """
    last_was_few_moments = False
    too_many_attempts_count = 1
    video_url = f'https://www.youtube.com/watch?v={video_id}'
    notify_text = 'New video'
    while True:
        try:
            # extract_info is blocking, so run it in the default executor.
            video_data = await client.loop.run_in_executor(None, ytdl.extract_info, video_url)
        except Exception as e:
            # Exception (not BaseException) so that task cancellation and
            # KeyboardInterrupt/SystemExit are not swallowed and retried.
            wait_time = 30
            message = str(e)
            if '429' in message or 'too many' in message.lower():
                # Rate limited: wait 1h, 2h, 3h, ... on consecutive hits.
                wait_time = too_many_attempts_count * 60 * 60
                too_many_attempts_count += 1
            elif match := live_regex.match(message.rstrip('.')):
                end_schedule_time = match.group(1) or 0
                is_premiere = match.group(2).lower() == 'premieres'
                if is_premiere:
                    notify_text = 'Premiere started'
                else:
                    notify_text = 'Live event started'
                human_end_schedule_time = match.group(3)
                # Without a known far-future start time, announce once and
                # stay quiet while the text keeps mentioning "moment".
                should_notify = not last_was_few_moments
                if end_schedule_time := int(end_schedule_time):
                    human_end_schedule_time += ' ('
                    if time_offset_neg:
                        human_end_schedule_time += str(datetime.fromtimestamp(end_schedule_time) - time_offset_td)
                    else:
                        human_end_schedule_time += str(datetime.fromtimestamp(end_schedule_time) + time_offset_td)
                    human_end_schedule_time += f' {"-" if time_offset_neg else "+"}{time_offset})'
                    tmp_wait_time = end_schedule_time - time.time()
                    if tmp_wait_time > wait_time:
                        # Start is still far enough away: sleep until then
                        # and always announce the schedule.
                        wait_time = tmp_wait_time
                        should_notify = True
                if should_notify:
                    await client.send_message(notify_chat, f'{"Premiere" if is_premiere else "Live event"} starting in {human_end_schedule_time}: {html.escape(video_title)}')
                last_was_few_moments = 'moment' in human_end_schedule_time.lower()
            await asyncio.sleep(wait_time)
        else:
            if video_data.get('is_live'):
                notify_text = 'Live event started'
            break
    # Prefer the extracted title, minus any trailing date stamp.
    if tmp_video_title := video_data.get('title'):
        video_title = strip_date.sub('', tmp_video_title)
    await client.send_message(notify_chat, f'{notify_text}: {html.escape(video_title)}')
    async with seen_videos_lock:
        seen_videos.add(video_id)


async def handle_video(video_id, video_title):
    """Run _handle_video until it succeeds, then persist the seen list.

    Failures are logged and retried. The video is removed from
    tmp_handled_videos when this coroutine ends for any reason, including
    cancellation.
    """
    try:
        while True:
            try:
                await _handle_video(video_id, video_title)
            except Exception:
                # Deliberately not BaseException: a cancelled task must be
                # allowed to terminate instead of retrying forever.
                logging.exception('Exception raised while notifying %s (%s)', video_id, video_title)
            else:
                await save_seen_videos()
                break
    finally:
        tmp_handled_videos.discard(video_id)


async def get_video_list(session, channel_id):
    """Return ``[(video_id, title), ...]`` for the latest videos of a channel.

    Configured Invidious instances are tried in random order; the first one
    answering with HTTP 200 wins. If none succeeds, the YouTube channel feed
    is fetched and parsed instead. Errors from that fallback request
    propagate to the caller.
    """
    shuffled_instances = invidious_instances.copy()
    random.shuffle(shuffled_instances)
    no_cache_headers = {'Cache-Control': 'no-store, max-age=0'}
    for instance in shuffled_instances:
        try:
            # The throwaway "a" query parameter varies the URL on every request.
            async with session.get(f'{instance}/api/v1/channels/{channel_id}/latest?fields=title,videoId&a={time.time()}', headers=no_cache_headers) as resp:
                if resp.status != 200:
                    logging.error('Invidious instance %s returned %s', instance, str(resp.status))
                    continue
                return [(video['videoId'], video['title']) for video in await resp.json()]
        except Exception:
            logging.exception('Invidious instance %s raised exception', instance)
    async with session.get(f'https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}&a={time.time()}', headers=no_cache_headers) as resp:
        feed = feedparser.parse(await resp.text())
    return [(entry['yt_videoid'], entry['title']) for entry in feed['entries']]


async def main():
    """Load the seen-video list, then poll all channels forever.

    In load mode a single pass is made that marks every listed video as seen,
    saves the list and returns without sending notifications.
    """
    await client.start(bot_token=bot_token)
    if storage_chat_id and storage_message_id:
        try:
            m = await client.get_messages(storage_chat_id, ids=storage_message_id)
            await m.download_media('ytnotifier.data')
        except Exception:
            logging.exception('Exception raised when downloading ytnotifier.data')
    try:
        with open('ytnotifier.data') as file:
            version = file.readline().strip()
            if version != '0':
                logging.error('ytnotifier.data has incompatible version %s', version)
            else:
                for line in file:
                    video_id = line.strip()
                    # A blank line terminates the list.
                    if not video_id:
                        break
                    seen_videos.add(video_id)
    except Exception:
        logging.exception('Exception raised when parsing ytnotifier.data')
    async with ClientSession() as session:
        while True:
            for channel_id in channels:
                logging.info('Checking %s', channel_id)
                for video_id, video_title in await get_video_list(session, channel_id):
                    if video_id not in seen_videos and video_id not in tmp_handled_videos:
                        tmp_handled_videos.add(video_id)
                        if loadmode:
                            seen_videos.add(video_id)
                        else:
                            logging.info('Handling %s', video_id)
                            task = asyncio.create_task(handle_video(video_id, video_title))
                            # Hold a reference until the task is done so it
                            # cannot be garbage-collected mid-execution.
                            background_tasks.add(task)
                            task.add_done_callback(background_tasks.discard)
                # Random pause between channels to spread out the requests.
                await asyncio.sleep(random.randint(1, 10))
            if loadmode:
                await save_seen_videos()
                break
            await asyncio.sleep(wait_seconds)


if __name__ == '__main__':
    client.loop.run_until_complete(main())