You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
298 lines
14 KiB
298 lines
14 KiB
import re |
|
import os |
|
import sys |
|
import json |
|
import time |
|
import random |
|
import asyncio |
|
import logging |
|
import tempfile |
|
import traceback |
|
from io import BytesIO |
|
from decimal import Decimal |
|
from urllib.parse import urlparse |
|
try: |
|
from yt_dlp import YoutubeDL |
|
except Exception as e: |
|
print(f'Failed to import yt_dlp due to {type(e).__name__}: {e}, importing youtube_dl', file=sys.stderr) |
|
from youtube_dl import YoutubeDL |
|
from . import session, config, client, seen_videos |
|
from .utils import update_seen_videos, get_video_list |
|
from .cappedio import bopen |
|
|
|
# Ids of videos that _check_channels() has already handed to check_video()
# during this process; consulted alongside seen_videos to skip repeats.
tmp_handled = []

# Largest single upload in bytes (2000 MiB). _upload_worker() splits bigger
# recordings into parts of at most this size.
# NOTE(review): presumably the chat service's per-file limit - confirm.
size_limit = 2000 * 1024 ** 2

# Matches extractor errors for streams/premieres that have not started yet.
# Group 1, when present, is read by _check_video() as a Unix timestamp.
live_regex = re.compile(
    r'error: (?:autoytarchive:([0-9]+) )?(?:this live event will begin|premieres) in .+',
    re.IGNORECASE,
)

# Strips a trailing " YYYY-MM-DD HH:MM" stamp from a video title.
strip_date = re.compile(r' \d{4}-\d{2}-\d{2} \d{2}:\d{2}$')

# Shared extractor instance, configured to fetch metadata without downloading.
ytdl = YoutubeDL(dict(skip_download=True, no_color=True))
ytdl.add_default_info_extractors()
|
|
|
async def check_channels(nodl):
    """Scan the configured channels once, or keep scanning forever.

    Args:
        nodl: When truthy, run a single ``_check_channels(True)`` pass and
            return. Otherwise loop without end: run
            ``_check_channels(False)``, then sleep for
            ``config['config']['wait_seconds']`` seconds.

    A failing pass is logged and its traceback is posted to the storage chat
    as ``check-channels-error.txt``; if posting fails too, that is only
    logged.
    """
    if nodl:
        await _check_channels(True)
        return

    while True:
        try:
            await _check_channels(False)
        except BaseException:
            # BaseException rather than Exception: cancellation raised inside
            # the pass ends up here as well and the loop carries on.
            logging.exception('Exception encountered with check channels')
            try:
                traceback_bytes = traceback.format_exc().encode()
                with BytesIO(traceback_bytes) as report:
                    report.name = 'check-channels-error.txt'
                    report.seek(0)
                    await client.send_message(
                        config['config']['storage_chat_id'],
                        'Exception encountered with check channels',
                        file=report,
                    )
            except BaseException:
                logging.exception(
                    'Exception encountered when sending message to Telegram about check channels exception'
                )
        await asyncio.sleep(config['config']['wait_seconds'])
|
|
|
check_channels_lock = asyncio.Lock()

# Strong references to in-flight check_video() tasks. The event loop only
# keeps weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes; each task removes itself when done.
_check_video_tasks = set()


async def _check_channels(nodl):
    """Walk every configured channel and dispatch its unseen videos.

    Holds ``check_channels_lock`` for the whole pass so two passes never
    overlap.

    Args:
        nodl: When truthy, unseen video ids are only appended to
            ``seen_videos`` and nothing is scheduled. Otherwise a
            ``check_video`` task is started for each unseen id and the id is
            remembered in ``tmp_handled``.
    """
    async with check_channels_lock:
        for channel in config['config']['channels']:
            logging.info('Checking channel %s', channel)
            for video_id in await get_video_list(session, channel):
                # Two membership tests instead of concatenating both lists
                # for every single video id.
                if video_id in seen_videos or video_id in tmp_handled:
                    continue
                if nodl:
                    seen_videos.append(video_id)
                    continue
                task = asyncio.create_task(check_video(video_id))
                _check_video_tasks.add(task)
                task.add_done_callback(_check_video_tasks.discard)
                tmp_handled.append(video_id)
            # Random 1-10 s pause before moving on to the next channel.
            await asyncio.sleep(random.randint(1, 10))
