From 771d4fb29f2e67f45ba91f29c059b5b941b5f715 Mon Sep 17 00:00:00 2001 From: blank X Date: Wed, 20 Jan 2021 15:26:56 +0700 Subject: [PATCH] Try to fix some bugs welcome more bugs --- autoytarchive/utils.py | 9 ++++++ autoytarchive/workers.py | 65 +++++++++++++++++++++++++++++++--------- 2 files changed, 60 insertions(+), 14 deletions(-) diff --git a/autoytarchive/utils.py b/autoytarchive/utils.py index 23ea98c..df95d6d 100644 --- a/autoytarchive/utils.py +++ b/autoytarchive/utils.py @@ -1,4 +1,5 @@ import os +import sys import json import shlex import asyncio @@ -24,3 +25,11 @@ async def update_seen_videos(): file.name = 'autoytarchive.json' file.seek(0) await client.edit_message(config['config']['storage_chat_id'], config['config']['storage_message_id'], file=file) + +async def get_video(video_id): + proc = await asyncio.create_subprocess_exec(sys.executable, 'youtube-dl-injected.py', '--dump-single-json', '--', video_id, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) + stdout, stderr = await proc.communicate() + if proc.returncode: + error_message = next(i.strip() for i in stderr.decode().split('\n') if i.startswith('ERROR: ')) + return error_message or (proc.returncode, (stderr + stdout).decode()) + return json.loads(stdout) diff --git a/autoytarchive/workers.py b/autoytarchive/workers.py index 3b0063a..cdf417f 100644 --- a/autoytarchive/workers.py +++ b/autoytarchive/workers.py @@ -1,5 +1,4 @@ import os -import sys import json import time import asyncio @@ -11,7 +10,7 @@ from io import BytesIO from decimal import Decimal from urllib.parse import quote as urlencode, urlparse from . import session, config, client, seen_videos -from .utils import split_files, update_seen_videos +from .utils import split_files, update_seen_videos, get_video tmp_handled = [] @@ -67,16 +66,14 @@ async def _check_video(video): logging.info('Checking video %s', video['yt_videoid']) first_try_live = waited = False while True: - proc = await asyncio.create_subprocess_exec(sys.executable, 'youtube-dl-injected.py', '--dump-single-json', video['link'], stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) - stdout, stderr = await proc.communicate() - if not proc.returncode: + video_json = await get_video(video['link']) + if isinstance(video_json, dict): if not waited: first_try_live = True break wait_time = 30 - error_message = next(i for i in stderr.decode().split('\n') if i.startswith('ERROR: ')) - if error_message: - error_message = error_message[7:] + if isinstance(video_json, str): + error_message = video_json[7:] if error_message.startswith('AUTOYTARCHIVE:'): tmp = error_message.split(':', 1)[1].split(' ', 1)[0] if tmp.isnumeric(): @@ -85,11 +82,10 @@ async def _check_video(video): wait_time = new_wait_time logging.error('Error on video %s: %s', video['yt_videoid'], error_message) else: - logging.error('Video %s returned status code %s\n%s', video['yt_videoid'], proc.returncode, (stderr + stdout).decode()) + logging.error('Video %s returned status code %s\n%s', video['yt_videoid'], *video_json) await asyncio.sleep(wait_time) waited = True - video_json = json.loads(stdout) - if not video_json.get('is_live') and first_try_live: + if not video_json.get('is_live'): first_try_live = False video_queue.put_nowait((video_json, time.time(), first_try_live)) @@ -111,11 +107,23 @@ async def video_worker(): async def _video_worker(): while True: video_json, start_time, first_try_live = await video_queue.get() - is_late = first_try_live or (Decimal(time.time()) - Decimal(start_time)) > 5 - command = ['ffmpeg'] + late_to_queue = (Decimal(time.time()) - Decimal(start_time)) > 5 + is_late = first_try_live or late_to_queue + command = ['ffmpeg', '-y'] tempdir_obj = tempfile.TemporaryDirectory(dir='.') try: tempdir = tempdir_obj.name + if late_to_queue: + for _ in range(5): + tmp = await get_video(video_json['id']) + if isinstance(tmp, dict): + video_json = tmp + break + if isinstance(tmp, str): + logging.error('Error on video %s: %s', video_json['id'], tmp) + else: + logging.error('Video %s returned status code %s\n%s', video_json['id'], *video_json) + await asyncio.sleep(30) if video_json.get('requested_formats'): for i in video_json['requested_formats']: command.extend(('-i', i['url'])) @@ -144,7 +152,36 @@ async def _video_worker(): file.write(chunk) await client.send_file(config['config']['storage_chat_id'], thumbnail_filename, caption=text, parse_mode=None) os.remove(thumbnail_filename) - await proc.communicate() + for _ in range(50): + await proc.communicate() + if not proc.returncode: + break + wait_time = 30 + if video_json.get('duration'): + is_manifest = False + if video_json.get('url'): + is_manifest = urlparse(video_json['url']).netloc == 'manifest.googlevideo.com' + if not is_manifest and video_json.get('requested_formats'): + for i in video_json['requested_formats']: + if urlparse(i['url']).netloc == 'manifest.googlevideo.com': + is_manifest = True + break + if is_manifest: + wait_time += video_json['duration'] + try: + await client.send_message(config['config']['storage_chat_id'], f'Failed to download video {video_json["id"]}, please check logs', parse_mode=None) + except BaseException: + logging.exception('Exception encountered when sending message to Telegram about download failure exception') + for _ in range(5): + tmp = await get_video(video_json['id']) + if isinstance(tmp, dict): + video_json = tmp + break + if isinstance(tmp, str): + logging.error('Error on video %s: %s', video_json['id'], tmp) + else: + logging.error('Video %s returned status code %s\n%s', video_json['id'], *tmp) + await asyncio.sleep(wait_time) except BaseException: tempdir_obj.cleanup() raise