Compare commits

..

No commits in common. "c23d2fc52e8c7cedac2412909878a8173c8a6301" and "1fd5ccaeb52c89c55d5c81ac063c43628eb26b06" have entirely different histories.

4 changed files with 85 additions and 124 deletions

View File

@@ -1,44 +0,0 @@
import io
import os
class CappedBufferedReader(io.BufferedReader):
    """Buffered reader that exposes at most ``capped_size`` bytes of the
    underlying stream and can pretend the stream starts at ``fake_start``.

    Together these let one fixed-size slice of a large file be presented
    as if it were a standalone file.
    """

    def __init__(self, raw, buffer_size=io.DEFAULT_BUFFER_SIZE, capped_size=None, fake_start=None):
        super().__init__(raw, buffer_size)
        # Remaining byte budget for read(); None disables the cap.
        self.capped_size = capped_size
        # Offset added to absolute (SEEK_SET) seeks; None disables the shift.
        self.fake_start = fake_start

    def read(self, size=None):
        """Read up to ``size`` bytes, never exceeding the remaining cap."""
        budget = self.capped_size
        if budget is not None:
            # Clamp "read everything" requests (None / negative) and
            # oversized requests down to whatever budget remains.
            if size is None or not 0 <= size <= budget:
                size = budget
            self.capped_size = budget - size
        return super().read(size)

    def seek(self, offset, whence=os.SEEK_SET):
        """Seek, translating end-of-file against the cap and shifting
        absolute seeks past ``fake_start``."""
        if offset == 0 and whence == os.SEEK_END and self.capped_size is not None:
            if self.capped_size < 0:
                # Budget already exhausted past zero: report position 0.
                offset = 0
            else:
                # "End" is the capped size, but never beyond the real end.
                true_end = super().seek(0, os.SEEK_END)
                offset = min(self.capped_size, true_end)
            whence = os.SEEK_SET
        elif whence == os.SEEK_SET and self.fake_start is not None:
            offset += self.fake_start
        return super().seek(offset, whence)

    @property
    def name(self):
        """Overridable file name; falls back to the raw stream's name."""
        try:
            return self._name
        except AttributeError:
            return super().name

    @name.setter
    def name(self, new_name):
        self._name = new_name
def bopen(file, capped_size, fake_start):
    """Open *file* unbuffered and wrap it in a CappedBufferedReader."""
    raw = open(file, 'rb', buffering=0)
    return CappedBufferedReader(raw, capped_size=capped_size, fake_start=fake_start)

View File

