2021-01-13 14:14:13 +00:00
import os
import sys
import json
import time
import asyncio
import logging
import tempfile
import traceback
import feedparser
from io import BytesIO
from decimal import Decimal
from urllib . parse import quote as urlencode , urlparse
from . import session , config , client , seen_videos
from . utils import split_files , update_seen_videos
# Video ids for which _check_channels has already scheduled a check_video task
# in this process; consulted alongside seen_videos so that repeated feed polls
# do not schedule the same video twice.
tmp_handled = [ ]
async def check_channels(nodl):
    """Scan the configured channels once, or keep scanning forever.

    Args:
        nodl: When true, run a single ``_check_channels(True)`` pass and
            return. When false, loop forever, running
            ``_check_channels(False)`` and then sleeping for the configured
            ``wait_seconds`` between passes.

    Failures of a pass are logged and reported to the storage chat together
    with the traceback as an attached text file; a failure to send that report
    is itself only logged, so the loop keeps running.
    """
    if nodl:
        await _check_channels(True)
        return
    while True:
        try:
            await _check_channels(False)
        # Catch Exception, not BaseException: asyncio.CancelledError (a
        # BaseException since Python 3.8), KeyboardInterrupt, SystemExit and
        # GeneratorExit must propagate, otherwise this endless loop can never
        # be cancelled or shut down cleanly.
        except Exception:
            logging.exception(' Exception encountered with check channels ')
            try:
                with BytesIO(traceback.format_exc().encode()) as file:
                    file.name = ' check-channels-error.txt '
                    file.seek(0)
                    await client.send_message(
                        config[' config '][' storage_chat_id '],
                        ' Exception encountered with check channels ',
                        file=file,
                    )
            except Exception:
                logging.exception(' Exception encountered when sending message to Telegram about check channels exception ')
        await asyncio.sleep(config[' config '][' wait_seconds '])
# Held by _check_channels for the whole feed scan, so concurrent callers run
# their scans one after another instead of interleaving.
check_channels_lock = asyncio . Lock ( )
async def _check_channels(nodl):
    """Fetch every configured channel feed and handle entries not seen before.

    Args:
        nodl: When true, unseen video ids are only appended to ``seen_videos``
            and nothing is scheduled. When false, a ``check_video`` task is
            created for each unseen entry and its id is recorded in
            ``tmp_handled``.

    The whole scan runs under ``check_channels_lock``.
    """
    async with check_channels_lock:
        for i in config[' config '][' channels ']:
            logging.info(' Checking channel %s ', i)
            # The current time is appended as an extra query parameter so each
            # poll uses a distinct URL (presumably to bypass caching).
            async with session.get(f' https://youtube.com/feeds/videos.xml?channel_id= { urlencode ( i ) } &a= { time . time ( ) } ') as resp:
                data = feedparser.parse(await resp.text())
            for j in data[' items ']:
                video_id = j[' yt_videoid ']
                # Test each list separately instead of concatenating them into
                # a throwaway list for every feed entry.
                if video_id in seen_videos or video_id in tmp_handled:
                    continue
                if nodl:
                    seen_videos.append(video_id)
                    continue
                # NOTE(review): the task object is discarded here; the asyncio
                # documentation recommends keeping a reference to tasks so
                # they cannot be garbage-collected before they finish.
                asyncio.create_task(check_video(j))
                tmp_handled.append(video_id)
async def check_video(video):
    """Run ``_check_video`` for one feed entry, retrying up to five times.

    Args:
        video: Feed entry mapping; ``yt_videoid`` is read for log and report
            messages.

    Returns:
        Whatever ``_check_video`` returns on the first successful attempt, or
        ``None`` if all five attempts raised.

    Each failure is logged and reported to the storage chat with the traceback
    attached; a failure to send the report is only logged.
    """
    for _ in range(5):
        try:
            return await _check_video(video)
        # Exception rather than BaseException: a cancelled task must stop
        # instead of treating CancelledError as a failed attempt and retrying.
        except Exception:
            logging.exception(' Exception encountered with checking video %s ', video.get(' yt_videoid '))
            try:
                with BytesIO(traceback.format_exc().encode()) as file:
                    file.name = f' check-videos-error- { video . get ( " yt_videoid " ) } .txt '
                    file.seek(0)
                    await client.send_message(
                        config[' config '][' storage_chat_id '],
                        f' Exception encountered with checking video { video . get ( " yt_videoid " ) } ',
                        file=file,
                        parse_mode=None,
                    )
            except Exception:
                logging.exception(' Exception encountered when sending message to Telegram about checking video %s exception ', video.get(' yt_videoid '))
