autoytarchive/autoytarchive/workers.py

299 lines
14 KiB
Python
Raw Permalink Normal View History

2021-04-07 07:21:34 +00:00
import re
2021-01-13 14:14:13 +00:00
import os
2021-10-06 06:51:07 +00:00
import sys
2021-01-13 14:14:13 +00:00
import json
import time
import random
2021-01-13 14:14:13 +00:00
import asyncio
import logging
import tempfile
import traceback
from io import BytesIO
from decimal import Decimal
2021-04-17 15:02:59 +00:00
from urllib.parse import urlparse
2021-10-06 06:51:07 +00:00
try:
from yt_dlp import YoutubeDL
except Exception as e:
print(f'Failed to import yt_dlp due to {type(e).__name__}: {e}, importing youtube_dl', file=sys.stderr)
from youtube_dl import YoutubeDL
2021-01-13 14:14:13 +00:00
from . import session, config, client, seen_videos
2021-04-17 15:02:59 +00:00
from .utils import update_seen_videos, get_video_list
2021-04-07 08:43:42 +00:00
from .cappedio import bopen
2021-01-13 14:14:13 +00:00
# Strips a trailing " YYYY-MM-DD HH:MM" suffix from video titles before they
# are posted (see _video_worker).
strip_date = re.compile(r' \d{4}-\d{2}-\d{2} \d{2}:\d{2}$')
# Matches extractor errors for a premiere / live event that has not started.
# Group 1, when present, is an epoch timestamp that _check_video compares
# against time.time() to decide how long to wait.
live_regex = re.compile(r'error: (?:autoytarchive:([0-9]+) )?(?:this live event will begin|premieres) in .+', re.I)
# Shared metadata-only extractor (skip_download=True); the actual media is
# fetched by ffmpeg in _video_worker.
ytdl = YoutubeDL({'skip_download': True, 'no_color': True})
ytdl.add_default_info_extractors()
# Video ids already scheduled for check_video() during this process; checked
# alongside seen_videos so a video is not scheduled twice.
tmp_handled = []
# Largest single uploaded file part, in bytes (2000 MiB).
size_limit = 2000 << 20
2021-01-13 14:14:13 +00:00
async def check_channels(nodl):
    """Poll every configured channel for new videos.

    With ``nodl`` true, run a single pass that only records the channels'
    current videos as seen (nothing is downloaded) and return.

    Otherwise loop forever, sleeping ``config['config']['wait_seconds']``
    between passes. An exception raised by a pass is logged, its traceback
    is sent to the storage chat as ``check-channels-error.txt``, and polling
    continues.

    Only ``Exception`` is caught. Task cancellation
    (``asyncio.CancelledError``) and ``KeyboardInterrupt`` therefore propagate,
    and the loop can be shut down.
    """
    if nodl:
        await _check_channels(True)
        return
    while True:
        try:
            await _check_channels(False)
        except Exception:
            logging.exception('Exception encountered with check channels')
            try:
                with BytesIO(traceback.format_exc().encode()) as file:
                    file.name = 'check-channels-error.txt'
                    file.seek(0)
                    await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with check channels', file=file)
            except Exception:
                logging.exception('Exception encountered when sending message to Telegram about check channels exception')
        await asyncio.sleep(config['config']['wait_seconds'])