|
|
|
async def check_video(video_id):
    """Run ``_check_video`` for one video, giving it up to five attempts.

    Args:
        video_id: Id of the video to check.

    Returns:
        Whatever the first successful ``_check_video`` call returns, or
        ``None`` once all five attempts have failed.

    Each failed attempt is logged and its traceback is posted to the storage
    chat as ``check-videos-error-<video_id>.txt``; a failure to post is only
    logged.
    """
    attempts_left = 5
    while attempts_left > 0:
        attempts_left -= 1
        try:
            outcome = await _check_video(video_id)
        except BaseException:
            logging.exception('Exception encountered with checking video %s', video_id)
            try:
                traceback_bytes = traceback.format_exc().encode()
                with BytesIO(traceback_bytes) as report:
                    report.name = f'check-videos-error-{video_id}.txt'
                    report.seek(0)
                    await client.send_message(
                        config['config']['storage_chat_id'],
                        f'Exception encountered with checking video {video_id}',
                        file=report,
                        parse_mode=None,
                    )
            except BaseException:
                logging.exception(
                    'Exception encountered when sending message to Telegram about checking video %s exception',
                    video_id,
                )
        else:
            return outcome
|
|
|
async def _check_video(video_id):
    """Fetch a video's metadata, waiting out errors, then queue it.

    ``ytdl.extract_info`` is run in the default executor and retried without
    limit. After a failure the wait is a random 30-300 s, except that:

    * a message containing ``429`` or ``too many`` waits N hours, where N
      counts the rate-limit failures seen so far (1, 2, 3, ...);
    * a message matching ``live_regex`` with a non-zero timestamp waits until
      that timestamp if that is longer than the random wait.

    On success ``(video_json, time.time(), first_try_live)`` is put on
    ``video_queue``; ``first_try_live`` is true only when the very first
    extraction succeeded and the video reports ``is_live``.

    Args:
        video_id: Id of the video to look up.
    """
    logging.info('Checking video %s', video_id)
    watch_url = f'https://youtube.com/watch?v={video_id}'
    waited = False
    rate_limit_hits = 1
    while True:
        try:
            video_json = await client.loop.run_in_executor(None, ytdl.extract_info, watch_url)
        except BaseException as e:
            wait_time = random.randint(30, 10 * 30)
            message = str(e)
            if '429' in message or 'too many' in message.lower():
                wait_time = rate_limit_hits * 60 * 60
                rate_limit_hits += 1
            else:
                match = live_regex.match(message.rstrip('.'))
                if match:
                    scheduled_for = int(match.group(1) or 0)
                    if scheduled_for:
                        # Sleep until the announced start if that is further
                        # away than the random back-off.
                        wait_time = max(wait_time, scheduled_for - time.time())
            await asyncio.sleep(wait_time)
            waited = True
            continue
        break

    is_live = video_json.get('is_live')
    first_try_live = bool(is_live) and not waited
    video_queue.put_nowait((video_json, time.time(), first_try_live))
|
|
|
# Work items put by _check_video(): (video_json, enqueue_time, first_try_live).
video_queue = asyncio.Queue()


async def video_worker():
    """Keep ``_video_worker`` running, restarting it whenever it raises.

    Every failure is logged and its traceback is posted to the storage chat
    as ``video-worker-error.txt``; a failure to post is only logged. The
    worker is then started again immediately.
    """
    while True:
        try:
            await _video_worker()
        except BaseException:
            logging.exception('Exception encountered with video worker')
            try:
                traceback_bytes = traceback.format_exc().encode()
                with BytesIO(traceback_bytes) as report:
                    report.name = 'video-worker-error.txt'
                    report.seek(0)
                    await client.send_message(
                        config['config']['storage_chat_id'],
                        'Exception encountered with video worker',
                        file=report,
                    )
            except BaseException:
                logging.exception(
                    'Exception encountered when sending message to Telegram about video worker exception'
                )