@@ -1,28 +1,35 @@
import os
import sys
import json import json
import shlex
import asyncio
from io import BytesIO from io import BytesIO
from youtube_dl.extractor import youtube
from . import config, client, seen_videos from . import config, client, seen_videos
youtube._try_get = _try_get = youtube.try_get async def split_files(filename, destination_dir):
def traverse_dict(src): args = [
for (key, value) in src.items(): 'split',
if key == 'scheduledStartTime': '--verbose',
return value '--numeric-suffixes=0',
if isinstance(value, dict): '--bytes=2097152000',
if value := traverse_dict(value): '--suffix-length=2',
return value filename,
return None os.path.join(destination_dir, os.path.basename(filename)) + '.part'
]
def try_get(src, getter, expected_type=None): proc = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE)
if reason := src.get('reason'): stdout, _ = await proc.communicate()
if isinstance(reason, str) and (reason.startswith('This live event will begin in ') or reason.startswith('Premieres in ')): return shlex.split(' '.join([i[14:] for i in stdout.decode().strip().split('\n')]))
if t := _try_get(src, traverse_dict, str):
src['reason'] = f'autoytarchive:{t} {reason}'
return _try_get(src, getter, expected_type)
youtube.try_get = try_get
async def update_seen_videos(): async def update_seen_videos():
with BytesIO(json.dumps(seen_videos).encode()) as file: with BytesIO(json.dumps(seen_videos).encode()) as file:
file.name = 'autoytarchive.json' file.name = 'autoytarchive.json'
file.seek(0) file.seek(0)
await client.edit_message(config['config']['storage_chat_id'], config['config']['storage_message_id'], file=file) await client.edit_message(config['config']['storage_chat_id'], config['config']['storage_message_id'], file=file)
async def get_video(video_id):
proc = await asyncio.create_subprocess_exec(sys.executable, 'youtube-dl-injected.py', '--dump-single-json', '--', video_id, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
if proc.returncode:
error_message = next(i.strip() for i in stderr.decode().split('\n') if i.startswith('ERROR: '))
return error_message or (proc.returncode, (stderr + stdout).decode())
return json.loads(stdout)

View File

@@ -1,4 +1,3 @@
import re
import os import os
import json import json
import time import time
@@ -10,17 +9,10 @@ import feedparser
from io import BytesIO from io import BytesIO
from decimal import Decimal from decimal import Decimal
from urllib.parse import quote as urlencode, urlparse from urllib.parse import quote as urlencode, urlparse
from youtube_dl import YoutubeDL
from . import session, config, client, seen_videos from . import session, config, client, seen_videos
from .utils import update_seen_videos from .utils import split_files, update_seen_videos, get_video
from .cappedio import bopen
tmp_handled = [] tmp_handled = []
size_limit = 2000 * 1024 * 1024
live_regex = re.compile(r'error: (?:autoytarchive:([0-9]+) )?(?:this live event will begin|premieres) in .+', re.I)
strip_date = re.compile(r' \d{4}-\d{2}-\d{2} \d{2}:\d{2}$')
ytdl = YoutubeDL({'skip_download': True, 'no_color': True})
ytdl.add_default_info_extractors()
async def check_channels(nodl): async def check_channels(nodl):
if nodl: if nodl:
@@ -73,28 +65,30 @@ async def check_video(video):
async def _check_video(video): async def _check_video(video):
logging.info('Checking video %s', video['yt_videoid']) logging.info('Checking video %s', video['yt_videoid'])
first_try_live = waited = False first_try_live = waited = False
too_many_requests_count = 1 tmr_attempts = 1
while True: while True:
try: video_json = await get_video(video['link'])
video_json = await client.loop.run_in_executor(None, ytdl.extract_info, video['link']) if isinstance(video_json, dict):
except BaseException as e:
wait_time = 30
message = str(e)
if '429' in message or 'too many' in message.lower():
wait_time = too_many_requests_count * 60 * 60
too_many_requests_count += 1
elif match := live_regex.match(message.rstrip('.')):
end_schedule_time = match.group(1) or 0
if end_schedule_time := int(end_schedule_time):
tmp_wait_time = end_schedule_time - time.time()
if tmp_wait_time > wait_time:
wait_time = tmp_wait_time
await asyncio.sleep(wait_time)
waited = True
else:
if not waited: if not waited:
first_try_live = True first_try_live = True
break break
wait_time = 30
if isinstance(video_json, str):
error_message = video_json[7:]
if 'too many request' in error_message.lower():
wait_time = tmr_attempts * 60 * 60
tmr_attempts += 1
elif error_message.startswith('AUTOYTARCHIVE:'):
tmp = error_message.split(':', 1)[1].split(' ', 1)[0]
if tmp.isnumeric():
new_wait_time = int(tmp) - int(time.time())
if new_wait_time > 0:
wait_time = new_wait_time
logging.error('Error on video %s: %s', video['yt_videoid'], error_message)
else:
logging.error('Video %s returned status code %s\n%s', video['yt_videoid'], *video_json)
await asyncio.sleep(wait_time)
waited = True
if not video_json.get('is_live'): if not video_json.get('is_live'):
first_try_live = False first_try_live = False
video_queue.put_nowait((video_json, time.time(), first_try_live)) video_queue.put_nowait((video_json, time.time(), first_try_live))
@@ -125,16 +119,17 @@ async def _video_worker():
tempdir = tempdir_obj.name tempdir = tempdir_obj.name
if late_to_queue: if late_to_queue:
for i in range(5): for i in range(5):
wait_time = 30 tmp = await get_video(video_json['id'])
try: if isinstance(tmp, dict):
tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
except BaseException as e:
e = str(e)
if '429' in e or 'too many request' in e.lower():
wait_time = (i + 1) * 60 * 60
else:
video_json = tmp video_json = tmp
break break
wait_time = 30
if isinstance(tmp, str):
if 'too many request' in tmp.lower():
wait_time = (i + 1) * 60 * 60
logging.error('Error on video %s: %s', video_json['id'], tmp)
else:
logging.error('Video %s returned status code %s\n%s', video_json['id'], *video_json)
await asyncio.sleep(wait_time) await asyncio.sleep(wait_time)
if video_json.get('requested_formats'): if video_json.get('requested_formats'):
for i in video_json['requested_formats']: for i in video_json['requested_formats']:
@@ -185,15 +180,16 @@ async def _video_worker():
except BaseException: except BaseException:
logging.exception('Exception encountered when sending message to Telegram about download failure exception') logging.exception('Exception encountered when sending message to Telegram about download failure exception')
for i in range(5): for i in range(5):
try: tmp = await get_video(video_json['id'])
tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id']) if isinstance(tmp, dict):
except BaseException as e:
e = str(e)
if '429' in e or 'too many request' in e.lower():
wait_time = (i + 1) * 60 * 60
else:
video_json = tmp video_json = tmp
break break
if isinstance(tmp, str):
if 'too many request' in tmp.lower():
wait_time = (i + 1) * 60 * 60
logging.error('Error on video %s: %s', video_json['id'], tmp)
else:
logging.error('Video %s returned status code %s\n%s', video_json['id'], *tmp)
await asyncio.sleep(wait_time) await asyncio.sleep(wait_time)
except BaseException: except BaseException:
tempdir_obj.cleanup() tempdir_obj.cleanup()
@@ -221,26 +217,16 @@ async def _upload_worker():
tempdir_obj, video_json = await upload_queue.get() tempdir_obj, video_json = await upload_queue.get()
try: try:
tempdir = tempdir_obj.name tempdir = tempdir_obj.name
base_filename = video_json['id'] + '.mkv' video_filename = os.path.join(tempdir, video_json['id'] + '.mkv')
video_filename = os.path.join(tempdir, base_filename) if os.path.getsize(video_filename) > 2000 * 1024 * 1024:
total_size = os.path.getsize(video_filename) files = await split_files(video_filename, tempdir)
is_big = total_size > size_limit os.remove(video_filename)
files_sent = size_sent = 0 else:
messages = [] files = [video_filename]
file = bopen(video_filename, size_limit, None) for i in files:
while total_size > 0: message = await client.send_message(config['config']['storage_chat_id'], f'Uploading {os.path.split(i)[1]}...', parse_mode=None)
file.fake_start = size_sent await client.send_file(config['config']['storage_chat_id'], i, caption=os.path.split(i)[1], parse_mode=None)
if is_big: asyncio.create_task(message.delete())
file.name = f'{base_filename}.part{str(files_sent).rjust(2, "0")}'
messages.append((await client.send_message(config['config']['storage_chat_id'], f'Uploading {file.name}...', parse_mode=None)).id)
await client.send_file(config['config']['storage_chat_id'], file, caption=file.name, parse_mode=None)
total_size -= size_limit
if total_size > 0:
size_sent += size_limit
files_sent += 1
file = bopen(video_filename, size_limit, None)
if messages:
await client.delete_messages(config['config']['storage_chat_id'], messages)
finally: finally:
tempdir_obj.cleanup() tempdir_obj.cleanup()
seen_videos.append(video_json['id']) seen_videos.append(video_json['id'])

12
youtube-dl-injected.py Normal file
View File

@@ -0,0 +1,12 @@
from youtube_dl.extractor import youtube
from youtube_dl import main
# Keep a handle on the untouched try_get before replacing it below; the
# wrapper still needs the real implementation to do the actual lookup.
youtube._try_get = _try_get = youtube.try_get
def try_get(src, getter, expected_type=None):
    """Drop-in wrapper for youtube.try_get that tags scheduled-stream
    messages with an ``AUTOYTARCHIVE:`` marker plus the scheduled start
    time, so the calling process can parse the wait time back out of the
    error text.
    """
    res = _try_get(src, getter, expected_type)
    if isinstance(res, str) and res.startswith('This live event will begin in '):
        # Pull scheduledStartTime out of the player response — presumably
        # epoch seconds rendered as a string (TODO confirm against the
        # YouTube player response schema).
        t = _try_get(src, lambda x: x['playabilityStatus']['liveStreamability']['liveStreamabilityRenderer']['offlineSlate']['liveStreamOfflineSlateRenderer']['scheduledStartTime'], str)
        res = f'AUTOYTARCHIVE:{t} {res}'
    return res
# Install the wrapper, then hand control to youtube-dl's normal CLI entry
# point. The monkeypatch must be in place before main() starts extracting.
youtube.try_get = try_get
main()