You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

197 lines
8.8 KiB

import re
import sys
import time
import html
import random
import asyncio
import logging
import yaml
import feedparser
from datetime import datetime, timedelta
from telethon import TelegramClient
from youtube_dl import YoutubeDL
from youtube_dl.extractor import youtube
from aiohttp import ClientSession
logging.basicConfig(level=logging.INFO)
loadmode = False
if len(sys.argv) > 1:
loadmode = sys.argv[1] == 'loadmode'
config = yaml.safe_load(open('config.yaml'))
api_id = config['telegram']['api_id']
api_hash = config['telegram']['api_hash']
bot_token = config['telegram'].get('bot_token')
storage_chat_id = storage_message_id = None
if config.get('storage'):
storage_chat_id = config['storage'].get('storage_chat_id')
storage_message_id = config['storage'].get('storage_message_id')
notify_chat = config['config']['notify_chat']
wait_seconds = config['config']['wait_seconds']
time_offset = config['config'].get('time_offset', '00:00')
time_offset_neg = time_offset.startswith('-')
if time_offset_neg or time_offset.startswith('+'):
time_offset = time_offset[1:]
hours, minutes = time_offset.split(':')
time_offset_td = timedelta(hours=int(hours), minutes=int(minutes))
channels = config['config']['channels']
invidious_instances = config['config'].get('invidious_instances', [])
live_regex = re.compile(r'error: (?:ytnotifier:([0-9]+) )?(this live event will begin|premieres) in (.+)', re.I)
strip_date = re.compile(r' \d{4}-\d{2}-\d{2} \d{2}:\d{2}$')
ytdl = YoutubeDL({'skip_download': True, 'no_color': True})
ytdl.add_default_info_extractors()
client = TelegramClient('ytnotifier', api_id, api_hash)
client.parse_mode = 'html'
seen_videos = set()
seen_videos_lock = asyncio.Lock()
tmp_handled_videos = set()
async def save_seen_videos():
async with seen_videos_lock:
with open('ytnotifier.data', 'w') as file:
file.write('0\n')
for i in seen_videos:
file.write(f'{i}\n')
if storage_chat_id and storage_message_id:
await client.edit_message(storage_chat_id, storage_message_id, file='ytnotifier.data')
youtube._try_get = _try_get = youtube.try_get
def traverse_dict(src):
for (key, value) in src.items():
if key == 'scheduledStartTime':
return value
if isinstance(value, dict):
if value := traverse_dict(value):
return value
return None
def try_get(src, getter, expected_type=None):
if reason := src.get('reason'):
if isinstance(reason, str) and (reason.startswith('This live event will begin in ') or reason.startswith('Premieres in ')):
t = _try_get(src, traverse_dict, str)
if t:
src['reason'] = f'ytnotifier:{t} {reason}'
return _try_get(src, getter, expected_type)
youtube.try_get = try_get
async def _handle_video(video_id, video_title):
last_was_few_moments = False
too_many_attempts_count = 1
video_url = f'https://www.youtube.com/watch?v={video_id}'
notify_text = 'New video'
while True:
try:
video_data = await client.loop.run_in_executor(None, ytdl.extract_info, video_url)
except BaseException as e:
wait_time = 30
message = str(e)
if '429' in message or 'too many' in message.lower():
wait_time = too_many_attempts_count * 60 * 60
too_many_attempts_count += 1
elif match := live_regex.match(message.rstrip('.')):
end_schedule_time = match.group(1) or 0
is_premiere = match.group(2).lower() == 'premieres'
if is_premiere:
notify_text = 'Premiere started'
else:
notify_text = 'Live event started'
human_end_schedule_time = match.group(3)
if end_schedule_time := int(end_schedule_time):
human_end_schedule_time += ' ('
if time_offset_neg:
human_end_schedule_time += str(datetime.fromtimestamp(end_schedule_time) - time_offset_td)
else:
human_end_schedule_time += str(datetime.fromtimestamp(end_schedule_time) + time_offset_td)
human_end_schedule_time += f' {"-" if time_offset_neg else "+"}{time_offset})'
tmp_wait_time = end_schedule_time - time.time()
if tmp_wait_time > wait_time:
wait_time = tmp_wait_time
await client.send_message(notify_chat, f'<b>{"Premiere" if is_premiere else "Live event"} starting in {human_end_schedule_time}:</b> <a href="{video_url}">{html.escape(video_title)}</a>')
elif not last_was_few_moments:
await client.send_message(notify_chat, f'<b>{"Premiere" if is_premiere else "Live event"} starting in {human_end_schedule_time}:</b> <a href="{video_url}">{html.escape(video_title)}</a>')
elif not last_was_few_moments:
await client.send_message(notify_chat, f'<b>{"Premiere" if is_premiere else "Live event"} starting in {human_end_schedule_time}:</b> <a href="{video_url}">{html.escape(video_title)}</a>')
last_was_few_moments = 'moment' in human_end_schedule_time.lower()
await asyncio.sleep(wait_time)
else:
if video_data.get('is_live'):
notify_text = 'Live event started'
break
if tmp_video_title := video_data.get('title'):
video_title = strip_date.sub('', tmp_video_title)
await client.send_message(notify_chat, f'<b>{notify_text}:</b> <a href="{video_url}">{html.escape(video_title)}</a>')
async with seen_videos_lock:
seen_videos.add(video_id)
async def handle_video(video_id, video_title):
try:
while True:
try:
await _handle_video(video_id, video_title)
except BaseException:
logging.exception('Exception raised while notifying %s (%s)', video_id, video_title)
else:
await save_seen_videos()
break
finally:
tmp_handled_videos.discard(video_id)
async def get_video_list(session, channel_id):
shuffled_instances = invidious_instances.copy()
random.shuffle(shuffled_instances)
for i in shuffled_instances:
try:
async with session.get(f'{i}/api/v1/channels/{channel_id}/latest?fields=title,videoId&a={time.time()}', headers={'Cache-Control': 'no-store, max-age=0'}) as resp:
if resp.status != 200:
logging.error('Invidious instance %s returned %s', i, str(resp.status))
continue
return list(map(lambda i: (i['videoId'], i['title']), await resp.json()))
except BaseException:
logging.exception('Invidious instance %s raised exception', i)
async with session.get(f'https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}&a={time.time()}', headers={'Cache-Control': 'no-store, max-age=0'}) as resp:
d = feedparser.parse(await resp.text())
return list(map(lambda i: (i['yt_videoid'], i['title']), d['entries']))
async def main():
await client.start(bot_token=bot_token)
if storage_chat_id and storage_message_id:
try:
m = await client.get_messages(storage_chat_id, ids=storage_message_id)
await m.download_media('ytnotifier.data')
except BaseException:
logging.exception('Exception raised when downloading ytnotifier.data')
try:
with open('ytnotifier.data') as file:
version = file.readline().strip()
if version != '0':
logging.error('ytnotifier.data has incompatible version %s', version)
else:
while True:
video_id = file.readline().strip()
if not video_id:
break
seen_videos.add(video_id)
except BaseException:
logging.exception('Exception raised when parsing ytnotifier.data')
async with ClientSession() as session:
while True:
for i in channels:
logging.info('Checking %s', i)
for video_id, video_title in await get_video_list(session, i):
if video_id not in seen_videos and video_id not in tmp_handled_videos:
tmp_handled_videos.add(video_id)
if loadmode:
seen_videos.add(video_id)
else:
logging.info('Handling %s', video_id)
asyncio.create_task(handle_video(video_id, video_title))
await asyncio.sleep(random.randint(1, 10))
if loadmode:
await save_seen_videos()
break
await asyncio.sleep(wait_seconds)
if __name__ == '__main__':
client.loop.run_until_complete(main())