You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
299 lines
14 KiB
299 lines
14 KiB
import re
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import random
|
|
import asyncio
|
|
import logging
|
|
import tempfile
|
|
import traceback
|
|
from io import BytesIO
|
|
from decimal import Decimal
|
|
from urllib.parse import urlparse
|
|
try:
|
|
from yt_dlp import YoutubeDL
|
|
except Exception as e:
|
|
print(f'Failed to import yt_dlp due to {type(e).__name__}: {e}, importing youtube_dl', file=sys.stderr)
|
|
from youtube_dl import YoutubeDL
|
|
from . import session, config, client, seen_videos
|
|
from .utils import update_seen_videos, get_video_list
|
|
from .cappedio import bopen
|
|
|
|
# Ids of videos that already have a check_video task in this run but are not
# yet recorded in seen_videos.
tmp_handled = list()

# Upper bound, in bytes, for a single uploaded file part (2000 MiB).
size_limit = 2000 * 1024 ** 2

# Matches the extractor's "not started yet" error text. Group 1, when present,
# is the integer following "autoytarchive:"; _check_video compares it against
# time.time(), i.e. treats it as a Unix timestamp.
live_regex = re.compile(
    r'error: (?:autoytarchive:([0-9]+) )?(?:this live event will begin|premieres) in .+',
    flags=re.IGNORECASE,
)

# Removes a trailing " YYYY-MM-DD HH:MM" stamp from a video title.
strip_date = re.compile(r' \d{4}-\d{2}-\d{2} \d{2}:\d{2}$')

# Shared extractor instance; used for metadata only (downloads are skipped).
ytdl = YoutubeDL(dict(skip_download=True, no_color=True))
ytdl.add_default_info_extractors()
|
|
|
|
async def check_channels(nodl):
    """Scan the configured channels once, or keep scanning forever.

    nodl: when truthy, run a single ``_check_channels(True)`` pass and return.
        Otherwise run ``_check_channels(False)`` in an endless loop, sleeping
        ``config['config']['wait_seconds']`` seconds after every pass.

    A failing pass is logged and its traceback is sent to the storage chat as
    a text file; the loop then carries on. Only ``Exception`` subclasses are
    handled this way, so ``asyncio.CancelledError``, ``KeyboardInterrupt`` and
    ``SystemExit`` propagate and the task can actually be stopped.
    """
    if nodl:
        await _check_channels(True)
        return
    while True:
        try:
            await _check_channels(False)
        except Exception:
            logging.exception('Exception encountered with check channels')
            try:
                with BytesIO(traceback.format_exc().encode()) as file:
                    file.name = 'check-channels-error.txt'
                    file.seek(0)
                    await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with check channels', file=file)
            except Exception:
                # Reporting is best effort: never let it break the loop.
                logging.exception('Exception encountered when sending message to Telegram about check channels exception')
        await asyncio.sleep(config['config']['wait_seconds'])
|
|
|
|
# Serialises channel scans so two passes never run at the same time.
check_channels_lock = asyncio.Lock()
# Strong references to in-flight check_video tasks. The event loop only keeps
# weak references to tasks, so without this a task could be garbage-collected
# before it finishes.
_check_video_tasks = set()


async def _check_channels(nodl):
    """Run one pass over every channel in ``config['config']['channels']``.

    For each video id returned by ``get_video_list`` that is neither in
    ``seen_videos`` nor in ``tmp_handled``:

    * if ``nodl`` is truthy, the id is appended to ``seen_videos`` and nothing
      else happens;
    * otherwise a ``check_video`` task is started for it and the id is
      appended to ``tmp_handled`` so later passes skip it.

    After each channel the coroutine sleeps a random 1-10 seconds.
    """
    async with check_channels_lock:
        for channel in config['config']['channels']:
            logging.info('Checking channel %s', channel)
            for video_id in await get_video_list(session, channel):
                # Two membership tests instead of concatenating both lists
                # for every single id.
                if video_id in seen_videos or video_id in tmp_handled:
                    continue
                if nodl:
                    seen_videos.append(video_id)
                    continue
                task = asyncio.create_task(check_video(video_id))
                _check_video_tasks.add(task)
                task.add_done_callback(_check_video_tasks.discard)
                tmp_handled.append(video_id)
            await asyncio.sleep(random.randint(1, 10))
