autoytarchive/autoytarchive/utils.py

49 lines
2.1 KiB
Python

import time
import json
import random
import logging
import feedparser
from io import BytesIO
from youtube_dl.extractor import youtube
from . import config, client, seen_videos
youtube._try_get = _try_get = youtube.try_get
def traverse_dict(src):
for (key, value) in src.items():
if key == 'scheduledStartTime':
return value
if isinstance(value, dict):
if value := traverse_dict(value):
return value
return None
def try_get(src, getter, expected_type=None):
if reason := src.get('reason'):
if isinstance(reason, str) and (reason.startswith('This live event will begin in ') or reason.startswith('Premieres in ')):
if t := _try_get(src, traverse_dict, str):
src['reason'] = f'autoytarchive:{t} {reason}'
return _try_get(src, getter, expected_type)
youtube.try_get = try_get
async def update_seen_videos():
with BytesIO(json.dumps(seen_videos).encode()) as file:
file.name = 'autoytarchive.json'
file.seek(0)
await client.edit_message(config['config']['storage_chat_id'], config['config']['storage_message_id'], file=file)
async def get_video_list(session, channel_id):
invidious_instances = config['config'].get('invidious_instances', [])
random.shuffle(invidious_instances)
for i in invidious_instances:
try:
async with session.get(f'{i}/api/v1/channels/{channel_id}/latest?fields=videoId&a={time.time()}', headers={'Cache-Control': 'no-store, max-age=0'}) as resp:
if resp.status != 200:
logging.error('Invidious instance %s returned %s', i, str(resp.status))
continue
return list(map(lambda i: i['videoId'], await resp.json()))[:15]
except BaseException:
logging.exception('Invidious instance %s raised exception', i)
async with session.get(f'https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}&a={time.time()}', headers={'Cache-Control': 'no-store, max-age=0'}) as resp:
d = feedparser.parse(await resp.text())
return list(map(lambda i: i['yt_videoid'], d['entries']))[:15]