diff --git a/autoytarchive/workers.py b/autoytarchive/workers.py index a3be17c..16c4b0a 100644 --- a/autoytarchive/workers.py +++ b/autoytarchive/workers.py @@ -165,60 +165,77 @@ async def _video_worker(): command.extend(('-c', 'copy', video_filename)) return command video_filename = os.path.join(tempdir, video_json['id'] + '.mkv') - proc = await asyncio.create_subprocess_exec(*(await construct_command())) - text = 'New video' - if is_late: - text += ' (is late)' - video_json['title'] = strip_date.sub('', video_json['title']).strip() - text += f': {video_json["title"]}\nhttps://youtube.com/watch?v={video_json["id"]}' - with BytesIO(json.dumps(video_json, indent=4).encode()) as file: - file.name = video_json['id'] + '.json' - file.seek(0) - await client.send_file(config['config']['storage_chat_id'], file, caption=text, parse_mode=None) - if video_json.get('thumbnail'): - thumbnail_ext = os.path.splitext(urlparse(video_json['thumbnail']).path)[1] - thumbnail_filename = os.path.join(tempdir, video_json['id'] + thumbnail_ext) - async with session.get(video_json['thumbnail']) as resp: - with open(thumbnail_filename, 'wb') as file: - while True: - chunk = await resp.content.read(4096) - if not chunk: - break - file.write(chunk) - await client.send_file(config['config']['storage_chat_id'], thumbnail_filename, caption=text, parse_mode=None) - os.remove(thumbnail_filename) - for _ in range(50): - await proc.communicate() - if not proc.returncode: - break - wait_time = 30 - if video_json.get('duration'): - is_manifest = False - if video_json.get('url'): - is_manifest = urlparse(video_json['url']).netloc == 'manifest.googlevideo.com' - if not is_manifest and video_json.get('requested_formats'): - for i in video_json['requested_formats']: - if urlparse(i['url']).netloc == 'manifest.googlevideo.com': - is_manifest = True - break - if is_manifest: - await asyncio.sleep(video_json['duration'] + 30) - try: - await client.send_message(config['config']['storage_chat_id'], f'Failed to download video {video_json["id"]}, please check logs', parse_mode=None) - except BaseException: - logging.exception('Exception encountered when sending message to Telegram about download failure exception') - for i in range(5): + with open(video_json['id'] + '.mkv', 'ab') as log_file: + async def auto_kill(proc): + last_tell = log_file.tell() + while True: + await asyncio.sleep(300) + current_tell = log_file.tell() + if current_tell == last_tell: + break + last_tell = current_tell + proc.stdin.write(b'q') + await proc.stdin.drain() + proc = await asyncio.create_subprocess_exec(*(await construct_command()), stdin=asyncio.subprocess.PIPE, stdout=log_file, stderr=log_file) + text = 'New video' + if is_late: + text += ' (is late)' + video_json['title'] = strip_date.sub('', video_json['title']).strip() + text += f': {video_json["title"]}\nhttps://youtube.com/watch?v={video_json["id"]}' + with BytesIO(json.dumps(video_json, indent=4).encode()) as file: + file.name = video_json['id'] + '.json' + file.seek(0) + await client.send_file(config['config']['storage_chat_id'], file, caption=text, parse_mode=None) + if video_json.get('thumbnail'): + thumbnail_ext = os.path.splitext(urlparse(video_json['thumbnail']).path)[1] + thumbnail_filename = os.path.join(tempdir, video_json['id'] + thumbnail_ext) + async with session.get(video_json['thumbnail']) as resp: + with open(thumbnail_filename, 'wb') as file: + while True: + chunk = await resp.content.read(4096) + if not chunk: + break + file.write(chunk) + await client.send_file(config['config']['storage_chat_id'], thumbnail_filename, caption=text, parse_mode=None) + os.remove(thumbnail_filename) + for _ in range(50): + auto_kill_task = asyncio.create_task(auto_kill(proc)) + await proc.communicate() + auto_kill_task.cancel() try: - tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id']) - except BaseException as e: - e = str(e) - if '429' in e or 'too many request' in e.lower(): - wait_time = (i + 1) * 60 * 60 - else: - video_json = tmp + await auto_kill_task + except asyncio.CancelledError: + pass + if not proc.returncode: break - await asyncio.sleep(wait_time) - proc = await asyncio.create_subprocess_exec(*(await construct_command())) + wait_time = 30 + if video_json.get('duration'): + is_manifest = False + if video_json.get('url'): + is_manifest = urlparse(video_json['url']).netloc == 'manifest.googlevideo.com' + if not is_manifest and video_json.get('requested_formats'): + for i in video_json['requested_formats']: + if urlparse(i['url']).netloc == 'manifest.googlevideo.com': + is_manifest = True + break + if is_manifest: + await asyncio.sleep(video_json['duration'] + 30) + try: + await client.send_message(config['config']['storage_chat_id'], f'Failed to download video {video_json["id"]}, please check logs', parse_mode=None) + except BaseException: + logging.exception('Exception encountered when sending message to Telegram about download failure exception') + for i in range(5): + try: + tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id']) + except BaseException as e: + e = str(e) + if '429' in e or 'too many request' in e.lower(): + wait_time = (i + 1) * 60 * 60 + else: + video_json = tmp + break + await asyncio.sleep(wait_time) + proc = await asyncio.create_subprocess_exec(*(await construct_command()), stdin=asyncio.subprocess.PIPE, stdout=log_file, stderr=log_file) except BaseException: tempdir_obj.cleanup() raise