2021-04-07 07:21:34 +00:00
import re
2021-01-13 14:14:13 +00:00
import os
import json
import time
import asyncio
import logging
import tempfile
import traceback
import feedparser
from io import BytesIO
from decimal import Decimal
from urllib . parse import quote as urlencode , urlparse
2021-04-07 07:21:34 +00:00
from youtube_dl import YoutubeDL
2021-01-13 14:14:13 +00:00
from . import session , config , client , seen_videos
2021-04-07 08:43:42 +00:00
from . utils import update_seen_videos
from . cappedio import bopen
2021-01-13 14:14:13 +00:00
# NOTE(review): the bare "YYYY-MM-DD hh:mm:ss +00:00" lines in this file look
# like blame/extraction residue rather than Python; they are left untouched here.

# IDs of videos for which _check_channels() has already scheduled check_video();
# consulted together with seen_videos so the same feed entry is not scheduled twice.
tmp_handled = [ ]
2021-04-07 08:43:42 +00:00
# Byte cap (2000 * 1024 * 1024) applied per uploaded file by _upload_worker();
# recordings larger than this are sent as several ".partNN" files.
size_limit = 2000 * 1024 * 1024
2021-04-07 07:21:34 +00:00
# Matched (case-insensitively, from the start) against the stringified
# extraction error in _check_video(). The optional group 1 captures the digits
# after "autoytarchive:"; _check_video() converts them to int and subtracts
# time.time() to derive how long to wait.
live_regex = re . compile ( r ' error: (?:autoytarchive:([0-9]+) )?(?:this live event will begin|premieres) in .+ ' , re . I )
# Removed from the video title in _video_worker() before the title is used in
# captions. Presumably matches a trailing "YYYY-MM-DD hh:mm" stamp -- the exact
# whitespace of this pattern should be confirmed against the real file.
strip_date = re . compile ( r ' \ d {4} - \ d {2} - \ d {2} \ d {2} : \ d {2} $ ' )
# Shared extractor used only through ytdl.extract_info(); skip_download is set,
# so it is used for metadata rather than for fetching media.
ytdl = YoutubeDL ( { ' skip_download ' : True , ' no_color ' : True } )
ytdl . add_default_info_extractors ( )
2021-01-13 14:14:13 +00:00
async def check_channels(nodl):
    """Poll the configured channels once (``nodl``) or forever.

    When ``nodl`` is truthy a single ``_check_channels(True)`` pass is awaited
    and the coroutine returns. Otherwise ``_check_channels(False)`` is awaited
    in an endless loop with a pause of ``config['config']['wait_seconds']``
    seconds after every pass, successful or not.

    A failing pass is logged and its traceback is sent as a text attachment to
    the chat given by ``config['config']['storage_chat_id']``. A failure while
    sending that report is only logged.
    """
    if nodl:
        await _check_channels(True)
        return

    while True:
        try:
            await _check_channels(False)
        except BaseException:
            logging.exception('Exception encountered with check channels')
            try:
                trace_bytes = traceback.format_exc().encode()
                with BytesIO(trace_bytes) as report:
                    # The attachment name is taken from the file object's ``name``.
                    report.name = 'check-channels-error.txt'
                    report.seek(0)
                    await client.send_message(
                        config['config']['storage_chat_id'],
                        'Exception encountered with check channels',
                        file=report,
                    )
            except BaseException:
                logging.exception('Exception encountered when sending message to Telegram about check channels exception')
        await asyncio.sleep(config['config']['wait_seconds'])
check_channels_lock = asyncio.Lock()
# Strong references to in-flight check_video() tasks. The event loop only keeps
# weak references to tasks, so a task nobody else references may be
# garbage-collected before it finishes; each task removes itself when done.
_check_video_tasks = set()


async def _check_channels(nodl):
    """Scan every configured channel feed for videos not handled yet.

    Holds ``check_channels_lock`` for the whole pass. For each channel id in
    ``config['config']['channels']`` the YouTube Atom feed is downloaded and
    parsed, and every entry whose ``yt_videoid`` is in neither ``seen_videos``
    nor ``tmp_handled`` is processed:

    * ``nodl`` truthy: the id is appended to ``seen_videos`` and nothing is
      scheduled.
    * otherwise: ``check_video(entry)`` is scheduled as a background task and
      the id is appended to ``tmp_handled``.
    """
    async with check_channels_lock:
        for channel_id in config['config']['channels']:
            logging.info('Checking channel %s', channel_id)
            # The ``a=<timestamp>`` parameter changes on every request
            # (presumably a cache-buster -- confirm).
            feed_url = f'https://youtube.com/feeds/videos.xml?channel_id={urlencode(channel_id)}&a={time.time()}'
            async with session.get(feed_url) as resp:
                data = feedparser.parse(await resp.text())
            for entry in data['items']:
                video_id = entry['yt_videoid']
                # Two membership tests instead of concatenating both lists for
                # every entry.
                if video_id in seen_videos or video_id in tmp_handled:
                    continue
                if nodl:
                    seen_videos.append(video_id)
                    continue
                task = asyncio.create_task(check_video(entry))
                _check_video_tasks.add(task)
                task.add_done_callback(_check_video_tasks.discard)
                tmp_handled.append(video_id)
