Compare commits

...

2 Commits

Author    SHA1        Message                        Date
blank X   c23d2fc52e  Stop using unix split          2021-04-07 15:43:42 +07:00
blank X   ae4b496a69  Use some code from ytnotifier  2021-04-07 14:21:34 +07:00
4 changed files with 124 additions and 85 deletions

autoytarchive/cappedio.py  Normal file  (44 additions)
View File

@@ -0,0 +1,44 @@
import io
import os

class CappedBufferedReader(io.BufferedReader):
    def __init__(self, raw, buffer_size=io.DEFAULT_BUFFER_SIZE, capped_size=None, fake_start=None):
        super().__init__(raw, buffer_size)
        self.capped_size = capped_size
        self.fake_start = fake_start

    def read(self, size=None):
        if self.capped_size is not None:
            if size is None or size < 0 or size > self.capped_size:
                size = self.capped_size
            self.capped_size -= size
        return super().read(size)

    def seek(self, offset, whence=os.SEEK_SET):
        if offset == 0 and whence == os.SEEK_END and self.capped_size is not None:
            if self.capped_size < 0:
                offset = 0
            else:
                real_end = super().seek(0, os.SEEK_END)
                if self.capped_size > real_end:
                    offset = real_end
                else:
                    offset = self.capped_size
            whence = os.SEEK_SET
        elif whence == os.SEEK_SET and self.fake_start is not None:
            offset += self.fake_start
        return super().seek(offset, whence)

    @property
    def name(self):
        try:
            return self._name
        except AttributeError:
            return super().name

    @name.setter
    def name(self, new_name):
        self._name = new_name

def bopen(file, capped_size, fake_start):
    return CappedBufferedReader(open(file, 'rb', buffering=0), capped_size=capped_size, fake_start=fake_start)
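A quick sketch of how this capped reader behaves, not part of the diff: it assumes the module above is importable as cappedio, and the 10-byte sample file plus the capped_size=4 / fake_start=4 values are made up for illustration. It mimics what an upload client does with a file object: probe the size with seek(0, SEEK_END), rewind, then read.

# Hypothetical usage sketch; sample file and sizes are illustrative only.
import os
from cappedio import bopen  # assumes the file above is on the import path

with open('sample.bin', 'wb') as f:
    f.write(b'0123456789')                    # 10 bytes: two 4-byte parts plus a tail

part = bopen('sample.bin', capped_size=4, fake_start=4)  # expose only bytes 4..7
print(part.seek(0, os.SEEK_END))              # 4 -> reports the capped size, not the real 10
part.seek(0)                                  # "rewind" actually lands on byte 4 (fake_start)
print(part.read())                            # b'4567' -> only this chunk is readable
part.name = 'sample.bin.part01'               # overriding .name changes the upload caption
part.close()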

View File

@@ -1,35 +1,28 @@
-import os
-import sys
 import json
-import shlex
-import asyncio
 from io import BytesIO
+from youtube_dl.extractor import youtube
 from . import config, client, seen_videos

-async def split_files(filename, destination_dir):
-    args = [
-        'split',
-        '--verbose',
-        '--numeric-suffixes=0',
-        '--bytes=2097152000',
-        '--suffix-length=2',
-        filename,
-        os.path.join(destination_dir, os.path.basename(filename)) + '.part'
-    ]
-    proc = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE)
-    stdout, _ = await proc.communicate()
-    return shlex.split(' '.join([i[14:] for i in stdout.decode().strip().split('\n')]))
+youtube._try_get = _try_get = youtube.try_get
+
+def traverse_dict(src):
+    for (key, value) in src.items():
+        if key == 'scheduledStartTime':
+            return value
+        if isinstance(value, dict):
+            if value := traverse_dict(value):
+                return value
+    return None
+
+def try_get(src, getter, expected_type=None):
+    if reason := src.get('reason'):
+        if isinstance(reason, str) and (reason.startswith('This live event will begin in ') or reason.startswith('Premieres in ')):
+            if t := _try_get(src, traverse_dict, str):
+                src['reason'] = f'autoytarchive:{t} {reason}'
+    return _try_get(src, getter, expected_type)
+youtube.try_get = try_get

 async def update_seen_videos():
     with BytesIO(json.dumps(seen_videos).encode()) as file:
         file.name = 'autoytarchive.json'
         file.seek(0)
         await client.edit_message(config['config']['storage_chat_id'], config['config']['storage_message_id'], file=file)
-
-async def get_video(video_id):
-    proc = await asyncio.create_subprocess_exec(sys.executable, 'youtube-dl-injected.py', '--dump-single-json', '--', video_id, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
-    stdout, stderr = await proc.communicate()
-    if proc.returncode:
-        error_message = next(i.strip() for i in stderr.decode().split('\n') if i.startswith('ERROR: '))
-        return error_message or (proc.returncode, (stderr + stdout).decode())
-    return json.loads(stdout)
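For context on the patch above: the new try_get only rewrites src['reason'] when the reason looks like a waiting-room message, and traverse_dict finds scheduledStartTime wherever it sits in the player response instead of hard-coding a path. A standalone sketch follows, with traverse_dict copied from the diff and a made-up payload whose nesting mirrors the path that was hard-coded in the deleted helper script.

# Standalone sketch; the nested payload is an illustrative stand-in, not real data.
def traverse_dict(src):
    for (key, value) in src.items():
        if key == 'scheduledStartTime':
            return value
        if isinstance(value, dict):
            if value := traverse_dict(value):
                return value
    return None

payload = {
    'reason': 'This live event will begin in 3 hours',
    'playabilityStatus': {
        'liveStreamability': {
            'liveStreamabilityRenderer': {
                'offlineSlate': {
                    'liveStreamOfflineSlateRenderer': {'scheduledStartTime': '1617811200'}
                }
            }
        }
    },
}

print(traverse_dict(payload))  # '1617811200' -- found without hard-coding the path
# The patched try_get would then rewrite the reason to
# 'autoytarchive:1617811200 This live event will begin in 3 hours'.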

View File

@@ -1,3 +1,4 @@
+import re
 import os
 import json
 import time
@@ -9,10 +10,17 @@ import feedparser
 from io import BytesIO
 from decimal import Decimal
 from urllib.parse import quote as urlencode, urlparse
+from youtube_dl import YoutubeDL
 from . import session, config, client, seen_videos
-from .utils import split_files, update_seen_videos, get_video
+from .utils import update_seen_videos
+from .cappedio import bopen

 tmp_handled = []
+size_limit = 2000 * 1024 * 1024
+live_regex = re.compile(r'error: (?:autoytarchive:([0-9]+) )?(?:this live event will begin|premieres) in .+', re.I)
+strip_date = re.compile(r' \d{4}-\d{2}-\d{2} \d{2}:\d{2}$')
+ytdl = YoutubeDL({'skip_download': True, 'no_color': True})
+ytdl.add_default_info_extractors()

 async def check_channels(nodl):
     if nodl:
@@ -65,30 +73,28 @@ async def check_video(video):
 async def _check_video(video):
     logging.info('Checking video %s', video['yt_videoid'])
     first_try_live = waited = False
-    tmr_attempts = 1
+    too_many_requests_count = 1
     while True:
-        video_json = await get_video(video['link'])
-        if isinstance(video_json, dict):
+        try:
+            video_json = await client.loop.run_in_executor(None, ytdl.extract_info, video['link'])
+        except BaseException as e:
+            wait_time = 30
+            message = str(e)
+            if '429' in message or 'too many' in message.lower():
+                wait_time = too_many_requests_count * 60 * 60
+                too_many_requests_count += 1
+            elif match := live_regex.match(message.rstrip('.')):
+                end_schedule_time = match.group(1) or 0
+                if end_schedule_time := int(end_schedule_time):
+                    tmp_wait_time = end_schedule_time - time.time()
+                    if tmp_wait_time > wait_time:
+                        wait_time = tmp_wait_time
+            await asyncio.sleep(wait_time)
+            waited = True
+        else:
             if not waited:
                 first_try_live = True
             break
-        wait_time = 30
-        if isinstance(video_json, str):
-            error_message = video_json[7:]
-            if 'too many request' in error_message.lower():
-                wait_time = tmr_attempts * 60 * 60
-                tmr_attempts += 1
-            elif error_message.startswith('AUTOYTARCHIVE:'):
-                tmp = error_message.split(':', 1)[1].split(' ', 1)[0]
-                if tmp.isnumeric():
-                    new_wait_time = int(tmp) - int(time.time())
-                    if new_wait_time > 0:
-                        wait_time = new_wait_time
-            logging.error('Error on video %s: %s', video['yt_videoid'], error_message)
-        else:
-            logging.error('Video %s returned status code %s\n%s', video['yt_videoid'], *video_json)
-        await asyncio.sleep(wait_time)
-        waited = True
     if not video_json.get('is_live'):
         first_try_live = False
     video_queue.put_nowait((video_json, time.time(), first_try_live))
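To illustrate the backoff in the hunk above: a throwaway sketch of how live_regex pulls the scheduled start time back out of an exception message. The error string is a made-up example in the 'ERROR: ...' shape youtube_dl raises once the patched try_get has tagged the reason; live_regex is copied from the diff.

# Illustrative sketch only; the error message is fabricated for the example.
import re
import time

live_regex = re.compile(r'error: (?:autoytarchive:([0-9]+) )?(?:this live event will begin|premieres) in .+', re.I)

# Fake a tagged youtube_dl error for a stream scheduled three hours from now.
start = int(time.time()) + 3 * 60 * 60
message = f'ERROR: autoytarchive:{start} This live event will begin in 3 hours.'

wait_time = 30
if match := live_regex.match(message.rstrip('.')):
    if end_schedule_time := int(match.group(1) or 0):
        tmp_wait_time = end_schedule_time - time.time()
        if tmp_wait_time > wait_time:
            wait_time = tmp_wait_time

print(round(wait_time))  # ~10800: sleep until the scheduled start instead of the default 30s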
@@ -119,17 +125,16 @@ async def _video_worker():
         tempdir = tempdir_obj.name
         if late_to_queue:
             for i in range(5):
-                tmp = await get_video(video_json['id'])
-                if isinstance(tmp, dict):
+                wait_time = 30
+                try:
+                    tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
+                except BaseException as e:
+                    e = str(e)
+                    if '429' in e or 'too many request' in e.lower():
+                        wait_time = (i + 1) * 60 * 60
+                else:
                     video_json = tmp
                     break
-                wait_time = 30
-                if isinstance(tmp, str):
-                    if 'too many request' in tmp.lower():
-                        wait_time = (i + 1) * 60 * 60
-                    logging.error('Error on video %s: %s', video_json['id'], tmp)
-                else:
-                    logging.error('Video %s returned status code %s\n%s', video_json['id'], *video_json)
                 await asyncio.sleep(wait_time)
         if video_json.get('requested_formats'):
             for i in video_json['requested_formats']:
@@ -180,16 +185,15 @@ async def _video_worker():
             except BaseException:
                 logging.exception('Exception encountered when sending message to Telegram about download failure exception')
             for i in range(5):
-                tmp = await get_video(video_json['id'])
-                if isinstance(tmp, dict):
+                try:
+                    tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
+                except BaseException as e:
+                    e = str(e)
+                    if '429' in e or 'too many request' in e.lower():
+                        wait_time = (i + 1) * 60 * 60
+                else:
                     video_json = tmp
                     break
-                if isinstance(tmp, str):
-                    if 'too many request' in tmp.lower():
-                        wait_time = (i + 1) * 60 * 60
-                    logging.error('Error on video %s: %s', video_json['id'], tmp)
-                else:
-                    logging.error('Video %s returned status code %s\n%s', video_json['id'], *tmp)
                 await asyncio.sleep(wait_time)
         except BaseException:
             tempdir_obj.cleanup()
@@ -217,16 +221,26 @@ async def _upload_worker():
         tempdir_obj, video_json = await upload_queue.get()
         try:
             tempdir = tempdir_obj.name
-            video_filename = os.path.join(tempdir, video_json['id'] + '.mkv')
-            if os.path.getsize(video_filename) > 2000 * 1024 * 1024:
-                files = await split_files(video_filename, tempdir)
-                os.remove(video_filename)
-            else:
-                files = [video_filename]
-            for i in files:
-                message = await client.send_message(config['config']['storage_chat_id'], f'Uploading {os.path.split(i)[1]}...', parse_mode=None)
-                await client.send_file(config['config']['storage_chat_id'], i, caption=os.path.split(i)[1], parse_mode=None)
-                asyncio.create_task(message.delete())
+            base_filename = video_json['id'] + '.mkv'
+            video_filename = os.path.join(tempdir, base_filename)
+            total_size = os.path.getsize(video_filename)
+            is_big = total_size > size_limit
+            files_sent = size_sent = 0
+            messages = []
+            file = bopen(video_filename, size_limit, None)
+            while total_size > 0:
+                file.fake_start = size_sent
+                if is_big:
+                    file.name = f'{base_filename}.part{str(files_sent).rjust(2, "0")}'
+                    messages.append((await client.send_message(config['config']['storage_chat_id'], f'Uploading {file.name}...', parse_mode=None)).id)
+                await client.send_file(config['config']['storage_chat_id'], file, caption=file.name, parse_mode=None)
+                total_size -= size_limit
+                if total_size > 0:
+                    size_sent += size_limit
+                    files_sent += 1
+                    file = bopen(video_filename, size_limit, None)
+            if messages:
+                await client.delete_messages(config['config']['storage_chat_id'], messages)
         finally:
             tempdir_obj.cleanup()
             seen_videos.append(video_json['id'])
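The part arithmetic in the hunk above, in plain numbers: a throwaway sketch with a made-up 4.5 GiB file and a hypothetical video.mkv name that only prints the windows the loop would hand to bopen(); it does no real I/O.

# Illustrative sketch of the chunking loop; the file size and name are made up.
size_limit = 2000 * 1024 * 1024                  # same 2000 MiB cap as the worker
total_size = int(4.5 * 1024 * 1024 * 1024)       # pretend the .mkv is 4.5 GiB

files_sent = size_sent = 0
remaining = total_size
while remaining > 0:
    part_name = f'video.mkv.part{str(files_sent).rjust(2, "0")}'
    part_size = min(size_limit, remaining)
    print(part_name, 'starts at byte', size_sent, 'and holds', part_size, 'bytes')
    remaining -= size_limit
    if remaining > 0:
        size_sent += size_limit
        files_sent += 1
# -> video.mkv.part00 and .part01 hold 2000 MiB each; .part02 holds the remaining 608 MiB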

View File

@@ -1,12 +0,0 @@
from youtube_dl.extractor import youtube
from youtube_dl import main
youtube._try_get = _try_get = youtube.try_get
def try_get(src, getter, expected_type=None):
    res = _try_get(src, getter, expected_type)
    if isinstance(res, str) and res.startswith('This live event will begin in '):
        t = _try_get(src, lambda x: x['playabilityStatus']['liveStreamability']['liveStreamabilityRenderer']['offlineSlate']['liveStreamOfflineSlateRenderer']['scheduledStartTime'], str)
        res = f'AUTOYTARCHIVE:{t} {res}'
    return res
youtube.try_get = try_get

main()