189 lines
8.5 KiB
Python
189 lines
8.5 KiB
Python
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import asyncio
|
|
import logging
|
|
import tempfile
|
|
import traceback
|
|
import feedparser
|
|
from io import BytesIO
|
|
from decimal import Decimal
|
|
from urllib.parse import quote as urlencode, urlparse
|
|
from . import session, config, client, seen_videos
|
|
from .utils import split_files, update_seen_videos
|
|
|
|
tmp_handled = []
|
|
|
|
async def check_channels(nodl):
|
|
if nodl:
|
|
await _check_channels(True)
|
|
return
|
|
while True:
|
|
try:
|
|
await _check_channels(False)
|
|
except BaseException:
|
|
logging.exception('Exception encountered with check channels')
|
|
try:
|
|
with BytesIO(traceback.format_exc().encode()) as file:
|
|
file.name = 'check-channels-error.txt'
|
|
file.seek(0)
|
|
await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with check channels', file=file)
|
|
except BaseException:
|
|
logging.exception('Exception encountered when sending message to Telegram about check channels exception')
|
|
await asyncio.sleep(config['config']['wait_seconds'])
|
|
|
|
check_channels_lock = asyncio.Lock()
|
|
async def _check_channels(nodl):
|
|
async with check_channels_lock:
|
|
for i in config['config']['channels']:
|
|
logging.info('Checking channel %s', i)
|
|
async with session.get(f'https://youtube.com/feeds/videos.xml?channel_id={urlencode(i)}&a={time.time()}') as resp:
|
|
data = feedparser.parse(await resp.text())
|
|
for j in data['items']:
|
|
if j['yt_videoid'] in seen_videos + tmp_handled:
|
|
continue
|
|
if nodl:
|
|
seen_videos.append(j['yt_videoid'])
|
|
continue
|
|
asyncio.create_task(check_video(j))
|
|
tmp_handled.append(j['yt_videoid'])
|
|
|
|
async def check_video(video):
|
|
for _ in range(5):
|
|
try:
|
|
return await _check_video(video)
|
|
except BaseException:
|
|
logging.exception('Exception encountered with checking video %s', video.get('yt_videoid'))
|
|
try:
|
|
with BytesIO(traceback.format_exc().encode()) as file:
|
|
file.name = f'check-videos-error-{video.get("yt_videoid")}.txt'
|
|
file.seek(0)
|
|
await client.send_message(config['config']['storage_chat_id'], f'Exception encountered with checking video {video.get("yt_videoid")}', file=file, parse_mode=None)
|
|
except BaseException:
|
|
logging.exception('Exception encountered when sending message to Telegram about checking video %s exception', video.get('yt_videoid'))
|
|
|
|
async def _check_video(video):
|
|
logging.info('Checking video %s', video['yt_videoid'])
|
|
first_try_live = waited = False
|
|
while True:
|
|
proc = await asyncio.create_subprocess_exec(sys.executable, 'youtube-dl-injected.py', '--dump-single-json', video['link'], stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
|
|
stdout, stderr = await proc.communicate()
|
|
if not proc.returncode:
|
|
if not waited:
|
|
first_try_live = True
|
|
break
|
|
wait_time = 30
|
|
error_message = next(i for i in stderr.decode().split('\n') if i.startswith('ERROR: '))
|
|
if error_message:
|
|
error_message = error_message[7:]
|
|
if error_message.startswith('AUTOYTARCHIVE:'):
|
|
tmp = error_message.split(':', 1)[1].split(' ', 1)[0]
|
|
if tmp.isnumeric():
|
|
new_wait_time = int(tmp) - int(time.time())
|
|
if new_wait_time > 0:
|
|
wait_time = new_wait_time
|
|
logging.error('Error on video %s: %s', video['yt_videoid'], error_message)
|
|
else:
|
|
logging.error('Video %s returned status code %s\n%s', video['yt_videoid'], proc.returncode, (stderr + stdout).decode())
|
|
await asyncio.sleep(wait_time)
|
|
waited = True
|
|
video_json = json.loads(stdout)
|
|
if not video_json.get('is_live') and first_try_live:
|
|
first_try_live = False
|
|
video_queue.put_nowait((video_json, time.time(), first_try_live))
|
|
|
|
video_queue = asyncio.Queue()
|
|
async def video_worker():
|
|
while True:
|
|
try:
|
|
await _video_worker()
|
|
except BaseException:
|
|
logging.exception('Exception encountered with video worker')
|
|
try:
|
|
with BytesIO(traceback.format_exc().encode()) as file:
|
|
file.name = 'video-worker-error.txt'
|
|
file.seek(0)
|
|
await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with video worker', file=file)
|
|
except BaseException:
|
|
logging.exception('Exception encountered when sending message to Telegram about video worker exception')
|
|
|
|
async def _video_worker():
|
|
while True:
|
|
video_json, start_time, first_try_live = await video_queue.get()
|
|
is_late = first_try_live or (Decimal(time.time()) - Decimal(start_time)) > 5
|
|
command = ['ffmpeg']
|
|
tempdir_obj = tempfile.TemporaryDirectory(dir='.')
|
|
try:
|
|
tempdir = tempdir_obj.name
|
|
if video_json.get('requested_formats'):
|
|
for i in video_json['requested_formats']:
|
|
command.extend(('-i', i['url']))
|
|
else:
|
|
command.extend(('-i', video_json['url']))
|
|
video_filename = os.path.join(tempdir, video_json['id'] + '.mkv')
|
|
command.extend(('-c', 'copy', video_filename))
|
|
proc = await asyncio.create_subprocess_exec(*command)
|
|
text = 'New video'
|
|
if is_late:
|
|
text += ' (is late)'
|
|
text += f': {video_json["title"]}'
|
|
with BytesIO(json.dumps(video_json).encode()) as file:
|
|
file.name = video_json['id'] + '.json'
|
|
file.seek(0)
|
|
await client.send_file(config['config']['storage_chat_id'], file, caption=text, parse_mode=None)
|
|
if video_json.get('thumbnail'):
|
|
thumbnail_ext = os.path.splitext(urlparse(video_json['thumbnail']).path)[1]
|
|
thumbnail_filename = os.path.join(tempdir, video_json['id'] + thumbnail_ext)
|
|
async with session.get(video_json['thumbnail']) as resp:
|
|
with open(thumbnail_filename, 'wb') as file:
|
|
while True:
|
|
chunk = await resp.content.read(4096)
|
|
if not chunk:
|
|
break
|
|
file.write(chunk)
|
|
await client.send_file(config['config']['storage_chat_id'], thumbnail_filename, caption=text, parse_mode=None)
|
|
os.remove(thumbnail_filename)
|
|
await proc.communicate()
|
|
except BaseException:
|
|
tempdir_obj.cleanup()
|
|
raise
|
|
upload_queue.put_nowait((tempdir_obj, video_json))
|
|
video_queue.task_done()
|
|
|
|
upload_queue = asyncio.Queue()
|
|
async def upload_worker():
|
|
while True:
|
|
try:
|
|
await _upload_worker()
|
|
except BaseException:
|
|
logging.exception('Exception encountered with upload worker')
|
|
try:
|
|
with BytesIO(traceback.format_exc().encode()) as file:
|
|
file.name = 'upload-worker-error.txt'
|
|
file.seek(0)
|
|
await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with upload worker', file=file)
|
|
except BaseException:
|
|
logging.exception('Exception encountered when sending message to Telegram about upload worker exception')
|
|
|
|
async def _upload_worker():
|
|
while True:
|
|
tempdir_obj, video_json = await upload_queue.get()
|
|
try:
|
|
tempdir = tempdir_obj.name
|
|
video_filename = os.path.join(tempdir, video_json['id'] + '.mkv')
|
|
if os.path.getsize(video_filename) > 2000 * 1024 * 1024:
|
|
files = await split_files(video_filename, tempdir)
|
|
os.remove(video_filename)
|
|
else:
|
|
files = [video_filename]
|
|
for i in files:
|
|
message = await client.send_message(config['config']['storage_chat_id'], f'Uploading {os.path.split(i)[1]}...', parse_mode=None)
|
|
await client.send_file(config['config']['storage_chat_id'], i)
|
|
asyncio.create_task(message.delete())
|
|
finally:
|
|
tempdir_obj.cleanup()
|
|
seen_videos.append(video_json['id'])
|
|
await update_seen_videos()
|
|
upload_queue.task_done()
|