async def check_video(video):
    """Await ``_check_video(video)``, retrying up to five times.

    Returns the result of the first attempt that completes. Each failing
    attempt is logged and its traceback is sent as a text attachment to the
    chat given by ``config['config']['storage_chat_id']``; a failure while
    sending that report is only logged. If all five attempts fail the function
    returns ``None`` without raising.
    """
    for _attempt in range(5):
        try:
            return await _check_video(video)
        except BaseException:
            logging.exception('Exception encountered with checking video %s', video.get('yt_videoid'))
            try:
                trace_bytes = traceback.format_exc().encode()
                with BytesIO(trace_bytes) as report:
                    report.name = f'check-videos-error-{video.get("yt_videoid")}.txt'
                    report.seek(0)
                    await client.send_message(
                        config['config']['storage_chat_id'],
                        f'Exception encountered with checking video {video.get("yt_videoid")}',
                        file=report,
                        parse_mode=None,
                    )
            except BaseException:
                logging.exception('Exception encountered when sending message to Telegram about checking video %s exception', video.get('yt_videoid'))
async def _check_video(video):
    """Wait until info for *video* can be extracted, then queue it for download.

    ``ytdl.extract_info(video['link'])`` is run in the default executor and
    retried without limit. After a failed attempt the coroutine sleeps:

    * if the error text contains ``429`` or ``too many``: N hours, where N
      starts at 1 and grows by one for each such error;
    * if the error text matches ``live_regex`` and carries a non-zero
      ``autoytarchive:<n>`` value: ``n - time.time()`` seconds when that is
      more than 30;
    * otherwise: 30 seconds.

    On success ``(video_json, time.time(), first_try_live)`` is put on
    ``video_queue``. ``first_try_live`` is True only when the very first
    extraction attempt succeeded and the result reports ``is_live``.
    """
    logging.info('Checking video %s', video['yt_videoid'])
    waited = False
    rate_limit_hits = 1

    while True:
        try:
            video_json = await client.loop.run_in_executor(None, ytdl.extract_info, video['link'])
            break
        except BaseException as exc:
            delay = 30
            reason = str(exc)
            if '429' in reason or 'too many' in reason.lower():
                delay = rate_limit_hits * 60 * 60
                rate_limit_hits += 1
            elif scheduled := live_regex.match(reason.rstrip('.')):
                # Group 1 is absent when the message has no autoytarchive tag.
                end_schedule_time = int(scheduled.group(1) or 0)
                if end_schedule_time:
                    delay = max(delay, end_schedule_time - time.time())
            await asyncio.sleep(delay)
            waited = True

    first_try_live = bool(video_json.get('is_live')) and not waited
    video_queue.put_nowait((video_json, time.time(), first_try_live))
# Items are (video_json, enqueue_time, first_try_live) tuples produced by
# _check_video() and consumed by _video_worker().
video_queue = asyncio.Queue()


async def video_worker():
    """Keep ``_video_worker`` running forever.

    Whenever ``_video_worker()`` raises, the exception is logged, its
    traceback is sent as a text attachment to the chat given by
    ``config['config']['storage_chat_id']`` (a failure to send is only logged),
    and ``_video_worker()`` is started again.
    """
    while True:
        try:
            await _video_worker()
        except BaseException:
            logging.exception('Exception encountered with video worker')
            try:
                trace_bytes = traceback.format_exc().encode()
                with BytesIO(trace_bytes) as report:
                    report.name = 'video-worker-error.txt'
                    report.seek(0)
                    await client.send_message(
                        config['config']['storage_chat_id'],
                        'Exception encountered with video worker',
                        file=report,
                    )
            except BaseException:
                logging.exception('Exception encountered when sending message to Telegram about video worker exception')
