393 lines
17 KiB
Python
393 lines
17 KiB
Python
|
# lazyleech - Telegram bot primarily to leech from torrents and upload to Telegram
|
||
|
# Copyright (c) 2021 lazyleech developers <theblankx protonmail com, meliodas_bot protonmail com>
|
||
|
#
|
||
|
# This program is free software: you can redistribute it and/or modify
|
||
|
# it under the terms of the GNU Affero General Public License as published
|
||
|
# by the Free Software Foundation, either version 3 of the License, or
|
||
|
# (at your option) any later version.
|
||
|
#
|
||
|
# This program is distributed in the hope that it will be useful,
|
||
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
# GNU Affero General Public License for more details.
|
||
|
#
|
||
|
# You should have received a copy of the GNU Affero General Public License
|
||
|
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||
|
|
||
|
import os
|
||
|
import time
|
||
|
import html
|
||
|
import asyncio
|
||
|
import tempfile
|
||
|
from urllib.parse import urlparse, urlunparse, unquote as urldecode
|
||
|
from pyrogram import Client, filters
|
||
|
from pyrogram.parser import html as pyrogram_html
|
||
|
from .. import ADMIN_CHATS, ALL_CHATS, PROGRESS_UPDATE_DELAY, session, help_dict, LEECH_TIMEOUT, MAGNET_TIMEOUT, SendAsZipFlag, ForceDocumentFlag
|
||
|
from ..utils.aria2 import aria2_add_torrent, aria2_tell_status, aria2_remove, aria2_add_magnet, Aria2Error, aria2_tell_active, is_gid_owner, aria2_add_directdl
|
||
|
from ..utils.misc import format_bytes, get_file_mimetype, return_progress_string, calculate_eta, allow_admin_cancel
|
||
|
from ..utils.upload_worker import upload_queue, upload_statuses, progress_callback_data, upload_waits, stop_uploads
|
||
|
|
||
|
@Client.on_message(filters.command(['torrent', 'ziptorrent', 'filetorrent']) & filters.chat(ALL_CHATS))
|
||
|
async def torrent_cmd(client, message):
|
||
|
text = (message.text or message.caption).split(None, 1)
|
||
|
command = text.pop(0).lower()
|
||
|
if 'zip' in command:
|
||
|
flags = (SendAsZipFlag,)
|
||
|
elif 'file' in command:
|
||
|
flags = (ForceDocumentFlag,)
|
||
|
else:
|
||
|
flags = ()
|
||
|
link = None
|
||
|
reply = message.reply_to_message
|
||
|
document = message.document
|
||
|
if document:
|
||
|
if document.file_size < 1048576 and document.file_name.endswith('.torrent') and (not document.mime_type or document.mime_type == 'application/x-bittorrent'):
|
||
|
os.makedirs(str(message.from_user.id), exist_ok=True)
|
||
|
fd, link = tempfile.mkstemp(dir=str(message.from_user.id), suffix='.torrent')
|
||
|
os.fdopen(fd).close()
|
||
|
await message.download(link)
|
||
|
mimetype = await get_file_mimetype(link)
|
||
|
if mimetype != 'application/x-bittorrent':
|
||
|
os.remove(link)
|
||
|
link = None
|
||
|
if not link:
|
||
|
if text:
|
||
|
link = text[0].strip()
|
||
|
elif not getattr(reply, 'empty', True):
|
||
|
document = reply.document
|
||
|
link = reply.text
|
||
|
if document:
|
||
|
if document.file_size < 1048576 and document.file_name.endswith('.torrent') and (not document.mime_type or document.mime_type == 'application/x-bittorrent'):
|
||
|
os.makedirs(str(message.from_user.id), exist_ok=True)
|
||
|
fd, link = tempfile.mkstemp(dir=str(message.from_user.id), suffix='.torrent')
|
||
|
os.fdopen(fd).close()
|
||
|
await reply.download(link)
|
||
|
mimetype = await get_file_mimetype(link)
|
||
|
if mimetype != 'application/x-bittorrent':
|
||
|
os.remove(link)
|
||
|
link = reply.text or reply.caption
|
||
|
if not link:
|
||
|
await message.reply_text('''Usage:
|
||
|
- /torrent <i><Torrent URL or File></i>
|
||
|
- /torrent <i>(as reply to a Torrent URL or file)</i>
|
||
|
|
||
|
- /ziptorrent <i><Torrent URL or File></i>
|
||
|
- /ziptorrent <i>(as reply to a Torrent URL or File)</i>
|
||
|
|
||
|
- /filetorrent <i><Torrent URL or File></i> - Sends videos as files
|
||
|
- /filetorrent <i>(as reply to a Torrent URL or file)</i> - Sends videos as files''')
|
||
|
return
|
||
|
await initiate_torrent(client, message, link, flags)
|
||
|
await message.stop_propagation()
|
||
|
|
||
|
async def initiate_torrent(client, message, link, flags):
|
||
|
user_id = message.from_user.id
|
||
|
reply = await message.reply_text('Adding torrent...')
|
||
|
try:
|
||
|
gid = await aria2_add_torrent(session, user_id, link, LEECH_TIMEOUT)
|
||
|
except Aria2Error as ex:
|
||
|
await asyncio.gather(message.reply_text(f'Aria2 Error Occured!\n{ex.error_code}: {html.escape(ex.error_message)}'), reply.delete())
|
||
|
return
|
||
|
finally:
|
||
|
if os.path.isfile(link):
|
||
|
os.remove(link)
|
||
|
await handle_leech(client, message, gid, reply, user_id, flags)
|
||
|
|
||
|
@Client.on_message(filters.command(['magnet', 'zipmagnet', 'filemagnet']) & filters.chat(ALL_CHATS))
|
||
|
async def magnet_cmd(client, message):
|
||
|
text = (message.text or message.caption).split(None, 1)
|
||
|
command = text.pop(0).lower()
|
||
|
if 'zip' in command:
|
||
|
flags = (SendAsZipFlag,)
|
||
|
elif 'file' in command:
|
||
|
flags = (ForceDocumentFlag,)
|
||
|
else:
|
||
|
flags = ()
|
||
|
link = None
|
||
|
reply = message.reply_to_message
|
||
|
if text:
|
||
|
link = text[0].strip()
|
||
|
elif not getattr(reply, 'empty', True):
|
||
|
link = reply.text or reply.caption
|
||
|
if not link:
|
||
|
await message.reply_text('''Usage:
|
||
|
- /magnet <i><Magnet URL></i>
|
||
|
- /magnet <i>(as reply to a Magnet URL)</i>
|
||
|
|
||
|
- /zipmagnet <i><Magnet URL></i>
|
||
|
- /zipmagnet <i>(as reply to a Magnet URL)</i>
|
||
|
|
||
|
- /filemagnet <i><Magnet URL></i> - Sends videos as files
|
||
|
- /filemagnet <i>(as reply to a Magnet URL)</i> - Sends videos as files''')
|
||
|
return
|
||
|
await initiate_magnet(client, message, link, flags)
|
||
|
|
||
|
async def initiate_magnet(client, message, link, flags):
|
||
|
user_id = message.from_user.id
|
||
|
reply = await message.reply_text('Adding magnet...')
|
||
|
try:
|
||
|
gid = await asyncio.wait_for(aria2_add_magnet(session, user_id, link, LEECH_TIMEOUT), MAGNET_TIMEOUT)
|
||
|
except Aria2Error as ex:
|
||
|
await asyncio.gather(message.reply_text(f'Aria2 Error Occured!\n{ex.error_code}: {html.escape(ex.error_message)}'), reply.delete())
|
||
|
except asyncio.TimeoutError:
|
||
|
await asyncio.gather(message.reply_text('Magnet timed out'), reply.delete())
|
||
|
else:
|
||
|
await handle_leech(client, message, gid, reply, user_id, flags)
|
||
|
|
||
|
@Client.on_message(filters.command(['directdl', 'direct', 'zipdirectdl', 'zipdirect', 'filedirectdl', 'filedirect']) & filters.chat(ALL_CHATS))
|
||
|
async def directdl_cmd(client, message):
|
||
|
text = message.text.split(None, 1)
|
||
|
command = text.pop(0).lower()
|
||
|
if 'zip' in command:
|
||
|
flags = (SendAsZipFlag,)
|
||
|
elif 'file' in command:
|
||
|
flags = (ForceDocumentFlag,)
|
||
|
else:
|
||
|
flags = ()
|
||
|
link = filename = None
|
||
|
reply = message.reply_to_message
|
||
|
if text:
|
||
|
link = text[0].strip()
|
||
|
elif not getattr(reply, 'empty', True):
|
||
|
link = reply.text
|
||
|
if not link:
|
||
|
await message.reply_text('''Usage:
|
||
|
- /directdl <i><Direct URL> | optional custom file name</i>
|
||
|
- /directdl <i>(as reply to a Direct URL) | optional custom file name</i>
|
||
|
- /direct <i><Direct URL> | optional custom file name</i>
|
||
|
- /direct <i>(as reply to a Direct URL) | optional custom file name</i>
|
||
|
|
||
|
- /zipdirectdl <i><Direct URL> | optional custom file name</i>
|
||
|
- /zipdirectdl <i>(as reply to a Direct URL) | optional custom file name</i>
|
||
|
- /zipdirect <i><Direct URL> | optional custom file name</i>
|
||
|
- /zipdirect <i>(as reply to a Direct URL) | optional custom file name</i>
|
||
|
|
||
|
- /filedirectdl <i><Direct URL> | optional custom file name</i> - Sends videos as files
|
||
|
- /filedirectdl <i>(as reply to a Direct URL) | optional custom file name</i> - Sends videos as files
|
||
|
- /filedirect <i><Direct URL> | optional custom file name</i> - Sends videos as files
|
||
|
- /filedirect <i>(as reply to a Direct URL) | optional custom file name</i> - Sends videos as files''')
|
||
|
return
|
||
|
split = link.split('|', 1)
|
||
|
if len(split) > 1:
|
||
|
filename = os.path.basename(split[1].strip())
|
||
|
link = split[0].strip()
|
||
|
parsed = list(urlparse(link, 'https'))
|
||
|
if parsed[0] == 'magnet':
|
||
|
if SendAsZipFlag in flags:
|
||
|
prefix = 'zip'
|
||
|
elif ForceDocumentFlag in flags:
|
||
|
prefix = 'file'
|
||
|
else:
|
||
|
prefix = ''
|
||
|
await message.reply_text(f'Use /{prefix}magnet instead')
|
||
|
return
|
||
|
if not parsed[0]:
|
||
|
parsed[0] = 'https'
|
||
|
if parsed[0] not in ('http', 'https'):
|
||
|
await message.reply_text('Invalid scheme')
|
||
|
return
|
||
|
link = urlunparse(parsed)
|
||
|
await initiate_directdl(client, message, link, filename, flags)
|
||
|
|
||
|
async def initiate_directdl(client, message, link, filename, flags):
|
||
|
user_id = message.from_user.id
|
||
|
reply = await message.reply_text('Adding url...')
|
||
|
try:
|
||
|
gid = await asyncio.wait_for(aria2_add_directdl(session, user_id, link, filename, LEECH_TIMEOUT), MAGNET_TIMEOUT)
|
||
|
except Aria2Error as ex:
|
||
|
await asyncio.gather(message.reply_text(f'Aria2 Error Occured!\n{ex.error_code}: {html.escape(ex.error_message)}'), reply.delete())
|
||
|
except asyncio.TimeoutError:
|
||
|
await asyncio.gather(message.reply_text('Connection timed out'), reply.delete())
|
||
|
else:
|
||
|
await handle_leech(client, message, gid, reply, user_id, flags)
|
||
|
|
||
|
leech_statuses = dict()
|
||
|
async def handle_leech(client, message, gid, reply, user_id, flags):
|
||
|
prevtext = None
|
||
|
torrent_info = await aria2_tell_status(session, gid)
|
||
|
last_edit = 0
|
||
|
start_time = time.time()
|
||
|
message_identifier = (reply.chat.id, reply.message_id)
|
||
|
leech_statuses[message_identifier] = gid
|
||
|
download_speed = None
|
||
|
while torrent_info['status'] in ('active', 'waiting', 'paused'):
|
||
|
if torrent_info.get('seeder') == 'true':
|
||
|
break
|
||
|
status = torrent_info['status'].capitalize()
|
||
|
total_length = int(torrent_info['totalLength'])
|
||
|
completed_length = int(torrent_info['completedLength'])
|
||
|
download_speed = format_bytes(torrent_info['downloadSpeed']) + '/s'
|
||
|
if total_length:
|
||
|
formatted_total_length = format_bytes(total_length)
|
||
|
else:
|
||
|
formatted_total_length = 'Unknown'
|
||
|
formatted_completed_length = format_bytes(completed_length)
|
||
|
seeders = torrent_info.get('numSeeders')
|
||
|
peers = torrent_info.get('connections')
|
||
|
if torrent_info.get('bittorrent'):
|
||
|
tor_name = torrent_info['bittorrent']['info']['name']
|
||
|
else:
|
||
|
tor_name = os.path.basename(torrent_info['files'][0]['path'])
|
||
|
if not tor_name:
|
||
|
tor_name = urldecode(os.path.basename(urlparse(torrent_info['files'][0]['uris'][0]['uri']).path))
|
||
|
text = f'''{html.escape(tor_name)}
|
||
|
<code>{html.escape(return_progress_string(completed_length, total_length))}</code>
|
||
|
|
||
|
<b>GID:</b> <code>{gid}</code>
|
||
|
<b>Status:</b> {status}
|
||
|
<b>Total Size:</b> {formatted_total_length}
|
||
|
<b>Downloaded Size:</b> {formatted_completed_length}
|
||
|
<b>Download Speed:</b> {download_speed}
|
||
|
<b>ETA:</b> {calculate_eta(completed_length, total_length, start_time)}'''
|
||
|
if seeders is not None:
|
||
|
text += f'\n<b>Seeders:</b> {seeders}'
|
||
|
if peers is not None:
|
||
|
text += f'\n<b>{"Peers" if seeders is not None else "Connections"}:</b> {peers}'
|
||
|
if (time.time() - last_edit) > PROGRESS_UPDATE_DELAY and text != prevtext:
|
||
|
await reply.edit_text(text)
|
||
|
prevtext = text
|
||
|
last_edit = time.time()
|
||
|
torrent_info = await aria2_tell_status(session, gid)
|
||
|
if torrent_info['status'] == 'error':
|
||
|
error_code = torrent_info['errorCode']
|
||
|
error_message = torrent_info['errorMessage']
|
||
|
text = f'Aria2 Error Occured!\n{error_code}: {html.escape(error_message)}'
|
||
|
if error_code == '7' and not error_message and torrent_info['downloadSpeed'] == '0':
|
||
|
text += '\n\nThis error may have been caused due to the torrent being too slow'
|
||
|
await asyncio.gather(
|
||
|
message.reply_text(text),
|
||
|
reply.delete()
|
||
|
)
|
||
|
elif torrent_info['status'] == 'removed':
|
||
|
await asyncio.gather(
|
||
|
message.reply_text('Your download has been manually cancelled.'),
|
||
|
reply.delete()
|
||
|
)
|
||
|
else:
|
||
|
leech_statuses.pop(message_identifier)
|
||
|
task = None
|
||
|
if upload_queue._unfinished_tasks:
|
||
|
task = asyncio.create_task(reply.edit_text('Download successful, waiting for queue...'))
|
||
|
upload_queue.put_nowait((client, message, reply, torrent_info, user_id, flags))
|
||
|
try:
|
||
|
await aria2_remove(session, gid)
|
||
|
except Aria2Error as ex:
|
||
|
if not (ex.error_code == 1 and ex.error_message == f'Active Download not found for GID#{gid}'):
|
||
|
raise
|
||
|
finally:
|
||
|
if task:
|
||
|
await task
|
||
|
|
||
|
@Client.on_message(filters.command('list') & filters.chat(ALL_CHATS))
|
||
|
async def list_leeches(client, message):
|
||
|
user_id = message.from_user.id
|
||
|
text = ''
|
||
|
quote = None
|
||
|
parser = pyrogram_html.HTML(client)
|
||
|
for i in await aria2_tell_active(session):
|
||
|
if i.get('bittorrent'):
|
||
|
info = i['bittorrent'].get('info')
|
||
|
if not info:
|
||
|
continue
|
||
|
tor_name = info['name']
|
||
|
else:
|
||
|
tor_name = os.path.basename(i['files'][0]['path'])
|
||
|
if not tor_name:
|
||
|
tor_name = urldecode(os.path.basename(urlparse(i['files'][0]['uris'][0]['uri']).path))
|
||
|
a = f'''<b>{html.escape(tor_name)}</b>
|
||
|
<code>{i['gid']}</code>\n\n'''
|
||
|
futtext = text + a
|
||
|
if len((await parser.parse(futtext))['message']) > 4096:
|
||
|
await message.reply_text(text, quote=quote)
|
||
|
quote = False
|
||
|
futtext = a
|
||
|
text = futtext
|
||
|
if not text:
|
||
|
text = 'No leeches found.'
|
||
|
await message.reply_text(text, quote=quote)
|
||
|
|
||
|
@Client.on_message(filters.command('cancel') & filters.chat(ALL_CHATS))
|
||
|
async def cancel_leech(client, message):
|
||
|
user_id = message.from_user.id
|
||
|
gid = None
|
||
|
text = message.text.split(' ', 1)
|
||
|
text.pop(0)
|
||
|
reply = message.reply_to_message
|
||
|
if text:
|
||
|
gid = text[0].strip()
|
||
|
elif not getattr(reply, 'empty', True):
|
||
|
reply_identifier = (reply.chat.id, reply.message_id)
|
||
|
task = upload_statuses.get(reply_identifier)
|
||
|
if task:
|
||
|
task, starter_id = task
|
||
|
if user_id != starter_id and not await allow_admin_cancel(message.chat.id, user_id):
|
||
|
await message.reply_text('You did not start this leech.')
|
||
|
else:
|
||
|
task.cancel()
|
||
|
return
|
||
|
result = progress_callback_data.get(reply_identifier)
|
||
|
if result:
|
||
|
if user_id != result[3] and not await allow_admin_cancel(message.chat.id, user_id):
|
||
|
await message.reply_text('You did not start this leech.')
|
||
|
else:
|
||
|
stop_uploads.add(reply_identifier)
|
||
|
await message.reply_text('Cancelled!')
|
||
|
return
|
||
|
starter_id = upload_waits.get(reply_identifier)
|
||
|
if starter_id:
|
||
|
if user_id != starter_id[0] and not await allow_admin_cancel(message.chat.id, user_id):
|
||
|
await message.reply_text('You did not start this leech.')
|
||
|
else:
|
||
|
stop_uploads.add(reply_identifier)
|
||
|
await message.reply_text('Cancelled!')
|
||
|
return
|
||
|
gid = leech_statuses.get(reply_identifier)
|
||
|
if not gid:
|
||
|
await message.reply_text('''Usage:
|
||
|
/cancel <i><GID></i>
|
||
|
/cancel <i>(as reply to status message)</i>''')
|
||
|
return
|
||
|
if not is_gid_owner(user_id, gid) and not await allow_admin_cancel(message.chat.id, user_id):
|
||
|
await message.reply_text('You did not start this leech.')
|
||
|
return
|
||
|
await aria2_remove(session, gid)
|
||
|
|
||
|
help_dict['leech'] = ('Leech',
|
||
|
'''/torrent <i><Torrent URL or File></i>
|
||
|
/torrent <i>(as reply to a Torrent URL or file)</i>
|
||
|
|
||
|
/ziptorrent <i><Torrent URL or File></i>
|
||
|
/ziptorrent <i>(as reply to a Torrent URL or File)</i>
|
||
|
|
||
|
/filetorrent <i><Torrent URL or File></i> - Sends videos as files
|
||
|
/filetorrent <i>(as reply to a Torrent URL or File)</i> - Sends videos as files
|
||
|
|
||
|
/magnet <i><Magnet URL></i>
|
||
|
/magnet <i>(as reply to a Magnet URL)</i>
|
||
|
|
||
|
/zipmagnet <i><Magnet URL></i>
|
||
|
/zipmagnet <i>(as reply to a Magnet URL)</i>
|
||
|
|
||
|
/filemagnet <i><Magnet URL></i> - Sends videos as files
|
||
|
/filemagnet <i>(as reply to a Magnet URL)</i> - Sends videos as files
|
||
|
|
||
|
/directdl <i><Direct URL> | optional custom file name</i>
|
||
|
/directdl <i>(as reply to a Direct URL) | optional custom file name</i>
|
||
|
/direct <i><Direct URL> | optional custom file name</i>
|
||
|
/direct <i>(as reply to a Direct URL) | optional custom file name</i>
|
||
|
|
||
|
/zipdirectdl <i><Direct URL> | optional custom file name</i>
|
||
|
/zipdirectdl <i>(as reply to a Direct URL) | optional custom file name</i>
|
||
|
/zipdirect <i><Direct URL> | optional custom file name</i>
|
||
|
/zipdirect <i>(as reply to a Direct URL) | optional custom file name</i>
|
||
|
|
||
|
/filedirectdl <i><Direct URL> | optional custom file name</i> - Sends videos as files
|
||
|
/filedirectdl <i>(as reply to a Direct URL) | optional custom file name</i> - Sends videos as files
|
||
|
/filedirect <i><Direct URL> | optional custom file name</i> - Sends videos as files
|
||
|
/filedirect <i>(as reply to a Direct URL) | optional custom file name</i> - Sends videos as files
|
||
|
|
||
|
/cancel <i><GID></i>
|
||
|
/cancel <i>(as reply to status message)</i>
|
||
|
|
||
|
/list - Lists all current leeches''')
|