# autoytarchive/autoytarchive/workers.py

import re
import os
import json
import time
import random
import asyncio
import logging
import tempfile
import traceback
import feedparser
from io import BytesIO
from decimal import Decimal
from urllib.parse import quote as urlencode, urlparse
from youtube_dl import YoutubeDL
from . import session, config, client, seen_videos
from .utils import update_seen_videos
from .cappedio import bopen

# Video IDs already picked up in this session, so they are not queued again
# before they make it into seen_videos.
tmp_handled = []
# Upload chunk size; kept below Telegram's per-file upload limit.
size_limit = 2000 * 1024 * 1024
# Matches youtube-dl's "this live event will begin in ..." / "premieres in ..."
# errors; the optional autoytarchive:<digits> group carries a scheduled start
# timestamp when present.
live_regex = re.compile(r'error: (?:autoytarchive:([0-9]+) )?(?:this live event will begin|premieres) in .+', re.I)
# Strips a trailing " YYYY-MM-DD HH:MM" stamp from video titles.
strip_date = re.compile(r' \d{4}-\d{2}-\d{2} \d{2}:\d{2}$')

ytdl = YoutubeDL({'skip_download': True, 'no_color': True})
ytdl.add_default_info_extractors()
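
# Pipeline overview (as implemented below): check_channels polls each
# configured channel's RSS feed and spawns a check_video task per unseen
# upload; _check_video resolves the video with youtube-dl, waiting out rate
# limits and scheduled start times, and puts it on video_queue; video_worker
# records the stream with ffmpeg into a temporary directory and hands it to
# upload_queue; upload_worker sends the recording to the Telegram storage
# chat, split into parts when it exceeds size_limit.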

async def check_channels(nodl):
    # Outer loop: poll the channels forever, reporting crashes to the storage
    # chat. With nodl=True, do a single pass that only marks videos as seen.
    if nodl:
        await _check_channels(True)
        return
    while True:
        try:
            await _check_channels(False)
        except BaseException:
            logging.exception('Exception encountered with check channels')
            try:
                with BytesIO(traceback.format_exc().encode()) as file:
                    file.name = 'check-channels-error.txt'
                    file.seek(0)
                    await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with check channels', file=file)
            except BaseException:
                logging.exception('Exception encountered when sending message to Telegram about check channels exception')
        await asyncio.sleep(config['config']['wait_seconds'])

check_channels_lock = asyncio.Lock()

async def _check_channels(nodl):
    # Fetch each channel's RSS feed and queue any video that has not been seen
    # or handled yet; a short random delay between channels spaces out requests.
    async with check_channels_lock:
        for i in config['config']['channels']:
            logging.info('Checking channel %s', i)
            async with session.get(f'https://youtube.com/feeds/videos.xml?channel_id={urlencode(i)}&a={time.time()}') as resp:
                data = feedparser.parse(await resp.text())
            for j in data['items']:
                if j['yt_videoid'] in seen_videos + tmp_handled:
                    continue
                if nodl:
                    seen_videos.append(j['yt_videoid'])
                    continue
                asyncio.create_task(check_video(j))
                tmp_handled.append(j['yt_videoid'])
            await asyncio.sleep(random.randint(1, 10))

async def check_video(video):
    # Retry wrapper around _check_video: up to five attempts, reporting each
    # failure to the storage chat.
    for _ in range(5):
        try:
            return await _check_video(video)
        except BaseException:
            logging.exception('Exception encountered with checking video %s', video.get('yt_videoid'))
            try:
                with BytesIO(traceback.format_exc().encode()) as file:
                    file.name = f'check-videos-error-{video.get("yt_videoid")}.txt'
                    file.seek(0)
                    await client.send_message(config['config']['storage_chat_id'], f'Exception encountered with checking video {video.get("yt_videoid")}', file=file, parse_mode=None)
            except BaseException:
                logging.exception('Exception encountered when sending message to Telegram about checking video %s exception', video.get('yt_videoid'))

async def _check_video(video):
    # Resolve the video with youtube-dl, backing off on 429s and waiting for
    # scheduled livestreams/premieres to start, then put it on video_queue.
    logging.info('Checking video %s', video['yt_videoid'])
    first_try_live = waited = False
    too_many_requests_count = 1
    while True:
        try:
            video_json = await client.loop.run_in_executor(None, ytdl.extract_info, video['link'])
        except BaseException as e:
            wait_time = 30
            message = str(e)
            if '429' in message or 'too many' in message.lower():
                wait_time = too_many_requests_count * 60 * 60
                too_many_requests_count += 1
            elif match := live_regex.match(message.rstrip('.')):
                end_schedule_time = match.group(1) or 0
                if end_schedule_time := int(end_schedule_time):
                    tmp_wait_time = end_schedule_time - time.time()
                    if tmp_wait_time > wait_time:
                        wait_time = tmp_wait_time
            await asyncio.sleep(wait_time)
            waited = True
        else:
            if not waited:
                first_try_live = True
            break
    if not video_json.get('is_live'):
        first_try_live = False
    video_queue.put_nowait((video_json, time.time(), first_try_live))

video_queue = asyncio.Queue()

async def video_worker():
    while True:
        try:
            await _video_worker()
        except BaseException:
            logging.exception('Exception encountered with video worker')
            try:
                with BytesIO(traceback.format_exc().encode()) as file:
                    file.name = 'video-worker-error.txt'
                    file.seek(0)
                    await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with video worker', file=file)
            except BaseException:
                logging.exception('Exception encountered when sending message to Telegram about video worker exception')

async def _video_worker():
    # Download worker: pull a video off video_queue, record it with ffmpeg
    # into a temporary directory, then hand the result to upload_queue.
    while True:
        video_json, start_time, first_try_live = await video_queue.get()
        late_to_queue = (Decimal(time.time()) - Decimal(start_time)) > 5
        is_late = first_try_live or late_to_queue
        tempdir_obj = tempfile.TemporaryDirectory(dir='.')
        try:
            tempdir = tempdir_obj.name
            if late_to_queue:
                # The extracted URLs may be stale; refresh them, backing off on 429s.
                for i in range(5):
                    wait_time = 30
                    try:
                        tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
                    except BaseException as e:
                        e = str(e)
                        if '429' in e or 'too many request' in e.lower():
                            wait_time = (i + 1) * 60 * 60
                    else:
                        video_json = tmp
                        break
                    await asyncio.sleep(wait_time)

            async def construct_command():
                # Build the ffmpeg command. If any URL points at
                # manifest.googlevideo.com (a live DASH manifest), wait roughly
                # the video's duration, refresh the metadata and rebuild.
                nonlocal video_json
                command = ['ffmpeg', '-y']
                is_manifest = False
                if video_json.get('requested_formats'):
                    for i in video_json['requested_formats']:
                        if urlparse(i['url']).netloc == 'manifest.googlevideo.com':
                            is_manifest = True
                        command.extend(('-i', i['url']))
                else:
                    is_manifest = urlparse(video_json['url']).netloc == 'manifest.googlevideo.com'
                    command.extend(('-i', video_json['url']))
                if is_manifest:
                    await asyncio.sleep(video_json['duration'] + 30)
                    wait_time = 30
                    for i in range(5):
                        try:
                            tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
                        except BaseException as e:
                            e = str(e)
                            if '429' in e or 'too many request' in e.lower():
                                wait_time = (i + 1) * 60 * 60
                        else:
                            video_json = tmp
                            break
                        await asyncio.sleep(wait_time)
                    return await construct_command()
                command.extend(('-c', 'copy', video_filename))
                return command

            video_filename = os.path.join(tempdir, video_json['id'] + '.mkv')
            with open(video_json['id'] + '.log', 'ab') as log_file:
                async def auto_kill(proc):
                    # Ask ffmpeg to quit (send 'q' on stdin) once its log stops
                    # growing, i.e. no progress for five minutes.
                    last_tell = log_file.tell()
                    while True:
                        await asyncio.sleep(300)
                        current_tell = log_file.tell()
                        if current_tell == last_tell:
                            break
                        last_tell = current_tell
                    proc.stdin.write(b'q')
                    await proc.stdin.drain()

                proc = await asyncio.create_subprocess_exec(*(await construct_command()), stdin=asyncio.subprocess.PIPE, stdout=log_file, stderr=log_file)
                text = 'New video'
                if is_late:
                    text += ' (is late)'
                video_json['title'] = strip_date.sub('', video_json['title']).strip()
                text += f': {video_json["title"]}\nhttps://youtube.com/watch?v={video_json["id"]}'
                with BytesIO(json.dumps(video_json, indent=4).encode()) as file:
                    file.name = video_json['id'] + '.json'
                    file.seek(0)
                    await client.send_file(config['config']['storage_chat_id'], file, caption=text, parse_mode=None)
                if video_json.get('thumbnail'):
                    thumbnail_ext = os.path.splitext(urlparse(video_json['thumbnail']).path)[1]
                    thumbnail_filename = os.path.join(tempdir, video_json['id'] + thumbnail_ext)
                    async with session.get(video_json['thumbnail']) as resp:
                        with open(thumbnail_filename, 'wb') as file:
                            while True:
                                chunk = await resp.content.read(4096)
                                if not chunk:
                                    break
                                file.write(chunk)
                    await client.send_file(config['config']['storage_chat_id'], thumbnail_filename, caption=text, parse_mode=None)
                    os.remove(thumbnail_filename)
                # Wait for ffmpeg; on a non-zero exit status notify the chat,
                # refresh the metadata and retry (up to 50 attempts).
                for _ in range(50):
                    auto_kill_task = asyncio.create_task(auto_kill(proc))
                    await proc.communicate()
                    auto_kill_task.cancel()
                    try:
                        await auto_kill_task
                    except asyncio.CancelledError:
                        pass
                    if not proc.returncode:
                        break
                    wait_time = 30
                    if video_json.get('duration'):
                        is_manifest = False
                        if video_json.get('url'):
                            is_manifest = urlparse(video_json['url']).netloc == 'manifest.googlevideo.com'
                        if not is_manifest and video_json.get('requested_formats'):
                            for i in video_json['requested_formats']:
                                if urlparse(i['url']).netloc == 'manifest.googlevideo.com':
                                    is_manifest = True
                                    break
                        if is_manifest:
                            await asyncio.sleep(video_json['duration'] + 30)
                    try:
                        await client.send_message(config['config']['storage_chat_id'], f'Failed to download video {video_json["id"]}, please check logs', parse_mode=None)
                    except BaseException:
                        logging.exception('Exception encountered when sending message to Telegram about download failure exception')
                    for i in range(5):
                        try:
                            tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
                        except BaseException as e:
                            e = str(e)
                            if '429' in e or 'too many request' in e.lower():
                                wait_time = (i + 1) * 60 * 60
                        else:
                            video_json = tmp
                            break
                        await asyncio.sleep(wait_time)
                    proc = await asyncio.create_subprocess_exec(*(await construct_command()), stdin=asyncio.subprocess.PIPE, stdout=log_file, stderr=log_file)
        except BaseException:
            tempdir_obj.cleanup()
            raise
        upload_queue.put_nowait((tempdir_obj, video_json))
        video_queue.task_done()

upload_queue = asyncio.Queue()

async def upload_worker():
    while True:
        try:
            await _upload_worker()
        except BaseException:
            logging.exception('Exception encountered with upload worker')
            try:
                with BytesIO(traceback.format_exc().encode()) as file:
                    file.name = 'upload-worker-error.txt'
                    file.seek(0)
                    await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with upload worker', file=file)
            except BaseException:
                logging.exception('Exception encountered when sending message to Telegram about upload worker exception')

async def _upload_worker():
    # Upload worker: send the finished recording to the storage chat, split
    # into parts of at most size_limit bytes when it is too large for a single
    # upload, then record the video as seen.
    while True:
        tempdir_obj, video_json = await upload_queue.get()
        try:
            tempdir = tempdir_obj.name
            base_filename = video_json['id'] + '.mkv'
            video_filename = os.path.join(tempdir, base_filename)
            files_sent = size_sent = 0
            messages = []
            file = bopen(video_filename, None)
            file.seek(0, os.SEEK_END)
            total_size = file.tell()
            is_big = total_size > size_limit
            while total_size > 0:
                file.seek(size_sent)
                file.capped_size = size_limit
                if is_big:
                    file.name = f'{base_filename}.part{str(files_sent).rjust(2, "0")}'
                else:
                    file.name = base_filename
                messages.append((await client.send_message(config['config']['storage_chat_id'], f'Uploading {file.name}...', parse_mode=None)).id)
                message = await client.send_file(config['config']['storage_chat_id'], file, caption=file.name, parse_mode=None, file_size=size_limit if total_size > size_limit else total_size)
                total_size -= message.document.size
                if total_size > 0:
                    size_sent += message.document.size
                    files_sent += 1
            file.capped_size = None
            file.close()
            if messages:
                await client.delete_messages(config['config']['storage_chat_id'], messages)
        finally:
            tempdir_obj.cleanup()
        seen_videos.append(video_json['id'])
        await update_seen_videos()
        upload_queue.task_done()
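
# A hypothetical wiring sketch (not part of this module; the real entry point
# lives elsewhere in the package): the workers above are long-running
# coroutines and would be scheduled on the client's event loop roughly like
# this.
#
#     asyncio.create_task(check_channels(nodl))
#     asyncio.create_task(video_worker())
#     asyncio.create_task(upload_worker())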