|
|
|
|
async def check_video(video_id):
    """Run ``_check_video(video_id)``, retrying up to five times on failure.

    Returns whatever ``_check_video`` returns on the first successful attempt.
    Each failed attempt is logged and its traceback is sent to the storage
    chat as a text file. If all five attempts fail the function returns
    ``None`` without raising.

    Only ``Exception`` subclasses count as a failed attempt; cancellation and
    interpreter-exit exceptions propagate instead of triggering another retry.
    """
    for _ in range(5):
        try:
            return await _check_video(video_id)
        except Exception:
            logging.exception('Exception encountered with checking video %s', video_id)
            try:
                with BytesIO(traceback.format_exc().encode()) as file:
                    file.name = f'check-videos-error-{video_id}.txt'
                    file.seek(0)
                    await client.send_message(config['config']['storage_chat_id'], f'Exception encountered with checking video {video_id}', file=file, parse_mode=None)
            except Exception:
                # Reporting is best effort: a failed report must not abort the retries.
                logging.exception('Exception encountered when sending message to Telegram about checking video %s exception', video_id)
|
|
|
|
async def _check_video(video_id):
    """Wait until metadata for ``video_id`` can be extracted, then queue it.

    ``ytdl.extract_info`` is run in the default executor and retried for as
    long as it raises. The delay before each retry is:

    * ``n`` hours on the n-th error whose text contains ``429`` or
      ``too many``;
    * for an error matching ``live_regex`` that carries a timestamp, the time
      remaining until that timestamp if it exceeds the random delay;
    * otherwise a random 30-300 seconds.

    On success ``(video_json, time.time(), first_try_live)`` is put on
    ``video_queue``. ``first_try_live`` is true only when the very first
    attempt succeeded and the result reports ``is_live``.

    Only ``Exception`` subclasses are retried, so cancelling the task ends the
    loop instead of being treated as one more extractor failure.
    """
    logging.info('Checking video %s', video_id)
    waited = False
    too_many_requests_count = 1
    while True:
        try:
            video_json = await client.loop.run_in_executor(None, ytdl.extract_info, f'https://youtube.com/watch?v={video_id}')
        except Exception as e:
            wait_time = random.randint(30, 10 * 30)
            message = str(e)
            if '429' in message or 'too many' in message.lower():
                wait_time = too_many_requests_count * 60 * 60
                too_many_requests_count += 1
            elif match := live_regex.match(message.rstrip('.')):
                # Group 1 is optional; 0 means "no scheduled time known".
                end_schedule_time = int(match.group(1) or 0)
                if end_schedule_time:
                    wait_time = max(wait_time, end_schedule_time - time.time())
            await asyncio.sleep(wait_time)
            waited = True
        else:
            break
    first_try_live = (not waited) and bool(video_json.get('is_live'))
    video_queue.put_nowait((video_json, time.time(), first_try_live))
|
|
|
|
# Items are (video_json, time_queued, first_try_live) tuples produced by
# _check_video and consumed by _video_worker.
video_queue = asyncio.Queue()


async def video_worker():
    """Keep ``_video_worker`` running, restarting it whenever it fails.

    A failure is logged and its traceback is sent to the storage chat as a
    text file, then ``_video_worker`` is started again. Only ``Exception``
    subclasses are treated as failures; ``asyncio.CancelledError``,
    ``KeyboardInterrupt`` and ``SystemExit`` propagate so the worker can be
    shut down.
    """
    while True:
        try:
            await _video_worker()
        except Exception:
            logging.exception('Exception encountered with video worker')
            try:
                with BytesIO(traceback.format_exc().encode()) as file:
                    file.name = 'video-worker-error.txt'
                    file.seek(0)
                    await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with video worker', file=file)
            except Exception:
                # Reporting is best effort: never let it stop the worker.
                logging.exception('Exception encountered when sending message to Telegram about video worker exception')
