# ytnotifier/ytnotifier.py
#
# Repository viewer metadata (as captured with this listing):
#   185 lines, 8.0 KiB
#   Python

import re
import sys
import time
import html
import random
import asyncio
import logging
import yaml
import feedparser
from telethon import TelegramClient
from youtube_dl import YoutubeDL
from youtube_dl.extractor import youtube
from aiohttp import ClientSession
# --- Logging and command-line mode ------------------------------------------
logging.basicConfig(level=logging.INFO)

# Passing "loadmode" as the first CLI argument makes main() mark every video
# currently listed as seen (without notifying), save the list, and exit.
loadmode = len(sys.argv) > 1 and sys.argv[1] == 'loadmode'

# --- Configuration -----------------------------------------------------------
# Use a context manager so the config file handle is closed right after parsing
# instead of being left open for the lifetime of the process.
with open('config.yaml') as config_file:
    config = yaml.safe_load(config_file)

# Telegram credentials; bot_token is optional and handed to client.start().
api_id = config['telegram']['api_id']
api_hash = config['telegram']['api_hash']
bot_token = config['telegram'].get('bot_token')

# Optional Telegram message used to store ytnotifier.data remotely. Both IDs
# stay None when the "storage" section is absent or empty.
storage_chat_id = storage_message_id = None
if config.get('storage'):
    storage_chat_id = config['storage'].get('storage_chat_id')
    storage_message_id = config['storage'].get('storage_message_id')

notify_chat = config['config']['notify_chat']      # chat that receives notifications
wait_seconds = config['config']['wait_seconds']    # pause between polling rounds
channels = config['config']['channels']            # YouTube channel IDs to poll
invidious_instances = config['config'].get('invidious_instances', [])

# --- Patterns ----------------------------------------------------------------
# Matches the extractor error for not-yet-started streams/premieres.
#   group 1: optional start timestamp injected by the patched try_get()
#            (the "ytnotifier:<digits> " prefix)
#   group 2: which kind of event it is
#   group 3: the human-readable time remaining
live_regex = re.compile(r'error: (?:ytnotifier:([0-9]+) )?(this live event will begin|premieres) in (.+)', re.I)
# Removes a trailing " YYYY-MM-DD HH:MM" stamp from extracted titles.
strip_date = re.compile(r' \d{4}-\d{2}-\d{2} \d{2}:\d{2}$')

# --- Clients and shared state --------------------------------------------------
ytdl = YoutubeDL({'skip_download': True, 'no_color': True})
ytdl.add_default_info_extractors()
client = TelegramClient('ytnotifier', api_id, api_hash)
client.parse_mode = 'html'

seen_videos = set()             # IDs of videos that were already notified
seen_videos_lock = asyncio.Lock()
tmp_handled_videos = set()      # IDs that currently have a handler in flight
async def save_seen_videos():
    """Write ``seen_videos`` to ``ytnotifier.data`` and upload it if configured.

    File format: a version line (``0``) followed by one video ID per line.
    When both ``storage_chat_id`` and ``storage_message_id`` are set, the
    storage message is edited to carry the freshly written file.

    The whole operation runs under ``seen_videos_lock`` so the set is not
    modified by a lock-holding writer while it is being dumped.
    """
    async with seen_videos_lock:
        with open('ytnotifier.data', 'w') as file:
            file.write('0\n')
            file.writelines(f'{video_id}\n' for video_id in seen_videos)
        # The file is closed (and therefore flushed) at this point, so the
        # upload below always sees the complete contents.
        if storage_chat_id and storage_message_id:
            await client.edit_message(storage_chat_id, storage_message_id, file='ytnotifier.data')
# Keep a handle on the extractor module's original try_get, both as
# youtube._try_get and as the module-level _try_get used by the wrapper below.
youtube._try_get = _try_get = youtube.try_get
def traverse_dict(src):
    """Return the first ``scheduledStartTime`` value found in a nested dict.

    The search is depth-first in dict iteration order and only descends into
    values that are themselves dicts. A ``scheduledStartTime`` key at the
    current level is returned as-is (even if falsy); a value found in a nested
    dict is only returned when it is truthy, otherwise the search continues.

    Returns ``None`` when nothing is found.
    """
    for key, value in src.items():
        if key == 'scheduledStartTime':
            return value
        if isinstance(value, dict):
            # Use a separate name instead of rebinding the loop variable.
            found = traverse_dict(value)
            if found:
                return found
    return None
