Compare commits
2 Commits
1fd5ccaeb5
...
c23d2fc52e
Author | SHA1 | Date |
---|---|---|
blank X | c23d2fc52e | |
blank X | ae4b496a69 |
|
@ -0,0 +1,44 @@
|
||||||
|
import io
|
||||||
|
import os
|
||||||
|
|
||||||
|
class CappedBufferedReader(io.BufferedReader):
|
||||||
|
def __init__(self, raw, buffer_size=io.DEFAULT_BUFFER_SIZE, capped_size=None, fake_start=None):
|
||||||
|
super().__init__(raw, buffer_size)
|
||||||
|
self.capped_size = capped_size
|
||||||
|
self.fake_start = fake_start
|
||||||
|
|
||||||
|
def read(self, size=None):
|
||||||
|
if self.capped_size is not None:
|
||||||
|
if size is None or size < 0 or size > self.capped_size:
|
||||||
|
size = self.capped_size
|
||||||
|
self.capped_size -= size
|
||||||
|
return super().read(size)
|
||||||
|
|
||||||
|
def seek(self, offset, whence=os.SEEK_SET):
|
||||||
|
if offset == 0 and whence == os.SEEK_END and self.capped_size is not None:
|
||||||
|
if self.capped_size < 0:
|
||||||
|
offset = 0
|
||||||
|
else:
|
||||||
|
real_end = super().seek(0, os.SEEK_END)
|
||||||
|
if self.capped_size > real_end:
|
||||||
|
offset = real_end
|
||||||
|
else:
|
||||||
|
offset = self.capped_size
|
||||||
|
whence = os.SEEK_SET
|
||||||
|
elif whence == os.SEEK_SET and self.fake_start is not None:
|
||||||
|
offset += self.fake_start
|
||||||
|
return super().seek(offset, whence)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self):
|
||||||
|
try:
|
||||||
|
return self._name
|
||||||
|
except AttributeError:
|
||||||
|
return super().name
|
||||||
|
|
||||||
|
@name.setter
|
||||||
|
def name(self, new_name):
|
||||||
|
self._name = new_name
|
||||||
|
|
||||||
|
def bopen(file, capped_size, fake_start):
|
||||||
|
return CappedBufferedReader(open(file, 'rb', buffering=0), capped_size=capped_size, fake_start=fake_start)
|
|
@ -1,35 +1,28 @@
|
||||||
import os
|
|
||||||
import sys
|
|
||||||
import json
|
import json
|
||||||
import shlex
|
|
||||||
import asyncio
|
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
|
from youtube_dl.extractor import youtube
|
||||||
from . import config, client, seen_videos
|
from . import config, client, seen_videos
|
||||||
|
|
||||||
async def split_files(filename, destination_dir):
|
youtube._try_get = _try_get = youtube.try_get
|
||||||
args = [
|
def traverse_dict(src):
|
||||||
'split',
|
for (key, value) in src.items():
|
||||||
'--verbose',
|
if key == 'scheduledStartTime':
|
||||||
'--numeric-suffixes=0',
|
return value
|
||||||
'--bytes=2097152000',
|
if isinstance(value, dict):
|
||||||
'--suffix-length=2',
|
if value := traverse_dict(value):
|
||||||
filename,
|
return value
|
||||||
os.path.join(destination_dir, os.path.basename(filename)) + '.part'
|
return None
|
||||||
]
|
|
||||||
proc = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE)
|
def try_get(src, getter, expected_type=None):
|
||||||
stdout, _ = await proc.communicate()
|
if reason := src.get('reason'):
|
||||||
return shlex.split(' '.join([i[14:] for i in stdout.decode().strip().split('\n')]))
|
if isinstance(reason, str) and (reason.startswith('This live event will begin in ') or reason.startswith('Premieres in ')):
|
||||||
|
if t := _try_get(src, traverse_dict, str):
|
||||||
|
src['reason'] = f'autoytarchive:{t} {reason}'
|
||||||
|
return _try_get(src, getter, expected_type)
|
||||||
|
youtube.try_get = try_get
|
||||||
|
|
||||||
async def update_seen_videos():
|
async def update_seen_videos():
|
||||||
with BytesIO(json.dumps(seen_videos).encode()) as file:
|
with BytesIO(json.dumps(seen_videos).encode()) as file:
|
||||||
file.name = 'autoytarchive.json'
|
file.name = 'autoytarchive.json'
|
||||||
file.seek(0)
|
file.seek(0)
|
||||||
await client.edit_message(config['config']['storage_chat_id'], config['config']['storage_message_id'], file=file)
|
await client.edit_message(config['config']['storage_chat_id'], config['config']['storage_message_id'], file=file)
|
||||||
|
|
||||||
async def get_video(video_id):
|
|
||||||
proc = await asyncio.create_subprocess_exec(sys.executable, 'youtube-dl-injected.py', '--dump-single-json', '--', video_id, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
|
|
||||||
stdout, stderr = await proc.communicate()
|
|
||||||
if proc.returncode:
|
|
||||||
error_message = next(i.strip() for i in stderr.decode().split('\n') if i.startswith('ERROR: '))
|
|
||||||
return error_message or (proc.returncode, (stderr + stdout).decode())
|
|
||||||
return json.loads(stdout)
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import re
|
||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
|
@ -9,10 +10,17 @@ import feedparser
|
||||||
from io import BytesIO
|
from io import BytesIO
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
from urllib.parse import quote as urlencode, urlparse
|
from urllib.parse import quote as urlencode, urlparse
|
||||||
|
from youtube_dl import YoutubeDL
|
||||||
from . import session, config, client, seen_videos
|
from . import session, config, client, seen_videos
|
||||||
from .utils import split_files, update_seen_videos, get_video
|
from .utils import update_seen_videos
|
||||||
|
from .cappedio import bopen
|
||||||
|
|
||||||
tmp_handled = []
|
tmp_handled = []
|
||||||
|
size_limit = 2000 * 1024 * 1024
|
||||||
|
live_regex = re.compile(r'error: (?:autoytarchive:([0-9]+) )?(?:this live event will begin|premieres) in .+', re.I)
|
||||||
|
strip_date = re.compile(r' \d{4}-\d{2}-\d{2} \d{2}:\d{2}$')
|
||||||
|
ytdl = YoutubeDL({'skip_download': True, 'no_color': True})
|
||||||
|
ytdl.add_default_info_extractors()
|
||||||
|
|
||||||
async def check_channels(nodl):
|
async def check_channels(nodl):
|
||||||
if nodl:
|
if nodl:
|
||||||
|
@ -65,30 +73,28 @@ async def check_video(video):
|
||||||
async def _check_video(video):
|
async def _check_video(video):
|
||||||
logging.info('Checking video %s', video['yt_videoid'])
|
logging.info('Checking video %s', video['yt_videoid'])
|
||||||
first_try_live = waited = False
|
first_try_live = waited = False
|
||||||
tmr_attempts = 1
|
too_many_requests_count = 1
|
||||||
while True:
|
while True:
|
||||||
video_json = await get_video(video['link'])
|
try:
|
||||||
if isinstance(video_json, dict):
|
video_json = await client.loop.run_in_executor(None, ytdl.extract_info, video['link'])
|
||||||
|
except BaseException as e:
|
||||||
|
wait_time = 30
|
||||||
|
message = str(e)
|
||||||
|
if '429' in message or 'too many' in message.lower():
|
||||||
|
wait_time = too_many_requests_count * 60 * 60
|
||||||
|
too_many_requests_count += 1
|
||||||
|
elif match := live_regex.match(message.rstrip('.')):
|
||||||
|
end_schedule_time = match.group(1) or 0
|
||||||
|
if end_schedule_time := int(end_schedule_time):
|
||||||
|
tmp_wait_time = end_schedule_time - time.time()
|
||||||
|
if tmp_wait_time > wait_time:
|
||||||
|
wait_time = tmp_wait_time
|
||||||
|
await asyncio.sleep(wait_time)
|
||||||
|
waited = True
|
||||||
|
else:
|
||||||
if not waited:
|
if not waited:
|
||||||
first_try_live = True
|
first_try_live = True
|
||||||
break
|
break
|
||||||
wait_time = 30
|
|
||||||
if isinstance(video_json, str):
|
|
||||||
error_message = video_json[7:]
|
|
||||||
if 'too many request' in error_message.lower():
|
|
||||||
wait_time = tmr_attempts * 60 * 60
|
|
||||||
tmr_attempts += 1
|
|
||||||
elif error_message.startswith('AUTOYTARCHIVE:'):
|
|
||||||
tmp = error_message.split(':', 1)[1].split(' ', 1)[0]
|
|
||||||
if tmp.isnumeric():
|
|
||||||
new_wait_time = int(tmp) - int(time.time())
|
|
||||||
if new_wait_time > 0:
|
|
||||||
wait_time = new_wait_time
|
|
||||||
logging.error('Error on video %s: %s', video['yt_videoid'], error_message)
|
|
||||||
else:
|
|
||||||
logging.error('Video %s returned status code %s\n%s', video['yt_videoid'], *video_json)
|
|
||||||
await asyncio.sleep(wait_time)
|
|
||||||
waited = True
|
|
||||||
if not video_json.get('is_live'):
|
if not video_json.get('is_live'):
|
||||||
first_try_live = False
|
first_try_live = False
|
||||||
video_queue.put_nowait((video_json, time.time(), first_try_live))
|
video_queue.put_nowait((video_json, time.time(), first_try_live))
|
||||||
|
@ -119,17 +125,16 @@ async def _video_worker():
|
||||||
tempdir = tempdir_obj.name
|
tempdir = tempdir_obj.name
|
||||||
if late_to_queue:
|
if late_to_queue:
|
||||||
for i in range(5):
|
for i in range(5):
|
||||||
tmp = await get_video(video_json['id'])
|
wait_time = 30
|
||||||
if isinstance(tmp, dict):
|
try:
|
||||||
|
tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
|
||||||
|
except BaseException as e:
|
||||||
|
e = str(e)
|
||||||
|
if '429' in e or 'too many request' in e.lower():
|
||||||
|
wait_time = (i + 1) * 60 * 60
|
||||||
|
else:
|
||||||
video_json = tmp
|
video_json = tmp
|
||||||
break
|
break
|
||||||
wait_time = 30
|
|
||||||
if isinstance(tmp, str):
|
|
||||||
if 'too many request' in tmp.lower():
|
|
||||||
wait_time = (i + 1) * 60 * 60
|
|
||||||
logging.error('Error on video %s: %s', video_json['id'], tmp)
|
|
||||||
else:
|
|
||||||
logging.error('Video %s returned status code %s\n%s', video_json['id'], *video_json)
|
|
||||||
await asyncio.sleep(wait_time)
|
await asyncio.sleep(wait_time)
|
||||||
if video_json.get('requested_formats'):
|
if video_json.get('requested_formats'):
|
||||||
for i in video_json['requested_formats']:
|
for i in video_json['requested_formats']:
|
||||||
|
@ -180,16 +185,15 @@ async def _video_worker():
|
||||||
except BaseException:
|
except BaseException:
|
||||||
logging.exception('Exception encountered when sending message to Telegram about download failure exception')
|
logging.exception('Exception encountered when sending message to Telegram about download failure exception')
|
||||||
for i in range(5):
|
for i in range(5):
|
||||||
tmp = await get_video(video_json['id'])
|
try:
|
||||||
if isinstance(tmp, dict):
|
tmp = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
|
||||||
|
except BaseException as e:
|
||||||
|
e = str(e)
|
||||||
|
if '429' in e or 'too many request' in e.lower():
|
||||||
|
wait_time = (i + 1) * 60 * 60
|
||||||
|
else:
|
||||||
video_json = tmp
|
video_json = tmp
|
||||||
break
|
break
|
||||||
if isinstance(tmp, str):
|
|
||||||
if 'too many request' in tmp.lower():
|
|
||||||
wait_time = (i + 1) * 60 * 60
|
|
||||||
logging.error('Error on video %s: %s', video_json['id'], tmp)
|
|
||||||
else:
|
|
||||||
logging.error('Video %s returned status code %s\n%s', video_json['id'], *tmp)
|
|
||||||
await asyncio.sleep(wait_time)
|
await asyncio.sleep(wait_time)
|
||||||
except BaseException:
|
except BaseException:
|
||||||
tempdir_obj.cleanup()
|
tempdir_obj.cleanup()
|
||||||
|
@ -217,16 +221,26 @@ async def _upload_worker():
|
||||||
tempdir_obj, video_json = await upload_queue.get()
|
tempdir_obj, video_json = await upload_queue.get()
|
||||||
try:
|
try:
|
||||||
tempdir = tempdir_obj.name
|
tempdir = tempdir_obj.name
|
||||||
video_filename = os.path.join(tempdir, video_json['id'] + '.mkv')
|
base_filename = video_json['id'] + '.mkv'
|
||||||
if os.path.getsize(video_filename) > 2000 * 1024 * 1024:
|
video_filename = os.path.join(tempdir, base_filename)
|
||||||
files = await split_files(video_filename, tempdir)
|
total_size = os.path.getsize(video_filename)
|
||||||
os.remove(video_filename)
|
is_big = total_size > size_limit
|
||||||
else:
|
files_sent = size_sent = 0
|
||||||
files = [video_filename]
|
messages = []
|
||||||
for i in files:
|
file = bopen(video_filename, size_limit, None)
|
||||||
message = await client.send_message(config['config']['storage_chat_id'], f'Uploading {os.path.split(i)[1]}...', parse_mode=None)
|
while total_size > 0:
|
||||||
await client.send_file(config['config']['storage_chat_id'], i, caption=os.path.split(i)[1], parse_mode=None)
|
file.fake_start = size_sent
|
||||||
asyncio.create_task(message.delete())
|
if is_big:
|
||||||
|
file.name = f'{base_filename}.part{str(files_sent).rjust(2, "0")}'
|
||||||
|
messages.append((await client.send_message(config['config']['storage_chat_id'], f'Uploading {file.name}...', parse_mode=None)).id)
|
||||||
|
await client.send_file(config['config']['storage_chat_id'], file, caption=file.name, parse_mode=None)
|
||||||
|
total_size -= size_limit
|
||||||
|
if total_size > 0:
|
||||||
|
size_sent += size_limit
|
||||||
|
files_sent += 1
|
||||||
|
file = bopen(video_filename, size_limit, None)
|
||||||
|
if messages:
|
||||||
|
await client.delete_messages(config['config']['storage_chat_id'], messages)
|
||||||
finally:
|
finally:
|
||||||
tempdir_obj.cleanup()
|
tempdir_obj.cleanup()
|
||||||
seen_videos.append(video_json['id'])
|
seen_videos.append(video_json['id'])
|
||||||
|
|
|
@ -1,12 +0,0 @@
|
||||||
from youtube_dl.extractor import youtube
|
|
||||||
from youtube_dl import main
|
|
||||||
|
|
||||||
youtube._try_get = _try_get = youtube.try_get
|
|
||||||
def try_get(src, getter, expected_type=None):
|
|
||||||
res = _try_get(src, getter, expected_type)
|
|
||||||
if isinstance(res, str) and res.startswith('This live event will begin in '):
|
|
||||||
t = _try_get(src, lambda x: x['playabilityStatus']['liveStreamability']['liveStreamabilityRenderer']['offlineSlate']['liveStreamOfflineSlateRenderer']['scheduledStartTime'], str)
|
|
||||||
res = f'AUTOYTARCHIVE:{t} {res}'
|
|
||||||
return res
|
|
||||||
youtube.try_get = try_get
|
|
||||||
main()
|
|
Loading…
Reference in New Issue