import os import json import time import asyncio import logging import tempfile import traceback import feedparser from io import BytesIO from decimal import Decimal from urllib.parse import quote as urlencode, urlparse from . import session, config, client, seen_videos from .utils import split_files, update_seen_videos, get_video tmp_handled = [] async def check_channels(nodl): if nodl: await _check_channels(True) return while True: try: await _check_channels(False) except BaseException: logging.exception('Exception encountered with check channels') try: with BytesIO(traceback.format_exc().encode()) as file: file.name = 'check-channels-error.txt' file.seek(0) await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with check channels', file=file) except BaseException: logging.exception('Exception encountered when sending message to Telegram about check channels exception') await asyncio.sleep(config['config']['wait_seconds']) check_channels_lock = asyncio.Lock() async def _check_channels(nodl): async with check_channels_lock: for i in config['config']['channels']: logging.info('Checking channel %s', i) async with session.get(f'https://youtube.com/feeds/videos.xml?channel_id={urlencode(i)}&a={time.time()}') as resp: data = feedparser.parse(await resp.text()) for j in data['items']: if j['yt_videoid'] in seen_videos + tmp_handled: continue if nodl: seen_videos.append(j['yt_videoid']) continue asyncio.create_task(check_video(j)) tmp_handled.append(j['yt_videoid']) async def check_video(video): for _ in range(5): try: return await _check_video(video) except BaseException: logging.exception('Exception encountered with checking video %s', video.get('yt_videoid')) try: with BytesIO(traceback.format_exc().encode()) as file: file.name = f'check-videos-error-{video.get("yt_videoid")}.txt' file.seek(0) await client.send_message(config['config']['storage_chat_id'], f'Exception encountered with checking video {video.get("yt_videoid")}', file=file, parse_mode=None) except BaseException: logging.exception('Exception encountered when sending message to Telegram about checking video %s exception', video.get('yt_videoid')) async def _check_video(video): logging.info('Checking video %s', video['yt_videoid']) first_try_live = waited = False while True: video_json = await get_video(video['link']) if isinstance(video_json, dict): if not waited: first_try_live = True break wait_time = 30 if isinstance(video_json, str): error_message = video_json[7:] if error_message.startswith('AUTOYTARCHIVE:'): tmp = error_message.split(':', 1)[1].split(' ', 1)[0] if tmp.isnumeric(): new_wait_time = int(tmp) - int(time.time()) if new_wait_time > 0: wait_time = new_wait_time logging.error('Error on video %s: %s', video['yt_videoid'], error_message) else: logging.error('Video %s returned status code %s\n%s', video['yt_videoid'], *video_json) await asyncio.sleep(wait_time) waited = True if not video_json.get('is_live'): first_try_live = False video_queue.put_nowait((video_json, time.time(), first_try_live)) video_queue = asyncio.Queue() async def video_worker(): while True: try: await _video_worker() except BaseException: logging.exception('Exception encountered with video worker') try: with BytesIO(traceback.format_exc().encode()) as file: file.name = 'video-worker-error.txt' file.seek(0) await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with video worker', file=file) except BaseException: logging.exception('Exception encountered when sending message to Telegram about video worker exception') async def _video_worker(): while True: video_json, start_time, first_try_live = await video_queue.get() late_to_queue = (Decimal(time.time()) - Decimal(start_time)) > 5 is_late = first_try_live or late_to_queue command = ['ffmpeg', '-y'] tempdir_obj = tempfile.TemporaryDirectory(dir='.') try: tempdir = tempdir_obj.name if late_to_queue: for _ in range(5): tmp = await get_video(video_json['id']) if isinstance(tmp, dict): video_json = tmp break if isinstance(tmp, str): logging.error('Error on video %s: %s', video_json['id'], tmp) else: logging.error('Video %s returned status code %s\n%s', video_json['id'], *video_json) await asyncio.sleep(30) if video_json.get('requested_formats'): for i in video_json['requested_formats']: command.extend(('-i', i['url'])) else: command.extend(('-i', video_json['url'])) video_filename = os.path.join(tempdir, video_json['id'] + '.mkv') command.extend(('-c', 'copy', video_filename)) proc = await asyncio.create_subprocess_exec(*command) text = 'New video' if is_late: text += ' (is late)' text += f': {video_json["title"]}\nhttps://youtube.com/watch?v={video_json["id"]}' with BytesIO(json.dumps(video_json, indent=4).encode()) as file: file.name = video_json['id'] + '.json' file.seek(0) await client.send_file(config['config']['storage_chat_id'], file, caption=text, parse_mode=None) if video_json.get('thumbnail'): thumbnail_ext = os.path.splitext(urlparse(video_json['thumbnail']).path)[1] thumbnail_filename = os.path.join(tempdir, video_json['id'] + thumbnail_ext) async with session.get(video_json['thumbnail']) as resp: with open(thumbnail_filename, 'wb') as file: while True: chunk = await resp.content.read(4096) if not chunk: break file.write(chunk) await client.send_file(config['config']['storage_chat_id'], thumbnail_filename, caption=text, parse_mode=None) os.remove(thumbnail_filename) for _ in range(50): await proc.communicate() if not proc.returncode: break wait_time = 30 if video_json.get('duration'): is_manifest = False if video_json.get('url'): is_manifest = urlparse(video_json['url']).netloc == 'manifest.googlevideo.com' if not is_manifest and video_json.get('requested_formats'): for i in video_json['requested_formats']: if urlparse(i['url']).netloc == 'manifest.googlevideo.com': is_manifest = True break if is_manifest: wait_time += video_json['duration'] try: await client.send_message(config['config']['storage_chat_id'], f'Failed to download video {video_json["id"]}, please check logs', parse_mode=None) except BaseException: logging.exception('Exception encountered when sending message to Telegram about download failure exception') for _ in range(5): tmp = await get_video(video_json['id']) if isinstance(tmp, dict): video_json = tmp break if isinstance(tmp, str): logging.error('Error on video %s: %s', video_json['id'], tmp) else: logging.error('Video %s returned status code %s\n%s', video_json['id'], *tmp) await asyncio.sleep(wait_time) except BaseException: tempdir_obj.cleanup() raise upload_queue.put_nowait((tempdir_obj, video_json)) video_queue.task_done() upload_queue = asyncio.Queue() async def upload_worker(): while True: try: await _upload_worker() except BaseException: logging.exception('Exception encountered with upload worker') try: with BytesIO(traceback.format_exc().encode()) as file: file.name = 'upload-worker-error.txt' file.seek(0) await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with upload worker', file=file) except BaseException: logging.exception('Exception encountered when sending message to Telegram about upload worker exception') async def _upload_worker(): while True: tempdir_obj, video_json = await upload_queue.get() try: tempdir = tempdir_obj.name video_filename = os.path.join(tempdir, video_json['id'] + '.mkv') if os.path.getsize(video_filename) > 2000 * 1024 * 1024: files = await split_files(video_filename, tempdir) os.remove(video_filename) else: files = [video_filename] for i in files: message = await client.send_message(config['config']['storage_chat_id'], f'Uploading {os.path.split(i)[1]}...', parse_mode=None) await client.send_file(config['config']['storage_chat_id'], i, caption=os.path.split(i)[1], parse_mode=None) asyncio.create_task(message.delete()) finally: tempdir_obj.cleanup() seen_videos.append(video_json['id']) await update_seen_videos() upload_queue.task_done()