check_channels_lock = asyncio.Lock()
# Strong references to the check_video() tasks spawned by _check_channels.
# The event loop keeps only weak references to tasks, so an unreferenced task
# may be garbage-collected before it finishes. Each task removes itself from
# this set when done.
_check_video_tasks = set()
async def _check_channels(nodl):
    """Run one pass over every channel in ``config['config']['channels']``.

    For each video id returned by ``get_video_list`` that is in neither
    ``seen_videos`` nor ``tmp_handled``:

    * with ``nodl`` true, append it to ``seen_videos`` (nothing is scheduled);
    * otherwise, schedule ``check_video(video_id)`` as a background task and
      record the id in ``tmp_handled`` so later passes skip it.

    A random 1-10 s pause follows each channel. ``check_channels_lock``
    ensures at most one pass runs at a time.
    """
    async with check_channels_lock:
        for channel in config['config']['channels']:
            logging.info('Checking channel %s', channel)
            for video_id in await get_video_list(session, channel):
                # Two membership tests instead of ``seen_videos + tmp_handled``,
                # which built a throwaway concatenated list for every video.
                if video_id in seen_videos or video_id in tmp_handled:
                    continue
                if nodl:
                    seen_videos.append(video_id)
                    continue
                task = asyncio.create_task(check_video(video_id))
                _check_video_tasks.add(task)
                task.add_done_callback(_check_video_tasks.discard)
                tmp_handled.append(video_id)
            await asyncio.sleep(random.randint(1, 10))
2021-01-13 14:14:13 +00:00
2021-04-17 15:02:59 +00:00
async def check_video(video_id):
    """Run ``_check_video(video_id)``, retrying up to 5 times on failure.

    Each failure is logged, and its traceback is sent to the storage chat as
    ``check-videos-error-<video_id>.txt``. After the fifth failure the
    function returns ``None`` without further action. The id is not removed
    from ``tmp_handled`` here.

    Only ``Exception`` is caught, so cancelling the task stops the retries
    instead of being swallowed.

    Returns:
        Whatever ``_check_video`` returns on success.
    """
    for _ in range(5):
        try:
            return await _check_video(video_id)
        except Exception:
            logging.exception('Exception encountered with checking video %s', video_id)
            try:
                with BytesIO(traceback.format_exc().encode()) as file:
                    file.name = f'check-videos-error-{video_id}.txt'
                    file.seek(0)
                    await client.send_message(config['config']['storage_chat_id'], f'Exception encountered with checking video {video_id}', file=file, parse_mode=None)
            except Exception:
                logging.exception('Exception encountered when sending message to Telegram about checking video %s exception', video_id)
2021-01-13 14:14:13 +00:00
2021-04-17 15:02:59 +00:00
async def _check_video(video_id):
    """Extract metadata for ``video_id`` and enqueue it for ``_video_worker``.

    ``ytdl.extract_info`` runs in the default executor and is retried until
    it succeeds. The wait before each retry depends on the error:

    * message contains ``429`` / "too many": 1 h, then 2 h, 3 h, ... (the
      multiplier grows with every rate-limit error);
    * message matches ``live_regex`` (premiere / live event not started yet):
      a random 30-300 s, or until the epoch timestamp captured by the regex
      if that is later;
    * anything else: a random 30-300 s.

    ``first_try_live`` ends up true only when the very first extraction
    succeeded and the result has a truthy ``is_live``. The tuple
    ``(video_json, time.time(), first_try_live)`` is put on ``video_queue``.

    NOTE(review): an error that never clears (e.g. a removed video) makes
    this loop retry indefinitely; consider bounding it.
    """
    logging.info('Checking video %s', video_id)
    first_try_live = waited = False
    too_many_requests_count = 1
    while True:
        try:
            video_json = await client.loop.run_in_executor(None, ytdl.extract_info, f'https://youtube.com/watch?v={video_id}')
        except Exception as e:
            # Exception, not BaseException: catching asyncio.CancelledError here
            # would swallow cancellation and keep the task retrying forever.
            wait_time = random.randint(30, 10 * 30)
            message = str(e)
            if '429' in message or 'too many' in message.lower():
                wait_time = too_many_requests_count * 60 * 60
                too_many_requests_count += 1
            elif match := live_regex.match(message.rstrip('.')):
                end_schedule_time = match.group(1) or 0
                if end_schedule_time := int(end_schedule_time):
                    tmp_wait_time = end_schedule_time - time.time()
                    if tmp_wait_time > wait_time:
                        wait_time = tmp_wait_time
            await asyncio.sleep(wait_time)
            waited = True
        else:
            if not waited:
                first_try_live = True
            break
    if not video_json.get('is_live'):
        first_try_live = False
    video_queue.put_nowait((video_json, time.time(), first_try_live))
