From c23d2fc52e8c7cedac2412909878a8173c8a6301 Mon Sep 17 00:00:00 2001 From: blank X Date: Wed, 7 Apr 2021 15:43:42 +0700 Subject: [PATCH] Stop using unix split --- autoytarchive/cappedio.py | 44 +++++++++++++++++++++++++++++++++++++++ autoytarchive/utils.py | 17 --------------- autoytarchive/workers.py | 34 ++++++++++++++++++++---------- 3 files changed, 67 insertions(+), 28 deletions(-) create mode 100644 autoytarchive/cappedio.py diff --git a/autoytarchive/cappedio.py b/autoytarchive/cappedio.py new file mode 100644 index 0000000..a3db21f --- /dev/null +++ b/autoytarchive/cappedio.py @@ -0,0 +1,44 @@ +import io +import os + +class CappedBufferedReader(io.BufferedReader): + def __init__(self, raw, buffer_size=io.DEFAULT_BUFFER_SIZE, capped_size=None, fake_start=None): + super().__init__(raw, buffer_size) + self.capped_size = capped_size + self.fake_start = fake_start + + def read(self, size=None): + if self.capped_size is not None: + if size is None or size < 0 or size > self.capped_size: + size = self.capped_size + self.capped_size -= size + return super().read(size) + + def seek(self, offset, whence=os.SEEK_SET): + if offset == 0 and whence == os.SEEK_END and self.capped_size is not None: + if self.capped_size < 0: + offset = 0 + else: + real_end = super().seek(0, os.SEEK_END) + if self.capped_size > real_end: + offset = real_end + else: + offset = self.capped_size + whence = os.SEEK_SET + elif whence == os.SEEK_SET and self.fake_start is not None: + offset += self.fake_start + return super().seek(offset, whence) + + @property + def name(self): + try: + return self._name + except AttributeError: + return super().name + + @name.setter + def name(self, new_name): + self._name = new_name + +def bopen(file, capped_size, fake_start): + return CappedBufferedReader(open(file, 'rb', buffering=0), capped_size=capped_size, fake_start=fake_start) diff --git a/autoytarchive/utils.py b/autoytarchive/utils.py index 94a1453..9fd2865 100644 --- a/autoytarchive/utils.py +++ b/autoytarchive/utils.py @@ -1,7 +1,4 @@ -import os import json -import shlex -import asyncio from io import BytesIO from youtube_dl.extractor import youtube from . import config, client, seen_videos @@ -24,20 +21,6 @@ def try_get(src, getter, expected_type=None): return _try_get(src, getter, expected_type) youtube.try_get = try_get -async def split_files(filename, destination_dir): - args = [ - 'split', - '--verbose', - '--numeric-suffixes=0', - '--bytes=2097152000', - '--suffix-length=2', - filename, - os.path.join(destination_dir, os.path.basename(filename)) + '.part' - ] - proc = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE) - stdout, _ = await proc.communicate() - return shlex.split(' '.join([i[14:] for i in stdout.decode().strip().split('\n')])) - async def update_seen_videos(): with BytesIO(json.dumps(seen_videos).encode()) as file: file.name = 'autoytarchive.json' diff --git a/autoytarchive/workers.py b/autoytarchive/workers.py index b3dd7ab..949f5c5 100644 --- a/autoytarchive/workers.py +++ b/autoytarchive/workers.py @@ -12,9 +12,11 @@ from decimal import Decimal from urllib.parse import quote as urlencode, urlparse from youtube_dl import YoutubeDL from . import session, config, client, seen_videos -from .utils import split_files, update_seen_videos +from .utils import update_seen_videos +from .cappedio import bopen tmp_handled = [] +size_limit = 2000 * 1024 * 1024 live_regex = re.compile(r'error: (?:autoytarchive:([0-9]+) )?(?:this live event will begin|premieres) in .+', re.I) strip_date = re.compile(r' \d{4}-\d{2}-\d{2} \d{2}:\d{2}$') ytdl = YoutubeDL({'skip_download': True, 'no_color': True}) @@ -219,16 +221,26 @@ async def _upload_worker(): tempdir_obj, video_json = await upload_queue.get() try: tempdir = tempdir_obj.name - video_filename = os.path.join(tempdir, video_json['id'] + '.mkv') - if os.path.getsize(video_filename) > 2000 * 1024 * 1024: - files = await split_files(video_filename, tempdir) - os.remove(video_filename) - else: - files = [video_filename] - for i in files: - message = await client.send_message(config['config']['storage_chat_id'], f'Uploading {os.path.split(i)[1]}...', parse_mode=None) - await client.send_file(config['config']['storage_chat_id'], i, caption=os.path.split(i)[1], parse_mode=None) - asyncio.create_task(message.delete()) + base_filename = video_json['id'] + '.mkv' + video_filename = os.path.join(tempdir, base_filename) + total_size = os.path.getsize(video_filename) + is_big = total_size > size_limit + files_sent = size_sent = 0 + messages = [] + file = bopen(video_filename, size_limit, None) + while total_size > 0: + file.fake_start = size_sent + if is_big: + file.name = f'{base_filename}.part{str(files_sent).rjust(2, "0")}' + messages.append((await client.send_message(config['config']['storage_chat_id'], f'Uploading {file.name}...', parse_mode=None)).id) + await client.send_file(config['config']['storage_chat_id'], file, caption=file.name, parse_mode=None) + total_size -= size_limit + if total_size > 0: + size_sent += size_limit + files_sent += 1 + file = bopen(video_filename, size_limit, None) + if messages: + await client.delete_messages(config['config']['storage_chat_id'], messages) finally: tempdir_obj.cleanup() seen_videos.append(video_json['id'])