"""Telegram contact bot built on Telethon.

Private messages sent to the bot are forwarded to ``receive_chat``; replies
made there (mentioning the bot and replying to a forwarded message) are sent
back to the original user.  Operators can also ``/ban``, ``/tban`` and
``/unban`` users from ``receive_chat``.  The ban list is kept in ``rias.data``
and, when a storage message is configured, mirrored to that Telegram message.
"""
import logging

# Kept ahead of the remaining imports, matching the original statement order.
logging.basicConfig(level=logging.INFO)

import os
import re
import time
import html
import yaml
import asyncio
import traceback
from datetime import datetime
from telethon import TelegramClient, events
from telethon.utils import get_display_name
from telethon.errors.rpcerrorlist import UserIsBlockedError
from telethon.tl.types import InputPeerUser, MessageMediaWebPage, MessageMediaUnsupported

# Use a context manager so the config file handle is closed deterministically.
with open('config.yaml') as config_file:
    config = yaml.safe_load(config_file)

api_id = config['telegram']['api_id']
api_hash = config['telegram']['api_hash']
bot_token = config['telegram'].get('bot_token')
receive_chat = config['receive_chat']

# Optional Telegram message used as remote storage for the ban list.
storage_chat_id = storage_message_id = None
if config.get('storage'):
    storage_chat_id = config['storage'].get('storage_chat_id')
    storage_message_id = config['storage'].get('storage_message_id')

# Matches the "Hidden User" helper message posted by to_receive_handler;
# group 1 is the user id, group 2 the access hash.
hidden_fwd_regex = re.compile(r'^Hidden User: .+ \(([0-9]+),(-?[0-9]+)\)$')

# id of a message in receive_chat -> (user, id of the copy sent to that user)
response_data = dict()
# user id -> (end_date as a Unix timestamp or None, reason)
banned_data = dict()
# user id -> timestamps (whole seconds) of that user's recent events
flood_data = dict()

# Seconds per /tban duration suffix.
_DURATION_SECONDS = {
    'm': 60,
    'h': 60 * 60,
    'd': 60 * 60 * 24,
    'w': 60 * 60 * 24 * 7,
}


def is_flood(user_id):
    """Record an event for ``user_id`` and report whether they are flooding.

    Returns True when, counting this event, more than three events were
    recorded for the user with timestamps less than three seconds old.
    """
    current_time = int(time.time())
    recent = [
        stamp for stamp in flood_data.get(user_id, [])
        if current_time - stamp < 3
    ]
    recent.append(current_time)
    flood_data[user_id] = recent
    return len(recent) > 3


def _ban_notice(end_date, reason):
    """Build the text shown to a banned user."""
    if end_date:
        text = f'You have been banned until {datetime.fromtimestamp(end_date)}'
    else:
        text = 'You have been banned indefinitely'
    if reason:
        text += f' for {reason}'
    return text


banned_data_lock = asyncio.Lock()


async def save_banned_data():
    """Write ``banned_data`` to ``rias.data`` and upload it if storage is set.

    File format: a version line (``0``) followed by one
    ``user_id,end_date,reason`` line per ban, with the reason
    ``unicode_escape``-encoded.  Upload failures are logged, not raised.

    This acquires ``banned_data_lock``; asyncio locks are not reentrant, so
    callers must not hold the lock when awaiting this coroutine.
    """
    async with banned_data_lock:
        with open('rias.data', 'w') as file:
            file.write('0\n')
            for (user_id, (end_date, reason)) in banned_data.items():
                if not end_date:
                    end_date = ''
                if not reason:
                    reason = ''
                file.write(f'{user_id},{end_date},{reason.encode("unicode_escape").decode()}\n')
        if storage_chat_id and storage_message_id:
            try:
                await client.edit_message(storage_chat_id, storage_message_id, file='rias.data')
            except Exception:
                # Best effort: the local file is already written.  Only
                # Exception is caught so cancellation/interrupts propagate.
                logging.exception('Error while uploading rias data')


async def ban_user(user, end_date, reason):
    """Ban ``user``, persist the ban list and try to notify the user.

    ``user`` must expose ``user_id``; ``end_date`` is a Unix timestamp or a
    falsy value for an indefinite ban; ``reason`` may be empty or None.
    Failure to deliver the notification is logged, not raised.
    """
    user_id = user.user_id
    async with banned_data_lock:
        banned_data[user_id] = (end_date, reason)
    # Lock released above: save_banned_data() takes the same lock itself.
    await save_banned_data()
    text = _ban_notice(end_date, reason)
    try:
        await client.send_message(user, text)
    except Exception:
        logging.exception('Exception occured while informing %s of their ban', user)


client = TelegramClient('rias', api_id, api_hash)
client.parse_mode = 'html'