async def _check_video(video):
    """Poll the extractor until it yields metadata, then queue the video.

    Runs ``youtube-dl-injected.py --dump-single-json`` on ``video['link']``
    in a subprocess. While the subprocess exits with a non-zero status, the
    failure is logged and the call is repeated after a delay. Once it
    succeeds, its stdout is parsed as JSON and
    ``(video_json, time.time(), first_try_live)`` is put on ``video_queue``.

    Args:
        video: Feed entry mapping providing ``yt_videoid`` and ``link``.

    ``first_try_live`` ends up true only when the very first extractor call
    succeeded (no waiting was needed) and the resulting JSON reports
    ``is_live``.
    """
    logging.info(' Checking video %s ', video[' yt_videoid '])
    first_try_live = waited = False
    while True:
        proc = await asyncio.create_subprocess_exec(
            sys.executable,
            ' youtube-dl-injected.py ',
            ' --dump-single-json ',
            video[' link '],
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()
        if not proc.returncode:
            if not waited:
                first_try_live = True
            break
        # Default delay before the next attempt.
        wait_time = 30
        # The None default matters: without it, stderr that contains no
        # ERROR line raises StopIteration (a RuntimeError inside a coroutine)
        # and the status-code fallback below could never run.
        error_message = next(
            (i for i in stderr.decode().split(' \n ') if i.startswith(' ERROR: ')),
            None,
        )
        if error_message:
            # Drop the leading error marker.
            error_message = error_message[7:]
            if error_message.startswith(' AUTOYTARCHIVE: '):
                # The field between the first separator and the next space is
                # treated as an epoch timestamp when numeric; wait until then
                # if it lies in the future.
                tmp = error_message.split(' : ', 1)[1].split(' ', 1)[0]
                if tmp.isnumeric():
                    new_wait_time = int(tmp) - int(time.time())
                    if new_wait_time > 0:
                        wait_time = new_wait_time
            logging.error(' Error on video %s : %s ', video[' yt_videoid '], error_message)
        else:
            logging.error(' Video %s returned status code %s \n %s ', video[' yt_videoid '], proc.returncode, (stderr + stdout).decode())
        await asyncio.sleep(wait_time)
        waited = True
    video_json = json.loads(stdout)
    if not video_json.get(' is_live ') and first_try_live:
        first_try_live = False
    video_queue.put_nowait((video_json, time.time(), first_try_live))
# Hand-off from _check_video to _video_worker. Items are
# (video_json, enqueue_time, first_try_live) tuples.
video_queue = asyncio . Queue ( )
async def video_worker():
    """Supervise ``_video_worker``: restart it forever whenever it fails.

    Each failure is logged and reported to the storage chat with the traceback
    attached as a text file; a failure to send that report is only logged.
    """
    while True:
        try:
            await _video_worker()
        # Exception rather than BaseException so that cancelling this task (or
        # KeyboardInterrupt / SystemExit) ends the loop instead of restarting
        # the worker.
        except Exception:
            logging.exception(' Exception encountered with video worker ')
            try:
                with BytesIO(traceback.format_exc().encode()) as file:
                    file.name = ' video-worker-error.txt '
                    file.seek(0)
                    await client.send_message(
                        config[' config '][' storage_chat_id '],
                        ' Exception encountered with video worker ',
                        file=file,
                    )
            except Exception:
                logging.exception(' Exception encountered when sending message to Telegram about video worker exception ')
async def _video_worker():
    """Record queued videos with ffmpeg and pass them on to ``upload_queue``.

    For every ``(video_json, start_time, first_try_live)`` item taken from
    ``video_queue``:

    1. Start ffmpeg with one ``-i`` input per entry of ``requested_formats``
       (or the single ``url``), stream-copying into ``<id>.mkv`` inside a
       fresh temporary directory created under the current directory.
    2. While ffmpeg runs, send the metadata JSON and, if present, the
       thumbnail to the storage chat, both captioned with the video title.
    3. Wait for ffmpeg to exit, then put ``(tempdir_obj, video_json)`` on
       ``upload_queue``. The ffmpeg exit status is not inspected.

    On any failure the ffmpeg child is killed, the temporary directory is
    removed, and the exception is re-raised to the caller.
    """
    while True:
        video_json, start_time, first_try_live = await video_queue.get()
        # Flag used for the "(is late)" caption suffix: set when the first
        # extractor call already saw the video live, or when the item was
        # picked up less than five seconds after being queued.
        is_late = first_try_live or (Decimal(time.time()) - Decimal(start_time)) < 5
        command = [' ffmpeg ']
        tempdir_obj = tempfile.TemporaryDirectory(dir=' . ')
        proc = None
        try:
            tempdir = tempdir_obj.name
            if video_json.get(' requested_formats '):
                for i in video_json[' requested_formats ']:
                    command.extend((' -i ', i[' url ']))
            else:
                command.extend((' -i ', video_json[' url ']))
            video_filename = os.path.join(tempdir, video_json[' id '] + ' .mkv ')
            command.extend((' -c ', ' copy ', video_filename))
            proc = await asyncio.create_subprocess_exec(*command)
            text = ' New video '
            if is_late:
                text += ' (is late) '
            text += f' : { video_json [ " title " ] } '
            with BytesIO(json.dumps(video_json).encode()) as file:
                file.name = video_json[' id '] + ' .json '
                file.seek(0)
                await client.send_file(config[' config '][' storage_chat_id '], file, caption=text, parse_mode=None)
            if video_json.get(' thumbnail '):
                # Keep the thumbnail's own extension, taken from the URL path.
                thumbnail_ext = os.path.splitext(urlparse(video_json[' thumbnail ']).path)[1]
                thumbnail_filename = os.path.join(tempdir, video_json[' id '] + thumbnail_ext)
                async with session.get(video_json[' thumbnail ']) as resp:
                    with open(thumbnail_filename, ' wb ') as file:
                        while True:
                            chunk = await resp.content.read(4096)
                            if not chunk:
                                break
                            file.write(chunk)
                await client.send_file(config[' config '][' storage_chat_id '], thumbnail_filename, caption=text, parse_mode=None)
                os.remove(thumbnail_filename)
            await proc.communicate()
        except BaseException:
            # Do not leave ffmpeg running against a directory that is about to
            # be deleted; it would otherwise outlive this failed job.
            if proc is not None and proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    # The process exited on its own in the meantime.
                    pass
            tempdir_obj.cleanup()
            raise
        upload_queue.put_nowait((tempdir_obj, video_json))
        video_queue.task_done()
# Hand-off from _video_worker to _upload_worker. Items are
# (tempdir_obj, video_json) tuples; the consumer cleans up tempdir_obj.
upload_queue = asyncio . Queue ( )
async def upload_worker():
    """Supervise ``_upload_worker``: restart it forever whenever it fails.

    Each failure is logged and reported to the storage chat with the traceback
    attached as a text file; a failure to send that report is only logged.
    """
    while True:
        try:
            await _upload_worker()
        # Exception rather than BaseException so that cancelling this task (or
        # KeyboardInterrupt / SystemExit) ends the loop instead of restarting
        # the worker.
        except Exception:
            logging.exception(' Exception encountered with upload worker ')
            try:
                with BytesIO(traceback.format_exc().encode()) as file:
                    file.name = ' upload-worker-error.txt '
                    file.seek(0)
                    await client.send_message(
                        config[' config '][' storage_chat_id '],
                        ' Exception encountered with upload worker ',
                        file=file,
                    )
            except Exception:
                logging.exception(' Exception encountered when sending message to Telegram about upload worker exception ')
async def _upload_worker():
    """Upload finished recordings taken from ``upload_queue``, forever.

    For each ``(tempdir_obj, video_json)`` item, the ``<id>.mkv`` recording in
    the temporary directory is uploaded to the storage chat. Recordings larger
    than 2000 MiB are first cut into parts with ``split_files`` and the
    original is removed. The temporary directory is always cleaned up; only
    after a successful upload is the video id appended to ``seen_videos`` and
    persisted through ``update_seen_videos``.
    """
    while True:
        tempdir_obj, video_json = await upload_queue.get()
        try:
            workdir = tempdir_obj.name
            recording = os.path.join(workdir, video_json[' id '] + ' .mkv ')
            # Start from the single-file case and replace it when the
            # recording is over the size threshold.
            files = [recording]
            if os.path.getsize(recording) > 2000 * 1024 * 1024:
                files = await split_files(recording, workdir)
                os.remove(recording)
            for i in files:
                chat_id = config[' config '][' storage_chat_id ']
                # Temporary progress notice, deleted once the file is sent.
                message = await client.send_message(
                    chat_id,
                    f' Uploading { os . path . split ( i ) [ 1 ] } ... ',
                    parse_mode=None,
                )
                await client.send_file(chat_id, i)
                asyncio.create_task(message.delete())
        finally:
            tempdir_obj.cleanup()
        seen_videos.append(video_json[' id '])
        await update_seen_videos()
        upload_queue.task_done()