"""File-management commands: list a directory, upload a file, download media."""
import os
import html
from pyrogram import Client, filters
from pyrogram.errors.exceptions.bad_request_400 import MessageIdInvalid
from .. import config, help_dict, log_errors, session, progress_callback, public_log_errors

# Message attributes that, when truthy, mean the message carries downloadable media.
_MEDIA_ATTRIBUTES = ("audio", "document", "photo", "sticker", "animation", "video", "voice", "video_note")


def _has_media(message):
    """Return True if any attribute named in _MEDIA_ATTRIBUTES is truthy on *message*."""
    return any(getattr(message, attribute, None) for attribute in _MEDIA_ATTRIBUTES)


def _find_media_message(message):
    """Return *message* if it has media, else its non-empty replied-to message with media, else None."""
    if _has_media(message):
        return message
    replied = message.reply_to_message
    # getattr(..., 'empty', True) also treats a missing reply (None) as empty.
    if not getattr(replied, 'empty', True) and _has_media(replied):
        return replied
    return None


@Client.on_message(~filters.sticker & ~filters.via_bot & ~filters.edited & filters.outgoing & filters.command(['ls', 'hls', 'hiddenls'], prefixes=config['config']['prefixes']))
@log_errors
@public_log_errors
async def ls(client, message):
    """Reply with the sorted contents of a directory, folders first.

    The directory is taken from the command arguments (joined with spaces,
    ``~`` expanded) and defaults to the current directory. Entries starting
    with ``.`` are skipped unless the command name contains ``h`` (``hls`` /
    ``hiddenls``). If the path is not a directory its base name is sent
    instead; any other error is sent as ``ExceptionName: message``. An empty
    result is reported as ``Empty``.
    """
    path = os.path.abspath(os.path.expanduser(' '.join(message.command[1:]) or '.'))
    show_hidden = 'h' in message.command[0]
    folders = []
    files = []
    try:
        for name in sorted(os.listdir(path)):
            if name.startswith('.') and not show_hidden:
                continue
            (folders if os.path.isdir(os.path.join(path, name)) else files).append(name)
    except NotADirectoryError:
        # The argument names a non-directory: show just that name.
        text = html.escape(os.path.basename(path))
    except Exception as ex:
        text = f'{type(ex).__name__}: {html.escape(str(ex))}'
    else:
        text = ''.join(f'{html.escape(name)}\n' for name in folders + files)
    await message.reply_text(text or 'Empty', disable_web_page_preview=True)


@Client.on_message(~filters.sticker & ~filters.via_bot & ~filters.edited & filters.outgoing & filters.command(['ul', 'upload'], prefixes=config['config']['prefixes']))
@log_errors
@public_log_errors
async def upload(client, message):
    """Send the file named by the command arguments to the current chat as a document.

    Does nothing when no file name is given. A status message is posted first
    and passed to ``progress_callback``; it is deleted once the upload
    succeeds. Outside private/bot chats the document is sent as a reply to
    the command message.
    """
    file = os.path.expanduser(' '.join(message.command[1:]))
    if not file:
        return
    text = f'Uploading {html.escape(file)}...'
    reply = await message.reply_text(text)
    reply_to = None if message.chat.type in ('private', 'bot') else message.message_id
    try:
        await client.send_document(
            message.chat.id,
            file,
            progress=progress_callback,
            progress_args=(reply, text, True),
            force_document=True,
            reply_to_message_id=reply_to,
        )
    except MessageIdInvalid:
        # NOTE(review): presumably raised when the status message was deleted
        # mid-transfer -- confirm against progress_callback.
        await message.reply_text('Upload cancelled!')
    else:
        await reply.delete()


@Client.on_message(~filters.sticker & ~filters.via_bot & ~filters.edited & filters.outgoing & filters.command(['dl', 'download'], prefixes=config['config']['prefixes']))
@log_errors
@public_log_errors
async def download(client, message):
    """Download media from the command message or the message it replies to.

    The destination is taken from the command arguments (``~`` expanded) and
    defaults to ``./``. An existing directory gets a trailing separator
    before being handed to ``Message.download``. Replies with
    ``Media required`` when neither message carries media.
    """
    file = os.path.expanduser(' '.join(message.command[1:]) or './')
    if os.path.isdir(file):
        # Append a trailing separator so the target is passed as a directory path.
        file = os.path.join(file, '')
    download_message = _find_media_message(message)
    if download_message is None:
        await message.reply_text('Media required')
        return
    text = 'Downloading...'
    status = await message.reply_text(text)
    try:
        saved_path = await download_message.download(file, progress=progress_callback, progress_args=(status, text, False))
    except MessageIdInvalid:
        # NOTE(review): presumably raised when the status message was deleted
        # mid-transfer -- confirm against progress_callback.
        await message.reply_text('Download cancelled!')
        return
    if not saved_path:
        # Message.download returns None when the download fails or is stopped;
        # html.escape(None) would raise AttributeError, so report it instead.
        await status.edit_text('Download failed')
        return
    await status.edit_text(f'Downloaded to {html.escape(saved_path)}')


# NOTE(review): the help text is on a single line here; line breaks between
# entries may have been lost along with the file's other whitespace -- confirm.
help_dict['files'] = ('Files', '''{prefix}ls [directory] - Lists files in [directory] {prefix}hiddenls [directory] - {prefix}ls but shows hidden files Aliases: {prefix}hls {prefix}upload <file name> - Uploads <file name> Aliases: {prefix}ul {prefix}download [file name] (as reply or caption to a file) - Downloads file to [file name] Aliases: {prefix}dl''')