# Items are (video_json, enqueue time from time.time(), first_try_live);
# produced by _check_video and consumed by _video_worker.
video_queue = asyncio.Queue()
async def video_worker():
    """Run ``_video_worker`` forever, restarting it after any error.

    Each error is logged, and its traceback is sent to the storage chat as
    ``video-worker-error.txt``. Only ``Exception`` is caught, so cancelling
    this task (``asyncio.CancelledError``) stops the worker instead of
    restarting it.
    """
    while True:
        try:
            await _video_worker()
        except Exception:
            logging.exception('Exception encountered with video worker')
            try:
                with BytesIO(traceback.format_exc().encode()) as file:
                    file.name = 'video-worker-error.txt'
                    file.seek(0)
                    await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with video worker', file=file)
            except Exception:
                logging.exception('Exception encountered when sending message to Telegram about video worker exception')
async def _video_worker():
    """Download queued videos with ffmpeg and hand them to the upload worker.

    Consumes ``(video_json, start_time, first_try_live)`` items from
    ``video_queue``. For each item:

    1. If more than 5 s passed since ``start_time``, re-extract the metadata
       (up to 5 tries), presumably so stream URLs are fresh.
    2. Start ffmpeg, stream-copying the input(s) into ``<tempdir>/<id>.mkv``.
       Its stdout/stderr are appended to ``<id>.log`` in the current
       directory.
    3. Post the metadata JSON and, if present, the thumbnail to the storage
       chat, captioned "New video" (with " (is late)" when applicable).
    4. Wait for ffmpeg. On a non-zero exit, notify the chat, re-extract the
       metadata and restart ffmpeg, for at most ``max_attempts`` runs.
    5. Put ``(tempdir_obj, video_json)`` on ``upload_queue``. The upload
       worker owns (and cleans up) the temporary directory from then on.

    On any error the temporary directory is removed. Cancellation is
    re-raised unchanged. Any other error is replaced by an ``Exception``
    naming the video id, chained to the original.
    """
    while True:
        video_json, start_time, first_try_live = await video_queue.get()
        late_to_queue = (Decimal(time.time()) - Decimal(start_time)) > 5
        is_late = first_try_live or late_to_queue
        tempdir_obj = tempfile.TemporaryDirectory(dir='.')
        try:
            tempdir = tempdir_obj.name
            if late_to_queue:
                for i in range(5):
                    # Re-rolled each try; a 429 escalates to (i + 1) hours.
                    wait_time = random.randint(30, 10 * 60)
                    try:
                        tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
                    except Exception as e:
                        e = str(e)
                        if '429' in e or 'too many request' in e.lower():
                            wait_time = (i + 1) * 60 * 60
                    else:
                        video_json = tmp
                        break
                    await asyncio.sleep(wait_time)

            async def construct_command():
                # Build the ffmpeg argv for the current video_json. If any input
                # is a manifest.googlevideo.com URL, sleep for the reported
                # duration + 30 s, re-extract, and build again.
                nonlocal video_json
                command = ['ffmpeg', '-y']
                is_manifest = False
                if video_json.get('requested_formats'):
                    for i in video_json['requested_formats']:
                        if urlparse(i['url']).netloc == 'manifest.googlevideo.com':
                            is_manifest = True
                        command.extend(('-i', i['url']))
                else:
                    is_manifest = urlparse(video_json['url']).netloc == 'manifest.googlevideo.com'
                    command.extend(('-i', video_json['url']))
                if is_manifest:
                    # NOTE(review): raises TypeError if 'duration' is None; the
                    # retry path below guards with video_json.get('duration').
                    await asyncio.sleep(video_json['duration'] + 30)
                    wait_time = random.randint(30, 10 * 30)
                    for i in range(5):
                        try:
                            tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
                        except Exception as e:
                            e = str(e)
                            if '429' in e or 'too many request' in e.lower():
                                wait_time = (i + 1) * 60 * 60
                        else:
                            video_json = tmp
                            break
                        await asyncio.sleep(wait_time)
                    return await construct_command()
                # video_filename is bound below, before the first call.
                command.extend(('-c', 'copy', video_filename))
                return command

            video_filename = os.path.join(tempdir, video_json['id'] + '.mkv')
            with open(video_json['id'] + '.log', 'ab') as log_file:
                async def auto_kill(proc):
                    # Every 300 s, compare the log file position with the last
                    # reading. The log is ffmpeg's stdout/stderr, so a position
                    # that has not moved is taken to mean ffmpeg stalled. In that
                    # case write b'q' (ffmpeg's interactive quit key) to its stdin.
                    last_tell = log_file.tell()
                    while True:
                        await asyncio.sleep(300)
                        current_tell = log_file.tell()
                        if current_tell == last_tell:
                            break
                        last_tell = current_tell
                    proc.stdin.write(b'q')
                    await proc.stdin.drain()

                proc = await asyncio.create_subprocess_exec(*(await construct_command()), stdin=asyncio.subprocess.PIPE, stdout=log_file, stderr=log_file)
                text = 'New video'
                if is_late:
                    text += ' (is late)'
                video_json['title'] = strip_date.sub('', video_json['title']).strip()
                text += f': {video_json["title"]}\nhttps://youtube.com/watch?v={video_json["id"]}'
                with BytesIO(json.dumps(video_json, indent=4).encode()) as file:
                    file.name = video_json['id'] + '.json'
                    file.seek(0)
                    await client.send_file(config['config']['storage_chat_id'], file, caption=text, parse_mode=None)
                if video_json.get('thumbnail'):
                    thumbnail_ext = os.path.splitext(urlparse(video_json['thumbnail']).path)[1]
                    thumbnail_filename = os.path.join(tempdir, video_json['id'] + thumbnail_ext)
                    async with session.get(video_json['thumbnail']) as resp:
                        with open(thumbnail_filename, 'wb') as file:
                            while chunk := await resp.content.read(4096):
                                file.write(chunk)
                    await client.send_file(config['config']['storage_chat_id'], thumbnail_filename, caption=text, parse_mode=None)
                    os.remove(thumbnail_filename)

                max_attempts = 50
                for attempt in range(max_attempts):
                    auto_kill_task = asyncio.create_task(auto_kill(proc))
                    await proc.communicate()
                    auto_kill_task.cancel()
                    try:
                        await auto_kill_task
                    except asyncio.CancelledError:
                        pass
                    if not proc.returncode:
                        break
                    wait_time = random.randint(30, 10 * 60)
                    if video_json.get('duration'):
                        is_manifest = False
                        if video_json.get('url'):
                            is_manifest = urlparse(video_json['url']).netloc == 'manifest.googlevideo.com'
                        if not is_manifest and video_json.get('requested_formats'):
                            for i in video_json['requested_formats']:
                                if urlparse(i['url']).netloc == 'manifest.googlevideo.com':
                                    is_manifest = True
                                    break
                        if is_manifest:
                            await asyncio.sleep(video_json['duration'] + 30)
                    try:
                        await client.send_message(config['config']['storage_chat_id'], f'Failed to download video {video_json["id"]}, please check logs', parse_mode=None)
                    except Exception:
                        logging.exception('Exception encountered when sending message to Telegram about download failure exception')
                    if attempt == max_attempts - 1:
                        # Out of attempts. Previously another ffmpeg was spawned here
                        # and never awaited; it kept writing into a temp dir that
                        # was already handed to the uploader.
                        break
                    for i in range(5):
                        try:
                            tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
                        except Exception as e:
                            e = str(e)
                            if '429' in e or 'too many request' in e.lower():
                                wait_time = (i + 1) * 60 * 60
                        else:
                            video_json = tmp
                            break
                        await asyncio.sleep(wait_time)
                    proc = await asyncio.create_subprocess_exec(*(await construct_command()), stdin=asyncio.subprocess.PIPE, stdout=log_file, stderr=log_file)
        except asyncio.CancelledError:
            # Clean up, but let cancellation propagate instead of masking it.
            tempdir_obj.cleanup()
            raise
        except BaseException as exc:
            tempdir_obj.cleanup()
            raise Exception(f'This is a hack to retrieve the video id ({video_json["id"]}), the actual exception should be above') from exc
        upload_queue.put_nowait((tempdir_obj, video_json))
        video_queue.task_done()