def try_get(src, getter, expected_type=None):
    """Wrapper around the original ``try_get`` that tags "not started" reasons.

    When ``src`` is a dict whose ``reason`` string starts with
    ``This live event will begin in `` or ``Premieres in ``, the dict is
    searched for a ``scheduledStartTime`` and, if one is found, the reason is
    rewritten in place to ``ytnotifier:<start time> <original reason>`` so
    that ``live_regex`` can later recover the timestamp from the error text.

    The call is then delegated unchanged to the original ``try_get``.

    ``src`` is only inspected when it is a dict: the original helper is called
    with arbitrary objects and tolerates them, so this wrapper must not raise
    on ``None`` or list inputs.
    """
    reason = src.get('reason') if isinstance(src, dict) else None
    if isinstance(reason, str) and reason.startswith(('This live event will begin in ', 'Premieres in ')):
        start_time = _try_get(src, traverse_dict, str)
        if start_time:
            # Once prefixed, the reason no longer matches the startswith()
            # check above, so repeated calls do not stack prefixes.
            src['reason'] = f'ytnotifier:{start_time} {reason}'
    return _try_get(src, getter, expected_type)
# Install the wrapper so lookups of try_get on the extractor module hit it.
youtube.try_get = try_get
async def _handle_video(video_id, video_title):
    """Wait until a video is extractable, then announce it in ``notify_chat``.

    Extraction is retried in a loop:

    * errors mentioning ``429`` / ``too many`` back off for a number of hours
      that grows by one with every such error;
    * errors matching ``live_regex`` (upcoming live event or premiere) send a
      "starting in ..." notice and wait until the scheduled start when a
      timestamp is available and more than 30 seconds away, otherwise 30
      seconds;
    * any other error waits 30 seconds.

    Once extraction succeeds the final notification is sent and ``video_id``
    is added to ``seen_videos``.

    Args:
        video_id: YouTube video ID.
        video_title: Title from the channel listing; replaced by the extracted
            title (with a trailing date stamp stripped) when one is available.
    """
    last_was_few_moments = False
    too_many_attempts_count = 1
    video_url = f'https://www.youtube.com/watch?v={video_id}'
    notify_text = 'New video'
    while True:
        try:
            video_data = await client.loop.run_in_executor(None, ytdl.extract_info, video_url)
        except Exception as e:
            # Exception rather than BaseException: task cancellation and
            # KeyboardInterrupt must propagate instead of being retried.
            wait_time = 30
            message = str(e)
            if '429' in message or 'too many' in message.lower():
                wait_time = too_many_attempts_count * 60 * 60
                too_many_attempts_count += 1
            elif match := live_regex.match(message.rstrip('.')):
                is_premiere = match.group(2).lower() == 'premieres'
                notify_text = 'Premiere started' if is_premiere else 'Live event started'
                human_end_schedule_time = match.group(3)
                end_schedule_time = int(match.group(1) or 0)

                # Sleep right up to the scheduled start when it is known and
                # further away than the default retry interval.
                sleeping_until_start = False
                if end_schedule_time:
                    tmp_wait_time = end_schedule_time - time.time()
                    if tmp_wait_time > wait_time:
                        wait_time = tmp_wait_time
                        sleeping_until_start = True

                # Always announce before a long sleep; otherwise announce
                # unless the previous notice already said "moment(s)".
                if sleeping_until_start or not last_was_few_moments:
                    await client.send_message(notify_chat, f'<b>{"Premiere" if is_premiere else "Live event"} starting in {human_end_schedule_time}:</b> <a href="{video_url}">{html.escape(video_title)}</a>')
                last_was_few_moments = 'moment' in human_end_schedule_time.lower()
            await asyncio.sleep(wait_time)
        else:
            if video_data.get('is_live'):
                notify_text = 'Live event started'
            break
    if tmp_video_title := video_data.get('title'):
        video_title = strip_date.sub('', tmp_video_title)
    await client.send_message(notify_chat, f'<b>{notify_text}:</b> <a href="{video_url}">{html.escape(video_title)}</a>')
    async with seen_videos_lock:
        seen_videos.add(video_id)
async def handle_video(video_id, video_title):
    """Run ``_handle_video`` until it succeeds, then persist the seen list.

    Failures are logged and retried. ``video_id`` is always removed from
    ``tmp_handled_videos`` when this coroutine ends, whether it finished,
    failed while saving, or was cancelled.

    Args:
        video_id: YouTube video ID.
        video_title: Title from the channel listing.
    """
    retry_delay = 30  # seconds; avoids hammering the services on persistent errors
    try:
        while True:
            try:
                await _handle_video(video_id, video_title)
            except Exception:
                # Exception rather than BaseException so that cancellation
                # actually stops the task instead of restarting the loop.
                logging.exception('Exception raised while notifying %s (%s)', video_id, video_title)
                await asyncio.sleep(retry_delay)
            else:
                await save_seen_videos()
                break
    finally:
        tmp_handled_videos.discard(video_id)
