
commit
85e9d7595d
10 changed files with 322 additions and 0 deletions
@ -0,0 +1,21 @@
|
||||
MIT License |
||||
|
||||
Copyright (c) 2021 blank X |
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy |
||||
of this software and associated documentation files (the "Software"), to deal |
||||
in the Software without restriction, including without limitation the rights |
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
||||
copies of the Software, and to permit persons to whom the Software is |
||||
furnished to do so, subject to the following conditions: |
||||
|
||||
The above copyright notice and this permission notice shall be included in all |
||||
copies or substantial portions of the Software. |
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
||||
SOFTWARE. |
@ -0,0 +1,14 @@
|
||||
# AutoYTArchive |
||||
|
||||
Automatically archives YouTube videos and streams to Telegram |
||||
|
||||
### Installation Instructions |
||||
1. Install: |
||||
- `python3` (this is in python after all) |
||||
- `ffmpeg` (to download) |
||||
2. `pip3 install -r requirements.txt` |
||||
3. Copy example-config.yaml to config.yaml and edit it |
||||
4. `python3 -m autoytarchive nodl` |
||||
|
||||
### Start |
||||
`python3 -m autoytarchive` |
@ -0,0 +1,13 @@
|
||||
import logging |
||||
logging.basicConfig(level=logging.INFO) |
||||
|
||||
import yaml |
||||
import aiohttp |
||||
from telethon import TelegramClient |
||||
|
||||
with open('config.yaml') as file: |
||||
config = yaml.safe_load(file) |
||||
|
||||
session = aiohttp.ClientSession() |
||||
client = TelegramClient('autoytarchive', config['telegram']['api_id'], config['telegram']['api_hash']) |
||||
seen_videos = [] |
@ -0,0 +1,31 @@
|
||||
import sys |
||||
import json |
||||
import asyncio |
||||
import logging |
||||
from . import config, client, seen_videos |
||||
from .workers import check_channels, video_worker, upload_worker |
||||
from .utils import update_seen_videos |
||||
|
||||
async def main(nodl): |
||||
await client.start(bot_token=config['telegram']['bot_token']) |
||||
try: |
||||
message = await client.get_messages(config['config']['storage_chat_id'], ids=config['config']['storage_message_id']) |
||||
resp = await message.download_media(bytes) |
||||
seen_videos.extend(json.loads(resp)) |
||||
except BaseException: |
||||
logging.exception('Exception encountered when downloading seen videos') |
||||
if nodl: |
||||
await check_channels(True) |
||||
await update_seen_videos() |
||||
await client.disconnect() |
||||
else: |
||||
await asyncio.gather(check_channels(False), video_worker(), upload_worker()) |
||||
await client.disconnect() |
||||
|
||||
if len(sys.argv) not in (1, 2): |
||||
print('Usage:', sys.executable, '-m', __package__, '[nodl]', file=sys.stderr) |
||||
exit(1) |
||||
if len(sys.argv) == 2 and sys.argv[1] != 'nodl': |
||||
print('Usage:', sys.executable, '-m', __package__, '[nodl]', file=sys.stderr) |
||||
exit(1) |
||||
client.loop.run_until_complete(main(sys.argv[-1] == 'nodl')) |
@ -0,0 +1,26 @@
|
||||
import os |
||||
import json |
||||
import shlex |
||||
import asyncio |
||||
from io import BytesIO |
||||
from . import config, client, seen_videos |
||||
|
||||
async def split_files(filename, destination_dir): |
||||
args = [ |
||||
'split', |
||||
'--verbose', |
||||
'--numeric-suffixes=0', |
||||
'--bytes=2097152000', |
||||
'--suffix-length=2', |
||||
filename, |
||||
os.path.join(destination_dir, os.path.basename(filename)) + '.part' |
||||
] |
||||
proc = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE) |
||||
stdout, _ = await proc.communicate() |
||||
return shlex.split(' '.join([i[14:] for i in stdout.decode().strip().split('\n')])) |
||||
|
||||
async def update_seen_videos(): |
||||
with BytesIO(json.dumps(seen_videos).encode()) as file: |
||||
file.name = 'autoytarchive.json' |
||||
file.seek(0) |
||||
await client.edit_message(config['config']['storage_chat_id'], config['config']['storage_message_id'], file=file) |
@ -0,0 +1,186 @@
|
||||
import os |
||||
import sys |
||||
import json |
||||
import time |
||||
import asyncio |
||||
import logging |
||||
import tempfile |
||||
import traceback |
||||
import feedparser |
||||
from io import BytesIO |
||||
from decimal import Decimal |
||||
from urllib.parse import quote as urlencode, urlparse |
||||
from . import session, config, client, seen_videos |
||||
from .utils import split_files, update_seen_videos |
||||
|
||||
tmp_handled = [] |
||||
|
||||
async def check_channels(nodl): |
||||
if nodl: |
||||
await _check_channels(True) |
||||
return |
||||
while True: |
||||
try: |
||||
await _check_channels(False) |
||||
except BaseException: |
||||
logging.exception('Exception encountered with check channels') |
||||
try: |
||||
with BytesIO(traceback.format_exc().encode()) as file: |
||||
file.name = 'check-channels-error.txt' |
||||
file.seek(0) |
||||
await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with check channels', file=file) |
||||
except BaseException: |
||||
logging.exception('Exception encountered when sending message to Telegram about check channels exception') |
||||
await asyncio.sleep(config['config']['wait_seconds']) |
||||
|
||||
check_channels_lock = asyncio.Lock() |
||||
async def _check_channels(nodl): |
||||
async with check_channels_lock: |
||||
for i in config['config']['channels']: |
||||
logging.info('Checking channel %s', i) |
||||
async with session.get(f'https://youtube.com/feeds/videos.xml?channel_id={urlencode(i)}&a={time.time()}') as resp: |
||||
data = feedparser.parse(await resp.text()) |
||||
for j in data['items']: |
||||
if j['yt_videoid'] in seen_videos + tmp_handled: |
||||
continue |
||||
if nodl: |
||||
seen_videos.append(j['yt_videoid']) |
||||
continue |
||||
asyncio.create_task(check_video(j)) |
||||
tmp_handled.append(j['yt_videoid']) |
||||
|
||||
async def check_video(video): |
||||
for _ in range(5): |
||||
try: |
||||
return await _check_video(video) |
||||
except BaseException: |
||||
logging.exception('Exception encountered with checking video %s', video.get('yt_videoid')) |
||||
try: |
||||
with BytesIO(traceback.format_exc().encode()) as file: |
||||
file.name = f'check-videos-error-{video.get("yt_videoid")}.txt' |
||||
file.seek(0) |
||||
await client.send_message(config['config']['storage_chat_id'], f'Exception encountered with checking video {video.get("yt_videoid")}', file=file, parse_mode=None) |
||||
except BaseException: |
||||
logging.exception('Exception encountered when sending message to Telegram about checking video %s exception', video.get('yt_videoid')) |
||||
|
||||
async def _check_video(video): |
||||
logging.info('Checking video %s', video['yt_videoid']) |
||||
first_try_live = waited = False |
||||
while True: |
||||
proc = await asyncio.create_subprocess_exec(sys.executable, 'youtube-dl-injected.py', '--dump-single-json', video['link'], stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) |
||||
stdout, stderr = await proc.communicate() |
||||
if not proc.returncode: |
||||
if not waited: |
||||
first_try_live = True |
||||
break |
||||
wait_time = 30 |
||||
error_message = next(i for i in stderr.decode().split('\n') if i.startswith('ERROR: ')) |
||||
if error_message: |
||||
error_message = error_message[7:] |
||||
if error_message.startswith('AUTOYTARCHIVE:'): |
||||
new_wait_time = int(error_message.split(':', 1)[1].split(' ', 1)[0]) - int(time.time()) |
||||
if new_wait_time > 0: |
||||
wait_time = new_wait_time |
||||
logging.error('Error on video %s: %s', video['yt_videoid'], error_message) |
||||
else: |
||||
logging.error('Video %s returned status code %s\n%s', video['yt_videoid'], proc.returncode, (stderr + stdout).decode()) |
||||
await asyncio.sleep(wait_time) |
||||
waited = True |
||||
video_json = json.loads(stdout) |
||||
if not video_json.get('is_live') and first_try_live: |
||||
first_try_live = False |
||||
video_queue.put_nowait((video_json, time.time(), first_try_live)) |
||||
|
||||
video_queue = asyncio.Queue() |
||||
async def video_worker(): |
||||
while True: |
||||
try: |
||||
await _video_worker() |
||||
except BaseException: |
||||
logging.exception('Exception encountered with video worker') |
||||
try: |
||||
with BytesIO(traceback.format_exc().encode()) as file: |
||||
file.name = 'video-worker-error.txt' |
||||
file.seek(0) |
||||
await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with video worker', file=file) |
||||
except BaseException: |
||||
logging.exception('Exception encountered when sending message to Telegram about video worker exception') |
||||
|
||||
async def _video_worker(): |
||||
while True: |
||||
video_json, start_time, first_try_live = await video_queue.get() |
||||
is_late = first_try_live or (Decimal(time.time()) - Decimal(start_time)) < 5 |
||||
command = ['ffmpeg'] |
||||
tempdir_obj = tempfile.TemporaryDirectory(dir='.') |
||||
try: |
||||
tempdir = tempdir_obj.name |
||||
if video_json.get('requested_formats'): |
||||
for i in video_json['requested_formats']: |
||||
command.extend(('-i', i['url'])) |
||||
else: |
||||
command.extend(('-i', video_json['url'])) |
||||
video_filename = os.path.join(tempdir, video_json['id'] + '.mkv') |
||||
command.extend(('-c', 'copy', video_filename)) |
||||
proc = await asyncio.create_subprocess_exec(*command) |
||||
text = 'New video' |
||||
if is_late: |
||||
text += ' (is late)' |
||||
text += f': {video_json["title"]}' |
||||
with BytesIO(json.dumps(video_json).encode()) as file: |
||||
file.name = video_json['id'] + '.json' |
||||
file.seek(0) |
||||
await client.send_file(config['config']['storage_chat_id'], file, caption=text, parse_mode=None) |
||||
if video_json.get('thumbnail'): |
||||
thumbnail_ext = os.path.splitext(urlparse(video_json['thumbnail']).path)[1] |
||||
thumbnail_filename = os.path.join(tempdir, video_json['id'] + thumbnail_ext) |
||||
async with session.get(video_json['thumbnail']) as resp: |
||||
with open(thumbnail_filename, 'wb') as file: |
||||
while True: |
||||
chunk = await resp.content.read(4096) |
||||
if not chunk: |
||||
break |
||||
file.write(chunk) |
||||
await client.send_file(config['config']['storage_chat_id'], thumbnail_filename, caption=text, parse_mode=None) |
||||
os.remove(thumbnail_filename) |
||||
await proc.communicate() |
||||
except BaseException: |
||||
tempdir_obj.cleanup() |
||||
raise |
||||
upload_queue.put_nowait((tempdir_obj, video_json)) |
||||
video_queue.task_done() |
||||
|
||||
upload_queue = asyncio.Queue() |
||||
async def upload_worker(): |
||||
while True: |
||||
try: |
||||
await _upload_worker() |
||||
except BaseException: |
||||
logging.exception('Exception encountered with upload worker') |
||||
try: |
||||
with BytesIO(traceback.format_exc().encode()) as file: |
||||
file.name = 'upload-worker-error.txt' |
||||
file.seek(0) |
||||
await client.send_message(config['config']['storage_chat_id'], 'Exception encountered with upload worker', file=file) |
||||
except BaseException: |
||||
logging.exception('Exception encountered when sending message to Telegram about upload worker exception') |
||||
|
||||
async def _upload_worker(): |
||||
while True: |
||||
tempdir_obj, video_json = await upload_queue.get() |
||||
try: |
||||
tempdir = tempdir_obj.name |
||||
video_filename = os.path.join(tempdir, video_json['id'] + '.mkv') |
||||
if os.path.getsize(video_filename) > 2000 * 1024 * 1024: |
||||
files = await split_files(video_filename, tempdir) |
||||
os.remove(video_filename) |
||||
else: |
||||
files = [video_filename] |
||||
for i in files: |
||||
message = await client.send_message(config['config']['storage_chat_id'], f'Uploading {os.path.split(i)[1]}...', parse_mode=None) |
||||
await client.send_file(config['config']['storage_chat_id'], i) |
||||
asyncio.create_task(message.delete()) |
||||
finally: |
||||
tempdir_obj.cleanup() |
||||
seen_videos.append(video_json['id']) |
||||
await update_seen_videos() |
||||
upload_queue.task_done() |
@ -0,0 +1,11 @@
|
||||
telegram: |
||||
api_id: 0 |
||||
api_hash: https://my.telegram.org |
||||
bot_token: https://t.me/BotFather |
||||
config: |
||||
storage_chat_id: -1001289824958 |
||||
storage_message_id: 5261 |
||||
wait_seconds: 1800 |
||||
channels: |
||||
- UCL_qhgtOy0dy1Agp8vkySQg |
||||
- UCHsx4Hqa-1ORjQTh9TYDhww |
@ -0,0 +1,6 @@
|
||||
youtube-dl |
||||
feedparser |
||||
telethon |
||||
aiohttp |
||||
cryptg |
||||
pyyaml |
@ -0,0 +1,12 @@
|
||||
from youtube_dl.extractor import youtube |
||||
from youtube_dl import main |
||||
|
||||
youtube._try_get = _try_get = youtube.try_get |
||||
def try_get(src, getter, expected_type=None): |
||||
res = _try_get(src, getter, expected_type) |
||||
if isinstance(res, str) and res.startswith('This live event will begin in '): |
||||
t = _try_get(src, lambda x: x['playabilityStatus']['liveStreamability']['liveStreamabilityRenderer']['offlineSlate']['liveStreamOfflineSlateRenderer']['scheduledStartTime'], str) |
||||
res = f'AUTOYTARCHIVE:{t} {res}' |
||||
return res |
||||
youtube.try_get = try_get |
||||
main() |
Loading…
Reference in new issue