|
|
|
|
async def _download_video(video_json, start_time, first_try_live):
    """Download one video with ffmpeg into a fresh temporary directory.

    video_json: metadata dict as returned by ``ytdl.extract_info``.
    start_time: ``time.time()`` value recorded when the item was queued.
    first_try_live: flag computed by ``_check_video``.

    Returns ``(tempdir_obj, video_json)`` where ``tempdir_obj`` is the
    ``tempfile.TemporaryDirectory`` holding ``<id>.mkv`` and ``video_json`` is
    the most recently extracted metadata.

    Raises ``Exception`` (chained to the real error) if anything fails; the
    message carries the video id. The temporary directory is removed before
    any exception leaves this function.
    """
    late_to_queue = (Decimal(time.time()) - Decimal(start_time)) > 5
    is_late = first_try_live or late_to_queue
    tempdir_obj = tempfile.TemporaryDirectory(dir='.')
    try:
        tempdir = tempdir_obj.name
        if late_to_queue:
            # The item waited in the queue for a while, so refresh the metadata.
            for i in range(5):
                wait_time = random.randint(30, 10 * 60)
                try:
                    tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
                except Exception as e:
                    message = str(e)
                    if '429' in message or 'too many request' in message.lower():
                        wait_time = (i + 1) * 60 * 60
                else:
                    video_json = tmp
                    break
                await asyncio.sleep(wait_time)

        async def construct_command():
            """Build the ffmpeg argv for the current ``video_json``.

            If any input URL is served by manifest.googlevideo.com, sleep for
            the video's duration plus 30 seconds, re-extract the metadata and
            build the command again.
            """
            nonlocal video_json
            command = ['ffmpeg', '-y']
            is_manifest = False
            if video_json.get('requested_formats'):
                for fmt in video_json['requested_formats']:
                    if urlparse(fmt['url']).netloc == 'manifest.googlevideo.com':
                        is_manifest = True
                    command.extend(('-i', fmt['url']))
            else:
                is_manifest = urlparse(video_json['url']).netloc == 'manifest.googlevideo.com'
                command.extend(('-i', video_json['url']))
            if is_manifest:
                # 'duration' may be absent or None; treat that as 0 instead of
                # crashing (the retry path below applies the same guard).
                await asyncio.sleep((video_json.get('duration') or 0) + 30)
                wait_time = random.randint(30, 10 * 30)
                for i in range(5):
                    try:
                        tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
                    except Exception as e:
                        message = str(e)
                        if '429' in message or 'too many request' in message.lower():
                            wait_time = (i + 1) * 60 * 60
                    else:
                        video_json = tmp
                        break
                    await asyncio.sleep(wait_time)
                return await construct_command()
            command.extend(('-c', 'copy', video_filename))
            return command

        video_filename = os.path.join(tempdir, video_json['id'] + '.mkv')
        with open(video_json['id'] + '.log', 'ab') as log_file:
            async def auto_kill(proc):
                """Send ``q`` to ffmpeg's stdin once its log stops growing.

                The log file position is sampled every 300 seconds; the first
                time it has not moved, ``q`` is written to ask ffmpeg to quit.
                """
                last_tell = log_file.tell()
                while True:
                    await asyncio.sleep(300)
                    current_tell = log_file.tell()
                    if current_tell == last_tell:
                        break
                    last_tell = current_tell
                proc.stdin.write(b'q')
                await proc.stdin.drain()

            proc = await asyncio.create_subprocess_exec(*(await construct_command()), stdin=asyncio.subprocess.PIPE, stdout=log_file, stderr=log_file)

            # Announce the video (metadata JSON, then thumbnail) while ffmpeg runs.
            text = 'New video'
            if is_late:
                text += ' (is late)'
            video_json['title'] = strip_date.sub('', video_json['title']).strip()
            text += f': {video_json["title"]}\nhttps://youtube.com/watch?v={video_json["id"]}'
            with BytesIO(json.dumps(video_json, indent=4).encode()) as file:
                file.name = video_json['id'] + '.json'
                file.seek(0)
                await client.send_file(config['config']['storage_chat_id'], file, caption=text, parse_mode=None)
            if video_json.get('thumbnail'):
                thumbnail_ext = os.path.splitext(urlparse(video_json['thumbnail']).path)[1]
                thumbnail_filename = os.path.join(tempdir, video_json['id'] + thumbnail_ext)
                async with session.get(video_json['thumbnail']) as resp:
                    with open(thumbnail_filename, 'wb') as file:
                        while chunk := await resp.content.read(4096):
                            file.write(chunk)
                await client.send_file(config['config']['storage_chat_id'], thumbnail_filename, caption=text, parse_mode=None)
                os.remove(thumbnail_filename)

            # Up to 50 ffmpeg runs; stop at the first one that exits with 0.
            for _ in range(50):
                auto_kill_task = asyncio.create_task(auto_kill(proc))
                await proc.communicate()
                auto_kill_task.cancel()
                try:
                    await auto_kill_task
                except asyncio.CancelledError:
                    pass
                if not proc.returncode:
                    break
                wait_time = random.randint(30, 10 * 60)
                if video_json.get('duration'):
                    is_manifest = False
                    if video_json.get('url'):
                        is_manifest = urlparse(video_json['url']).netloc == 'manifest.googlevideo.com'
                    if not is_manifest and video_json.get('requested_formats'):
                        is_manifest = any(
                            urlparse(fmt['url']).netloc == 'manifest.googlevideo.com'
                            for fmt in video_json['requested_formats']
                        )
                    if is_manifest:
                        await asyncio.sleep(video_json['duration'] + 30)
                try:
                    await client.send_message(config['config']['storage_chat_id'], f'Failed to download video {video_json["id"]}, please check logs', parse_mode=None)
                except Exception:
                    logging.exception('Exception encountered when sending message to Telegram about download failure exception')
                for i in range(5):
                    try:
                        tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
                    except Exception as e:
                        message = str(e)
                        if '429' in message or 'too many request' in message.lower():
                            wait_time = (i + 1) * 60 * 60
                    else:
                        video_json = tmp
                        break
                    await asyncio.sleep(wait_time)
                proc = await asyncio.create_subprocess_exec(*(await construct_command()), stdin=asyncio.subprocess.PIPE, stdout=log_file, stderr=log_file)
    except Exception as exc:
        tempdir_obj.cleanup()
        raise Exception(f'This is a hack to retrieve the video id ({video_json["id"]}), the actual exception should be above') from exc
    except BaseException:
        # Cancellation / interpreter exit: clean up, but do not disguise it
        # as an ordinary error.
        tempdir_obj.cleanup()
        raise
    return tempdir_obj, video_json