async def get_video_list(session, channel_id):
    """Return ``(video_id, title)`` pairs for a channel's latest videos.

    The configured Invidious instances are tried in random order; the first
    one that answers with HTTP 200 and parseable JSON wins. If none does (or
    none are configured), the channel's YouTube feed is fetched and parsed
    with feedparser instead.

    Args:
        session: HTTP client session used for the requests.
        channel_id: YouTube channel ID.

    Returns:
        A list of ``(video_id, title)`` tuples.

    Raises:
        Whatever the feed request raises; only Invidious failures are logged
        and skipped.
    """
    shuffled_instances = invidious_instances.copy()
    random.shuffle(shuffled_instances)
    for instance in shuffled_instances:
        try:
            # The changing "a" query parameter and the header are there to
            # discourage cached responses.
            async with session.get(f'{instance}/api/v1/channels/{channel_id}/latest?fields=title,videoId&a={time.time()}', headers={'Cache-Control': 'no-store, max-age=0'}) as resp:
                if resp.status != 200:
                    logging.error('Invidious instance %s returned %s', instance, str(resp.status))
                    continue
                return [(video['videoId'], video['title']) for video in await resp.json()]
        except Exception:
            # Exception rather than BaseException: cancellation must not be
            # mistaken for a broken instance.
            logging.exception('Invidious instance %s raised exception', instance)
    async with session.get(f'https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}&a={time.time()}', headers={'Cache-Control': 'no-store, max-age=0'}) as resp:
        feed = feedparser.parse(await resp.text())
    return [(entry['yt_videoid'], entry['title']) for entry in feed['entries']]
async def main():
    """Start the Telegram client, load the seen list, and poll the channels.

    Startup:
        1. Start the client (with ``bot_token`` if configured).
        2. If remote storage is configured, try to download
           ``ytnotifier.data`` from the storage message.
        3. Load video IDs from ``ytnotifier.data`` into ``seen_videos``.
           Problems in steps 2 and 3 are logged and otherwise ignored.

    Polling: every channel is checked in turn, with a random 1-10 second
    pause after each one. Unseen videos that are not already being handled
    get a background ``handle_video`` task. In load mode they are marked as
    seen instead, the list is saved after the first round, and the function
    returns. Otherwise the loop sleeps ``wait_seconds`` between rounds.
    """
    await client.start(bot_token=bot_token)
    if storage_chat_id and storage_message_id:
        try:
            m = await client.get_messages(storage_chat_id, ids=storage_message_id)
            await m.download_media('ytnotifier.data')
        except Exception:
            logging.exception('Exception raised when downloading ytnotifier.data')
    try:
        with open('ytnotifier.data') as file:
            version = file.readline().strip()
            if version != '0':
                logging.error('ytnotifier.data has incompatible version %s', version)
            else:
                for line in file:
                    video_id = line.strip()
                    if not video_id:
                        # A blank line ends the list, same as end of file.
                        break
                    seen_videos.add(video_id)
    except Exception:
        logging.exception('Exception raised when parsing ytnotifier.data')

    # The event loop only keeps weak references to tasks, so hold strong ones
    # here until each task finishes; otherwise a handler could be garbage
    # collected while it is still waiting for its video.
    background_tasks = set()
    async with ClientSession() as session:
        while True:
            for channel_id in channels:
                logging.info('Checking %s', channel_id)
                for video_id, video_title in await get_video_list(session, channel_id):
                    if video_id in seen_videos or video_id in tmp_handled_videos:
                        continue
                    tmp_handled_videos.add(video_id)
                    if loadmode:
                        seen_videos.add(video_id)
                    else:
                        logging.info('Handling %s', video_id)
                        task = asyncio.create_task(handle_video(video_id, video_title))
                        background_tasks.add(task)
                        task.add_done_callback(background_tasks.discard)
                # Random pause so the channels are not all hit back to back.
                await asyncio.sleep(random.randint(1, 10))
            if loadmode:
                await save_seen_videos()
                break
            await asyncio.sleep(wait_seconds)
if __name__ == '__main__':
    # Run main() on the Telegram client's own event loop until it returns.
    client.loop.run_until_complete(main())