# Items are (tempdir_obj, video_json); produced by _video_worker and
# consumed by _upload_worker, which cleans up the temporary directory.
upload_queue = asyncio.Queue()
async def upload_worker():
    """Run ``_upload_worker`` forever, restarting it after any error.

    Each error is logged, and its traceback is sent to the storage chat as
    ``upload-worker-error.txt``. Only ``Exception`` is caught, so cancelling
    this task (``asyncio.CancelledError``) stops the worker instead of
    restarting it.
    """
    while True:
        try:
            await _upload_worker()
        except Exception:
            logging.exception('Exception encountered with upload worker')
            try:
                with BytesIO(traceback.format_exc().encode()) as file:
                    file.name = 'upload-worker-error.txt'
                    file.seek(0)
                    await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with upload worker', file=file)
            except Exception:
                logging.exception('Exception encountered when sending message to Telegram about upload worker exception')
async def _upload_worker():
    """Upload finished downloads from ``upload_queue`` to the storage chat.

    Each item is ``(tempdir_obj, video_json)``. The file
    ``<tempdir>/<id>.mkv`` is sent in parts of at most ``size_limit`` bytes.
    When the whole file exceeds ``size_limit``, the parts are named
    ``<id>.mkv.partNN``. An "Uploading ..." message is posted before each
    part, and all of these messages are deleted once every part is sent.

    Whether or not the upload succeeds, the temporary directory is removed,
    the id is appended to ``seen_videos``, ``update_seen_videos()`` is
    awaited (presumably persisting the list; confirm in utils), and the
    queue item is marked done.
    """
    while True:
        tempdir_obj, video_json = await upload_queue.get()
        try:
            tempdir = tempdir_obj.name
            base_filename = video_json['id'] + '.mkv'
            video_filename = os.path.join(tempdir, base_filename)
            files_sent = size_sent = 0
            messages = []
            file = bopen(video_filename, None)
            try:
                file.seek(0, os.SEEK_END)
                total_size = file.tell()
                is_big = total_size > size_limit
                while total_size > 0:
                    file.seek(size_sent)
                    # Presumably caps how much send_file can read from the
                    # current offset to size_limit bytes (see cappedio.bopen).
                    file.capped_size = size_limit
                    if is_big:
                        file.name = f'{base_filename}.part{files_sent:02d}'
                    else:
                        file.name = base_filename
                    messages.append((await client.send_message(config['config']['storage_chat_id'], f'Uploading {file.name}...', parse_mode=None)).id)
                    message = await client.send_file(config['config']['storage_chat_id'], file, caption=file.name, parse_mode=None, file_size=min(size_limit, total_size))
                    total_size -= message.document.size
                    if total_size > 0:
                        size_sent += message.document.size
                    files_sent += 1
                file.capped_size = None
            finally:
                # Previously the handle leaked whenever an upload raised.
                file.close()
            if messages:
                await client.delete_messages(config['config']['storage_chat_id'], messages)
        finally:
            tempdir_obj.cleanup()
            seen_videos.append(video_json['id'])
            await update_seen_videos()
            upload_queue.task_done()