autoytarchive/autoytarchive/utils.py

46 lines
2.0 KiB
Python
Raw Normal View History

2021-04-17 15:02:59 +00:00
import time
2021-01-13 14:14:13 +00:00
import json
2021-04-17 15:02:59 +00:00
import logging
import feedparser
2021-01-13 14:14:13 +00:00
from io import BytesIO
2021-04-07 07:21:34 +00:00
from youtube_dl.extractor import youtube
2021-01-13 14:14:13 +00:00
from . import config, client, seen_videos
2021-04-07 07:21:34 +00:00
youtube._try_get = _try_get = youtube.try_get
def traverse_dict(src):
for (key, value) in src.items():
if key == 'scheduledStartTime':
return value
if isinstance(value, dict):
if value := traverse_dict(value):
return value
return None
def try_get(src, getter, expected_type=None):
if reason := src.get('reason'):
if isinstance(reason, str) and (reason.startswith('This live event will begin in ') or reason.startswith('Premieres in ')):
if t := _try_get(src, traverse_dict, str):
src['reason'] = f'autoytarchive:{t} {reason}'
return _try_get(src, getter, expected_type)
youtube.try_get = try_get
2021-01-13 14:14:13 +00:00
async def update_seen_videos():
with BytesIO(json.dumps(seen_videos).encode()) as file:
file.name = 'autoytarchive.json'
file.seek(0)
await client.edit_message(config['config']['storage_chat_id'], config['config']['storage_message_id'], file=file)
2021-04-17 15:02:59 +00:00
async def get_video_list(session, channel_id):
for i in config['config'].get('invidious_instances', []):
try:
async with session.get(f'{i}/api/v1/channels/{channel_id}/latest?fields=videoId&a={time.time()}', headers={'Cache-Control': 'no-store, max-age=0'}) as resp:
if resp.status != 200:
logging.error('Invidious instance %s returned %s', i, str(resp.status))
continue
return list(map(lambda i: i['videoId'], await resp.json()))
except BaseException:
logging.exception('Invidious instance %s raised exception', i)
async with session.get(f'https://www.youtube.com/feeds/videos.xml?channel_id={channel_id}&a={time.time()}', headers={'Cache-Control': 'no-store, max-age=0'}) as resp:
d = feedparser.parse(await resp.text())
return list(map(lambda i: i['yt_videoid'], d['entries']))