async def main():
    """Start the client, load the ban list, register handlers and run."""
    await client.start(bot_token=bot_token)
    me = await client.get_me()
    # /tban[@botname] <number><unit letter> [reason]
    tban_regex = re.compile(f'(?i)/tban(?:@{me.username})?\\s+([0-9]+[a-z])(?:\\s+([\\s\\S]+))?')

    # Refresh the local ban file from the storage message when configured.
    if storage_chat_id and storage_message_id:
        try:
            m = await client.get_messages(storage_chat_id, ids=storage_message_id)
            await m.download_media('rias.data')
        except Exception:
            logging.exception('Error while downloading rias data')

    if os.path.exists('rias.data'):
        save_it = False
        try:
            with open('rias.data', 'r') as file:
                version = file.readline().strip()
                if version != '0':
                    logging.error('Unsupported rias data version %s', version)
                else:
                    for line in file:
                        data = line.strip()
                        if not data:
                            # The first blank line ends the data.
                            break
                        user_id, end_date, reason = data.split(',', 2)
                        if end_date and int(time.time()) > int(end_date):
                            # Expired ban: drop it and rewrite the file below.
                            save_it = True
                            continue
                        banned_data[int(user_id)] = (
                            int(end_date) if end_date else None,
                            reason.encode().decode('unicode_escape'),
                        )
        except Exception:
            logging.exception('Error while parsing rias data')
        if save_it:
            await save_banned_data()

    @client.on(events.NewMessage)
    async def flood_handler(e):
        """Stop processing of outgoing messages and of flooding chats."""
        if e.out or (e.chat_id != await client.get_peer_id(receive_chat) and is_flood(e.chat_id)):
            raise events.StopPropagation

    @client.on(events.NewMessage(pattern=f'(?i)/start(?:@{me.username})?(?:\\s|$)', func=lambda e: e.is_private))
    async def start(e):
        """Reply to /start in private chats with the welcome text."""
        await e.reply('Welcome to blankie contact bot!\n\nSource: https://gitlab.com/blankX/rias or https://git.nixnet.services/blankie/rias', link_preview=False)
        raise events.StopPropagation

    @client.on(events.NewMessage(func=lambda e: e.is_private))
    async def to_receive_handler(e):
        """Forward a private message to receive_chat unless the sender is banned."""
        if data := banned_data.get(e.chat_id):
            end_date, reason = data
            if end_date and int(time.time()) > end_date:
                # The ban has expired: lift it and fall through to forwarding.
                async with banned_data_lock:
                    banned_data.pop(e.chat_id)
                await save_banned_data()
            else:
                await e.reply(_ban_notice(end_date, reason))
                return
        fwd = await e.forward_to(receive_chat)
        if not fwd.forward or fwd.forward.sender_id != e.chat_id:
            # The forward header does not identify the sender, so post the id
            # and access hash in the form hidden_fwd_regex parses.
            user = await e.get_chat()
            await fwd.reply(f'Hidden User: {html.escape(get_display_name(user))} ({e.chat_id},{user.access_hash})', link_preview=False)

    @client.on(events.MessageEdited(func=lambda e: e.is_private))
    async def no_edits_handler(e):
        """Tell private users that edits are not relayed."""
        if not is_flood(e.chat_id):
            await e.reply('Edits are currently not supported, sorry. Please resend your message')

    @client.on(events.NewMessage(receive_chat, func=lambda e: e.mentioned and e.reply_to_msg_id))
    async def response_handler(e):
        """Handle operator replies in receive_chat: ban commands or a response."""
        reply = await e.get_reply_message()
        if not reply or reply.sender_id != me.id:
            return
        if reply.forward and reply.forward.sender_id:
            user = await reply.forward.get_input_sender()
        elif (match := hidden_fwd_regex.match(reply.raw_text)) and not reply.forward and not reply.media:
            user = InputPeerUser(int(match.group(1)), int(match.group(2)))
        else:
            return

        command = e.text.lower()

        if command.startswith('/ban'):
            ban_reason = e.text[4:].strip()
            await ban_user(user, None, ban_reason)
            await e.reply('Banned')
            return

        if command.startswith('/tban'):
            match = tban_regex.match(e.text)
            if not match:
                await e.reply('Regex failed to match')
                return
            amount = match.group(1)
            unit = amount[-1].lower()
            # Fix: the original chain tested 'd' twice, so 'h' was rejected
            # and 'd' banned for an hour instead of a day.
            seconds = _DURATION_SECONDS.get(unit)
            if seconds is None:
                await e.reply(f'Invalid duration {unit}, please use m, h, d or w', parse_mode=None)
                return
            # A count of 0 is treated as 1.
            end_date = int(time.time() + (int(amount[:-1]) or 1) * seconds)
            ban_reason = match.group(2)
            await ban_user(user, end_date, ban_reason)
            await e.reply('Banned')
            return

        if command.startswith('/unban'):
            if user.user_id not in banned_data:
                await e.reply('Never banned')
                return
            async with banned_data_lock:
                banned_data.pop(user.user_id)
            # Lock released above: save_banned_data() takes the same lock.
            await save_banned_data()
            text = 'You have been unbanned'
            if unban_reason := e.text[6:].strip():
                text += f' for {unban_reason}'
            try:
                await client.send_message(user, text)
            except Exception:
                logging.exception('Exception occured while informing %s of their unban', user)
            await e.reply('Unbanned')
            return

        # Not a command: relay the message and remember the copy so later
        # edits can be mirrored by response_edit_handler.
        try:
            response_data[e.id] = (user, (await client.send_message(user, e.message)).id)
        except UserIsBlockedError:
            await e.reply('The bot was blocked by the user')
        except BaseException:
            await e.reply(traceback.format_exc(), parse_mode=None)
            raise

    @client.on(events.MessageEdited(receive_chat, func=lambda e: e.id in response_data))
    async def response_edit_handler(e):
        """Mirror an edit of a relayed response to the copy the user received."""
        user, message_id = response_data[e.id]
        is_web_page = isinstance(e.media, MessageMediaWebPage)
        if isinstance(e.media, (MessageMediaWebPage, MessageMediaUnsupported)):
            media = None
        else:
            media = e.media
        try:
            await client.edit_message(user, message_id, e.text, file=media, link_preview=is_web_page)
        except UserIsBlockedError:
            await e.reply('The bot was blocked by the user')
        except BaseException:
            await e.reply(traceback.format_exc(), parse_mode=None)
            raise

    await client.run_until_disconnected()


if __name__ == '__main__':
    client.loop.run_until_complete(main())