|
|
|
async def _video_worker():
    """Consume ``video_queue`` forever, recording each video with ffmpeg.

    For every ``(video_json, start_time, first_try_live)`` item this:

    1. refreshes the metadata if the item sat in the queue for more than five
       seconds;
    2. starts ``ffmpeg -y -i <url>... -c copy <id>.mkv`` writing into a fresh
       temporary directory under the current directory, with ffmpeg's output
       appended to ``<id>.log``;
    3. posts the metadata JSON and, if present, the thumbnail to the storage
       chat;
    4. waits for ffmpeg, and while it exits non-zero refreshes the metadata
       and starts it again (the wait/restart loop runs 50 times at most);
    5. puts ``(tempdir_obj, video_json)`` on ``upload_queue``.

    Raises:
        Exception: any failure while handling an item is replaced by a
            generic ``Exception`` naming the video id, after the temporary
            directory has been cleaned up.
    """
    while True:
        video_json, start_time, first_try_live = await video_queue.get()
        # start_time is the time.time() value stored when the item was queued.
        late_to_queue = (Decimal(time.time()) - Decimal(start_time)) > 5
        is_late = first_try_live or late_to_queue
        tempdir_obj = tempfile.TemporaryDirectory(dir='.')
        try:
            tempdir = tempdir_obj.name
            if late_to_queue:
                # The queued metadata may be stale: re-extract up to five
                # times, sleeping a random 30-600 s after a failure, or
                # (i + 1) hours after a rate-limit error.
                for i in range(5):
                    wait_time = random.randint(30, 10 * 60)
                    try:
                        tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
                    except BaseException as e:
                        e = str(e)
                        if '429' in e or 'too many request' in e.lower():
                            wait_time = (i + 1) * 60 * 60
                    else:
                        video_json = tmp
                        break
                    await asyncio.sleep(wait_time)

            async def construct_command():
                """Build the ffmpeg argument list for the current video_json.

                If any input URL is served from manifest.googlevideo.com,
                sleep for the video's duration plus 30 s, refresh the
                metadata (up to five tries) and build the command again.
                """
                nonlocal video_json
                command = ['ffmpeg', '-y']
                is_manifest = False
                if video_json.get('requested_formats'):
                    for i in video_json['requested_formats']:
                        if urlparse(i['url']).netloc == 'manifest.googlevideo.com':
                            is_manifest = True
                        command.extend(('-i', i['url']))
                else:
                    is_manifest = urlparse(video_json['url']).netloc == 'manifest.googlevideo.com'
                    command.extend(('-i', video_json['url']))
                if is_manifest:
                    # NOTE(review): unlike the retry loop further down, this
                    # reads video_json['duration'] without a .get() guard -
                    # confirm it is always present and numeric here.
                    await asyncio.sleep(video_json['duration'] + 30)
                    # Chosen once for all five tries; a rate-limit error
                    # overrides it for the remaining tries.
                    wait_time = random.randint(30, 10 * 30)
                    for i in range(5):
                        try:
                            tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
                        except BaseException as e:
                            e = str(e)
                            if '429' in e or 'too many request' in e.lower():
                                wait_time = (i + 1) * 60 * 60
                        else:
                            video_json = tmp
                            break
                        await asyncio.sleep(wait_time)
                    # Rebuild from scratch with whatever metadata we hold now.
                    return await construct_command()
                command.extend(('-c', 'copy', video_filename))
                return command

            video_filename = os.path.join(tempdir, video_json['id'] + '.mkv')
            # ffmpeg's stdout and stderr are appended to <id>.log in the
            # working directory (not inside the temporary directory).
            with open(video_json['id'] + '.log', 'ab') as log_file:
                async def auto_kill(proc):
                    """Send 'q' to ffmpeg once the log stops growing.

                    Compares log_file.tell() every 300 s and, when two
                    consecutive readings are equal, writes b'q' to the
                    process's stdin.
                    """
                    # NOTE(review): presumably tell() advances because ffmpeg
                    # writes through the same file description - confirm.
                    last_tell = log_file.tell()
                    while True:
                        await asyncio.sleep(300)
                        current_tell = log_file.tell()
                        if current_tell == last_tell:
                            break
                        last_tell = current_tell
                    proc.stdin.write(b'q')
                    await proc.stdin.drain()

                # NOTE(review): ffmpeg is started before the uploads below;
                # nothing in this function terminates it if one of them
                # raises - confirm whether that matters.
                proc = await asyncio.create_subprocess_exec(*(await construct_command()), stdin=asyncio.subprocess.PIPE, stdout=log_file, stderr=log_file)
                text = 'New video'
                if is_late:
                    text += ' (is late)'
                video_json['title'] = strip_date.sub('', video_json['title']).strip()
                text += f': {video_json["title"]}\nhttps://youtube.com/watch?v={video_json["id"]}'
                # Post the full metadata as <id>.json.
                with BytesIO(json.dumps(video_json, indent=4).encode()) as file:
                    file.name = video_json['id'] + '.json'
                    file.seek(0)
                    await client.send_file(config['config']['storage_chat_id'], file, caption=text, parse_mode=None)
                if video_json.get('thumbnail'):
                    # Download the thumbnail in 4 KiB chunks, post it, then
                    # delete the local copy.
                    thumbnail_ext = os.path.splitext(urlparse(video_json['thumbnail']).path)[1]
                    thumbnail_filename = os.path.join(tempdir, video_json['id'] + thumbnail_ext)
                    async with session.get(video_json['thumbnail']) as resp:
                        with open(thumbnail_filename, 'wb') as file:
                            while True:
                                chunk = await resp.content.read(4096)
                                if not chunk:
                                    break
                                file.write(chunk)
                    await client.send_file(config['config']['storage_chat_id'], thumbnail_filename, caption=text, parse_mode=None)
                    os.remove(thumbnail_filename)
                for _ in range(50):
                    auto_kill_task = asyncio.create_task(auto_kill(proc))
                    await proc.communicate()
                    # ffmpeg has exited: stop the watchdog and absorb its
                    # cancellation.
                    auto_kill_task.cancel()
                    try:
                        await auto_kill_task
                    except asyncio.CancelledError:
                        pass
                    # A zero (falsy) return code means the recording finished.
                    if not proc.returncode:
                        break
                    wait_time = random.randint(30, 10 * 60)
                    if video_json.get('duration'):
                        is_manifest = False
                        if video_json.get('url'):
                            is_manifest = urlparse(video_json['url']).netloc == 'manifest.googlevideo.com'
                        if not is_manifest and video_json.get('requested_formats'):
                            for i in video_json['requested_formats']:
                                if urlparse(i['url']).netloc == 'manifest.googlevideo.com':
                                    is_manifest = True
                                    break
                        if is_manifest:
                            await asyncio.sleep(video_json['duration'] + 30)
                    try:
                        await client.send_message(config['config']['storage_chat_id'], f'Failed to download video {video_json["id"]}, please check logs', parse_mode=None)
                    except BaseException:
                        logging.exception('Exception encountered when sending message to Telegram about download failure exception')
                    # Refresh the metadata (up to five tries) before the next
                    # ffmpeg run.
                    for i in range(5):
                        try:
                            tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
                        except BaseException as e:
                            e = str(e)
                            if '429' in e or 'too many request' in e.lower():
                                wait_time = (i + 1) * 60 * 60
                        else:
                            video_json = tmp
                            break
                        await asyncio.sleep(wait_time)
                    # NOTE(review): on the 50th failed pass this still starts
                    # a process, which the loop then never awaits - confirm
                    # this is intended.
                    proc = await asyncio.create_subprocess_exec(*(await construct_command()), stdin=asyncio.subprocess.PIPE, stdout=log_file, stderr=log_file)
        except BaseException:
            tempdir_obj.cleanup()
            # The original error remains visible as this exception's implicit
            # context; the new message only adds the video id.
            raise Exception(f'This is a hack to retrieve the video id ({video_json["id"]}), the actual exception should be above')
        upload_queue.put_nowait((tempdir_obj, video_json))
        # Only reached on success; the failure path above does not call
        # task_done().
        video_queue.task_done()