def _build_ffmpeg_command(video_json, video_filename):
    """Return the ffmpeg argv that stream-copies *video_json*'s inputs to *video_filename*.

    Every entry of ``requested_formats`` becomes one ``-i`` input; when that
    key is missing or empty the top-level ``url`` is the single input.
    """
    command = ['ffmpeg', '-y']
    formats = video_json.get('requested_formats')
    if formats:
        for fmt in formats:
            command.extend(('-i', fmt['url']))
    else:
        command.extend(('-i', video_json['url']))
    command.extend(('-c', 'copy', video_filename))
    return command


def _uses_manifest_url(video_json):
    """Return True if the top-level or any requested-format URL is on manifest.googlevideo.com."""
    manifest_host = 'manifest.googlevideo.com'
    if video_json.get('url') and urlparse(video_json['url']).netloc == manifest_host:
        return True
    return any(
        urlparse(fmt['url']).netloc == manifest_host
        for fmt in video_json.get('requested_formats') or ()
    )


async def _refresh_video_json(video_json):
    """Re-extract info for ``video_json['id']``, trying up to five times.

    Returns the freshly extracted info, or *video_json* unchanged when every
    attempt fails. After a failed attempt the coroutine sleeps 30 seconds, or
    ``attempt`` hours when the error text mentions ``429`` / ``too many
    request``.
    """
    for attempt in range(5):
        wait_time = 30
        try:
            fresh = await client.loop.run_in_executor(None, ytdl.extract_info, video_json['id'])
        except BaseException as exc:
            reason = str(exc)
            if '429' in reason or 'too many request' in reason.lower():
                wait_time = (attempt + 1) * 60 * 60
        else:
            return fresh
        await asyncio.sleep(wait_time)
    return video_json


async def _video_worker():
    """Consume ``video_queue``: record each video with ffmpeg and hand it to the uploader.

    For every ``(video_json, start_time, first_try_live)`` item:

    1. If the item sat in the queue for more than 5 seconds, its info is
       re-extracted first, because the URLs may no longer be current.
    2. ffmpeg is started, writing ``<id>.mkv`` into a fresh temporary
       directory created under the current working directory.
    3. While ffmpeg runs, the info JSON and (if present) the thumbnail are
       sent to the chat given by ``config['config']['storage_chat_id']``.
    4. ffmpeg is awaited. On a non-zero exit status a failure message is sent,
       the info is re-extracted and ffmpeg is started again with the refreshed
       URLs, for at most 50 runs in total. If the inputs are manifest URLs and
       a duration is known, the worker first sleeps for that duration.
    5. ``(tempdir_obj, video_json)`` is put on ``upload_queue``, even if all
       50 runs failed.

    Any exception terminates a still-running ffmpeg, removes the temporary
    directory and is re-raised to the caller.
    """
    while True:
        video_json, start_time, first_try_live = await video_queue.get()
        late_to_queue = (Decimal(time.time()) - Decimal(start_time)) > 5
        is_late = first_try_live or late_to_queue
        tempdir_obj = tempfile.TemporaryDirectory(dir='.')
        proc = None
        try:
            tempdir = tempdir_obj.name
            if late_to_queue:
                video_json = await _refresh_video_json(video_json)
            video_filename = os.path.join(tempdir, video_json['id'] + '.mkv')
            # Start recording before the metadata uploads so that as little of
            # a live stream as possible is missed.
            proc = await asyncio.create_subprocess_exec(*_build_ffmpeg_command(video_json, video_filename))

            text = 'New video'
            if is_late:
                text += ' (is late)'
            video_json['title'] = strip_date.sub('', video_json['title']).strip()
            text += f': {video_json["title"]}\nhttps://youtube.com/watch?v={video_json["id"]}'
            with BytesIO(json.dumps(video_json, indent=4).encode()) as file:
                file.name = video_json['id'] + '.json'
                file.seek(0)
                await client.send_file(config['config']['storage_chat_id'], file, caption=text, parse_mode=None)

            if video_json.get('thumbnail'):
                thumbnail_ext = os.path.splitext(urlparse(video_json['thumbnail']).path)[1]
                thumbnail_filename = os.path.join(tempdir, video_json['id'] + thumbnail_ext)
                async with session.get(video_json['thumbnail']) as resp:
                    with open(thumbnail_filename, 'wb') as file:
                        while chunk := await resp.content.read(4096):
                            file.write(chunk)
                await client.send_file(config['config']['storage_chat_id'], thumbnail_filename, caption=text, parse_mode=None)
                os.remove(thumbnail_filename)

            for attempt in range(50):
                if attempt:
                    # The previous run failed: launch a new ffmpeg with the
                    # refreshed URLs. Re-awaiting the finished process would
                    # only report the same exit status again.
                    proc = await asyncio.create_subprocess_exec(*_build_ffmpeg_command(video_json, video_filename))
                await proc.communicate()
                if not proc.returncode:
                    break
                if video_json.get('duration') and _uses_manifest_url(video_json):
                    await asyncio.sleep(video_json['duration'])
                try:
                    await client.send_message(config['config']['storage_chat_id'], f'Failed to download video {video_json["id"]}, please check logs', parse_mode=None)
                except BaseException:
                    logging.exception('Exception encountered when sending message to Telegram about download failure exception')
                video_json = await _refresh_video_json(video_json)
        except BaseException:
            # Do not leave ffmpeg writing into a directory that is about to be
            # deleted.
            if proc is not None and proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
            tempdir_obj.cleanup()
            raise
        upload_queue.put_nowait((tempdir_obj, video_json))
        video_queue.task_done()