async def _video_worker():
    """Consume ``video_queue`` forever, downloading each queued video.

    Every item is downloaded by ``_download_video`` and the resulting
    ``(tempdir_obj, video_json)`` pair is put on ``upload_queue``. Any
    download error propagates to the caller. ``video_queue.task_done()`` is
    called for every item taken from the queue, whether or not the download
    succeeded.
    """
    while True:
        video_json, start_time, first_try_live = await video_queue.get()
        try:
            upload_queue.put_nowait(await _download_video(video_json, start_time, first_try_live))
        finally:
            video_queue.task_done()
|
|
|
|
# Items are (tempdir_obj, video_json) tuples produced by _video_worker and
# consumed by _upload_worker.
upload_queue = asyncio.Queue()


async def upload_worker():
    """Keep ``_upload_worker`` running, restarting it whenever it fails.

    A failure is logged and its traceback is sent to the storage chat as a
    text file, then ``_upload_worker`` is started again. Only ``Exception``
    subclasses are treated as failures; ``asyncio.CancelledError``,
    ``KeyboardInterrupt`` and ``SystemExit`` propagate so the worker can be
    shut down.
    """
    while True:
        try:
            await _upload_worker()
        except Exception:
            logging.exception('Exception encountered with upload worker')
            try:
                with BytesIO(traceback.format_exc().encode()) as file:
                    file.name = 'upload-worker-error.txt'
                    file.seek(0)
                    await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with upload worker', file=file)
            except Exception:
                # Reporting is best effort: never let it stop the worker.
                logging.exception('Exception encountered when sending message to Telegram about upload worker exception')
|
|
|
|
async def _upload_video(tempdir_obj, video_json):
    """Upload ``<id>.mkv`` from ``tempdir_obj`` to the storage chat.

    A file larger than ``size_limit`` bytes is sent as consecutive parts named
    ``<id>.mkv.partNN``; a smaller one is sent as ``<id>.mkv``. A temporary
    "Uploading ..." message is posted before each part and all of them are
    deleted once every part has been sent.

    The temporary directory is always removed, and the file handle is always
    closed, even when an upload fails. Only after a fully successful upload is
    the video id appended to ``seen_videos`` and ``update_seen_videos()``
    awaited.
    """
    try:
        base_filename = video_json['id'] + '.mkv'
        video_filename = os.path.join(tempdir_obj.name, base_filename)
        files_sent = size_sent = 0
        messages = []
        file = bopen(video_filename, None)
        try:
            file.seek(0, os.SEEK_END)
            total_size = file.tell()
            is_big = total_size > size_limit
            while total_size > 0:
                file.seek(size_sent)
                # Expose at most size_limit bytes from the current offset.
                file.capped_size = size_limit
                if is_big:
                    file.name = f'{base_filename}.part{files_sent:02d}'
                else:
                    file.name = base_filename
                messages.append((await client.send_message(config['config']['storage_chat_id'], f'Uploading {file.name}...', parse_mode=None)).id)
                message = await client.send_file(config['config']['storage_chat_id'], file, caption=file.name, parse_mode=None, file_size=min(size_limit, total_size))
                total_size -= message.document.size
                if total_size > 0:
                    size_sent += message.document.size
                    files_sent += 1
                # Lift the cap again before the next seek (or the close).
                file.capped_size = None
        finally:
            # Close the handle before the directory holding the file is removed.
            file.close()
        if messages:
            await client.delete_messages(config['config']['storage_chat_id'], messages)
    finally:
        tempdir_obj.cleanup()
    seen_videos.append(video_json['id'])
    await update_seen_videos()


async def _upload_worker():
    """Consume ``upload_queue`` forever, uploading each downloaded video.

    Every ``(tempdir_obj, video_json)`` item is handled by ``_upload_video``;
    any error it raises propagates to the caller. ``upload_queue.task_done()``
    is called for every item taken from the queue, whether or not the upload
    succeeded.
    """
    while True:
        tempdir_obj, video_json = await upload_queue.get()
        try:
            await _upload_video(tempdir_obj, video_json)
        finally:
            upload_queue.task_done()
|