|
|
|
# Finished recordings put by _video_worker(): (tempdir_obj, video_json).
upload_queue = asyncio.Queue()


async def upload_worker():
    """Keep ``_upload_worker`` running, restarting it whenever it raises.

    Every failure is logged and its traceback is posted to the storage chat
    as ``upload-worker-error.txt``; a failure to post is only logged. The
    worker is then started again immediately.
    """
    while True:
        try:
            await _upload_worker()
        except BaseException:
            logging.exception('Exception encountered with upload worker')
            try:
                traceback_bytes = traceback.format_exc().encode()
                with BytesIO(traceback_bytes) as report:
                    report.name = 'upload-worker-error.txt'
                    report.seek(0)
                    await client.send_message(
                        config['config']['storage_chat_id'],
                        'Exception encountered with upload worker',
                        file=report,
                    )
            except BaseException:
                logging.exception(
                    'Exception encountered when sending message to Telegram about upload worker exception'
                )
|
|
|
async def _upload_worker():
    """Consume ``upload_queue`` forever, uploading each recording.

    For every ``(tempdir_obj, video_json)`` item, ``<id>.mkv`` inside the
    temporary directory is sent to the storage chat. A file larger than
    ``size_limit`` is sent as consecutive ``<id>.mkv.partNN`` uploads of at
    most ``size_limit`` bytes each. The "Uploading ..." status messages are
    deleted once every part has been sent. Afterwards the video id is
    appended to ``seen_videos`` and ``update_seen_videos()`` is awaited.

    The temporary directory is always cleaned up, and the recording's file
    handle is always closed first, even when an upload fails.

    Raises:
        Whatever opening the file or talking to the client raises; the video
        is then not marked as seen and ``task_done()`` is not called.
    """
    while True:
        tempdir_obj, video_json = await upload_queue.get()
        try:
            tempdir = tempdir_obj.name
            base_filename = video_json['id'] + '.mkv'
            video_filename = os.path.join(tempdir, base_filename)
            files_sent = size_sent = 0
            messages = []
            # NOTE(review): bopen() comes from .cappedio; presumably it
            # returns a file-like object whose reads stop after
            # `capped_size` bytes - confirm.
            file = bopen(video_filename, None)
            try:
                file.seek(0, os.SEEK_END)
                total_size = file.tell()
                is_big = total_size > size_limit
                while total_size > 0:
                    # Position at the first byte not yet uploaded, then cap
                    # this part at size_limit bytes.
                    file.seek(size_sent)
                    file.capped_size = size_limit
                    if is_big:
                        file.name = f'{base_filename}.part{str(files_sent).rjust(2, "0")}'
                    else:
                        file.name = base_filename
                    status = await client.send_message(
                        config['config']['storage_chat_id'],
                        f'Uploading {file.name}...',
                        parse_mode=None,
                    )
                    messages.append(status.id)
                    message = await client.send_file(
                        config['config']['storage_chat_id'],
                        file,
                        caption=file.name,
                        parse_mode=None,
                        file_size=min(size_limit, total_size),
                    )
                    # Progress is driven by the size the service reports back.
                    total_size -= message.document.size
                    if total_size > 0:
                        size_sent += message.document.size
                        files_sent += 1
                        # Lift the cap before the next iteration seeks.
                        file.capped_size = None
            finally:
                # Close even when an upload raised, so the handle is released
                # before the temporary directory is removed below.
                file.close()
            if messages:
                await client.delete_messages(config['config']['storage_chat_id'], messages)
        finally:
            tempdir_obj.cleanup()
        seen_videos.append(video_json['id'])
        await update_seen_videos()
        upload_queue.task_done()
|
|
|