# Items are (tempdir_obj, video_json) tuples produced by _video_worker() and
# consumed by _upload_worker().
upload_queue = asyncio.Queue()


async def upload_worker():
    """Keep ``_upload_worker`` running forever.

    Whenever ``_upload_worker()`` raises, the exception is logged, its
    traceback is sent as a text attachment to the chat given by
    ``config['config']['storage_chat_id']`` (a failure to send is only logged),
    and ``_upload_worker()`` is started again.
    """
    while True:
        try:
            await _upload_worker()
        except BaseException:
            logging.exception('Exception encountered with upload worker')
            try:
                trace_bytes = traceback.format_exc().encode()
                with BytesIO(trace_bytes) as report:
                    report.name = 'upload-worker-error.txt'
                    report.seek(0)
                    await client.send_message(
                        config['config']['storage_chat_id'],
                        'Exception encountered with upload worker',
                        file=report,
                    )
            except BaseException:
                logging.exception('Exception encountered when sending message to Telegram about upload worker exception')
async def _upload_worker():
    """Consume ``upload_queue``: send each recording to the storage chat.

    For every ``(tempdir_obj, video_json)`` item, ``<id>.mkv`` inside the
    temporary directory is sent to the chat given by
    ``config['config']['storage_chat_id']``. A recording larger than
    ``size_limit`` bytes is sent as consecutive files named
    ``<id>.mkv.part00``, ``.part01``, ...; a smaller one is sent as a single
    ``<id>.mkv``. An "Uploading ..." message is posted before every file and
    all of them are deleted once the whole recording has been sent.

    The video file handle is always closed and the temporary directory is
    always removed, whether or not the upload succeeded. Only after a
    successful upload is the id appended to ``seen_videos`` and
    ``update_seen_videos()`` awaited. An exception propagates to the caller.
    """
    while True:
        tempdir_obj, video_json = await upload_queue.get()
        try:
            base_filename = video_json['id'] + '.mkv'
            video_filename = os.path.join(tempdir_obj.name, base_filename)
            files_sent = size_sent = 0
            messages = []
            # NOTE(review): the second argument of bopen() and the
            # ``capped_size`` attribute presumably limit how many bytes one
            # read pass may return -- confirm against cappedio.
            file = bopen(video_filename, None)
            try:
                file.seek(0, os.SEEK_END)
                total_size = file.tell()
                is_big = total_size > size_limit
                while total_size > 0:
                    # Continue where the previous part ended.
                    file.seek(size_sent)
                    file.capped_size = size_limit
                    if is_big:
                        file.name = f'{base_filename}.part{files_sent:02}'
                    else:
                        file.name = base_filename
                    notice = await client.send_message(config['config']['storage_chat_id'], f'Uploading {file.name}...', parse_mode=None)
                    messages.append(notice.id)
                    message = await client.send_file(
                        config['config']['storage_chat_id'],
                        file,
                        caption=file.name,
                        parse_mode=None,
                        file_size=min(size_limit, total_size),
                    )
                    # Advance by what the server reports was stored.
                    total_size -= message.document.size
                    if total_size > 0:
                        size_sent += message.document.size
                        files_sent += 1
            finally:
                # Close the handle even when a send fails, so it is not leaked
                # while the directory is being removed.
                file.capped_size = None
                file.close()
            if messages:
                await client.delete_messages(config['config']['storage_chat_id'], messages)
        finally:
            tempdir_obj.cleanup()
        seen_videos.append(video_json['id'])
        await update_seen_videos()
        upload_queue.task_done()