219 lines
		
	
	
		
			9.3 KiB
		
	
	
	
		
			Python
		
	
	
	
			
		
		
	
	
			219 lines
		
	
	
		
			9.3 KiB
		
	
	
	
		
			Python
		
	
	
	
import logging
 | 
						|
logging.basicConfig(level=logging.INFO)
 | 
						|
 | 
						|
import os
 | 
						|
import re
 | 
						|
import time
 | 
						|
import html
 | 
						|
import yaml
 | 
						|
import asyncio
 | 
						|
import traceback
 | 
						|
from datetime import datetime
 | 
						|
from telethon import TelegramClient, events
 | 
						|
from telethon.utils import get_display_name
 | 
						|
from telethon.errors.rpcerrorlist import UserIsBlockedError
 | 
						|
from telethon.tl.types import InputPeerUser, MessageMediaWebPage, MessageMediaUnsupported
 | 
						|
 | 
						|
config = yaml.safe_load(open('config.yaml'))
 | 
						|
api_id = config['telegram']['api_id']
 | 
						|
api_hash = config['telegram']['api_hash']
 | 
						|
bot_token = config['telegram'].get('bot_token')
 | 
						|
receive_chat = config['receive_chat']
 | 
						|
storage_chat_id = storage_message_id = None
 | 
						|
if config.get('storage'):
 | 
						|
    storage_chat_id = config['storage'].get('storage_chat_id')
 | 
						|
    storage_message_id = config['storage'].get('storage_message_id')
 | 
						|
hidden_fwd_regex = re.compile(r'^Hidden User: .+ \(([0-9]+),(-?[0-9]+)\)$')
 | 
						|
response_data = dict()
 | 
						|
banned_data = dict()
 | 
						|
flood_data = dict()
 | 
						|
def is_flood(user_id):
 | 
						|
    if user_id not in flood_data:
 | 
						|
        flood_data[user_id] = []
 | 
						|
    uflood_data = flood_data[user_id]
 | 
						|
    current_time = int(time.time())
 | 
						|
    uflood_data.append(current_time)
 | 
						|
    flood_data[user_id] = uflood_data = list(filter(lambda i: current_time - i < 3, uflood_data))
 | 
						|
    return len(uflood_data) > 3
 | 
						|
banned_data_lock = asyncio.Lock()
 | 
						|
async def save_banned_data():
 | 
						|
    async with banned_data_lock:
 | 
						|
        with open('rias.data', 'w') as file:
 | 
						|
            file.write('0\n')
 | 
						|
            for (user_id, (end_date, reason)) in banned_data.items():
 | 
						|
                if not end_date:
 | 
						|
                    end_date = ''
 | 
						|
                if not reason:
 | 
						|
                    reason = ''
 | 
						|
                file.write(f'{user_id},{end_date},{reason.encode("unicode_escape").decode()}\n')
 | 
						|
        if storage_chat_id and storage_message_id:
 | 
						|
            try:
 | 
						|
                await client.edit_message(storage_chat_id, storage_message_id, file='rias.data')
 | 
						|
            except BaseException:
 | 
						|
                logging.exception('Error while uploading rias data')
 | 
						|
async def ban_user(user, end_date, reason):
 | 
						|
    user_id = user.user_id
 | 
						|
    async with banned_data_lock:
 | 
						|
        banned_data[user_id] = (end_date, reason)
 | 
						|
    await save_banned_data()
 | 
						|
    if end_date:
 | 
						|
        text = f'You have been banned until {datetime.fromtimestamp(end_date)}'
 | 
						|
    else:
 | 
						|
        text = 'You have been banned indefinitely'
 | 
						|
    if reason:
 | 
						|
        text += f' for {reason}'
 | 
						|
    try:
 | 
						|
        await client.send_message(user, text)
 | 
						|
    except BaseException:
 | 
						|
        logging.exception('Exception occured while informing %s of their ban', user)
 | 
						|
 | 
						|
client = TelegramClient('rias', api_id, api_hash)
 | 
						|
client.parse_mode = 'html'
 | 
						|
async def main():
 | 
						|
    await client.start(bot_token=bot_token)
 | 
						|
    me = await client.get_me()
 | 
						|
    tban_regex = re.compile(f'(?i)/tban(?:@{me.username})?\\s+([0-9]+[a-z])(?:\\s+([\\s\\S]+))?')
 | 
						|
    if storage_chat_id and storage_message_id:
 | 
						|
        try:
 | 
						|
            m = await client.get_messages(storage_chat_id, ids=storage_message_id)
 | 
						|
            await m.download_media('rias.data')
 | 
						|
        except BaseException:
 | 
						|
            logging.exception('Error while downloading rias data')
 | 
						|
    if os.path.exists('rias.data'):
 | 
						|
        save_it = False
 | 
						|
        try:
 | 
						|
            with open('rias.data', 'r') as file:
 | 
						|
                version = file.readline().strip()
 | 
						|
                if version != '0':
 | 
						|
                    logging.error('Unsupported rias data version %s', version)
 | 
						|
                else:
 | 
						|
                    while True:
 | 
						|
                        data = file.readline().strip()
 | 
						|
                        if not data:
 | 
						|
                            break
 | 
						|
                        user_id, end_date, reason = data.split(',', 2)
 | 
						|
                        if end_date and int(time.time()) > int(end_date):
 | 
						|
                            save_it = True
 | 
						|
                            continue
 | 
						|
                        banned_data[int(user_id)] = (int(end_date) if end_date else None, reason.encode().decode('unicode_escape'))
 | 
						|
        except BaseException:
 | 
						|
            logging.exception('Error while parsing rias data')
 | 
						|
        if save_it:
 | 
						|
            await save_banned_data()
 | 
						|
 | 
						|
    @client.on(events.NewMessage)
 | 
						|
    async def flood_handler(e):
 | 
						|
        if e.out or (e.chat_id != await client.get_peer_id(receive_chat) and is_flood(e.chat_id)):
 | 
						|
            raise events.StopPropagation
 | 
						|
 | 
						|
    @client.on(events.NewMessage(pattern=f'(?i)/start(?:@{me.username})?(?:\\s|$)', func=lambda e: e.is_private))
 | 
						|
    async def start(e):
 | 
						|
        await e.reply('Welcome to blankie contact bot!\n\nSource: https://gitlab.com/blankX/rias or https://git.nixnet.services/blankie/rias', link_preview=False)
 | 
						|
        raise events.StopPropagation
 | 
						|
 | 
						|
    @client.on(events.NewMessage(func=lambda e: e.is_private))
 | 
						|
    async def to_receive_handler(e):
 | 
						|
        if data := banned_data.get(e.chat_id):
 | 
						|
            end_date, reason = data
 | 
						|
            if end_date and int(time.time()) > end_date:
 | 
						|
                async with banned_data_lock:
 | 
						|
                    banned_data.pop(e.chat_id)
 | 
						|
                await save_banned_data()
 | 
						|
            else:
 | 
						|
                if end_date:
 | 
						|
                    text = f'You have been banned until {datetime.fromtimestamp(end_date)}'
 | 
						|
                else:
 | 
						|
                    text = 'You have been banned indefinitely'
 | 
						|
                if reason:
 | 
						|
                    text += f' for {reason}'
 | 
						|
                await e.reply(text)
 | 
						|
                return
 | 
						|
        fwd = await e.forward_to(receive_chat)
 | 
						|
        if not fwd.forward or fwd.forward.sender_id != e.chat_id:
 | 
						|
            user = await e.get_chat()
 | 
						|
            await fwd.reply(f'Hidden User: <a href="tg://user?id={e.chat_id}">{html.escape(get_display_name(user))}</a> (<code>{e.chat_id}</code>,<code>{user.access_hash}</code>)', link_preview=False)
 | 
						|
 | 
						|
    @client.on(events.MessageEdited(func=lambda e: e.is_private))
 | 
						|
    async def no_edits_handler(e):
 | 
						|
        if not is_flood(e.chat_id):
 | 
						|
            await e.reply('Edits are currently not supported, sorry. Please resend your message')
 | 
						|
 | 
						|
    @client.on(events.NewMessage(receive_chat, func=lambda e: e.mentioned and e.reply_to_msg_id))
 | 
						|
    async def response_handler(e):
 | 
						|
        reply = await e.get_reply_message()
 | 
						|
        if not reply or reply.sender_id != me.id:
 | 
						|
            return
 | 
						|
        if reply.forward and reply.forward.sender_id:
 | 
						|
            user = await reply.forward.get_input_sender()
 | 
						|
        elif (match := hidden_fwd_regex.match(reply.raw_text)) and not reply.forward and not reply.media:
 | 
						|
            user = InputPeerUser(int(match.group(1)), int(match.group(2)))
 | 
						|
        else:
 | 
						|
            return
 | 
						|
        if e.text.lower().startswith('/ban'):
 | 
						|
            ban_reason = e.text[4:].strip()
 | 
						|
            end_date = None
 | 
						|
            await ban_user(user, end_date, ban_reason)
 | 
						|
            await e.reply('Banned')
 | 
						|
            return
 | 
						|
        if e.text.lower().startswith('/tban'):
 | 
						|
            if match := tban_regex.match(e.text):
 | 
						|
                end_date = match.group(1)
 | 
						|
                duration = end_date[-1].lower()
 | 
						|
                if duration == 'm':
 | 
						|
                    duration = 60
 | 
						|
                elif duration == 'd':
 | 
						|
                    duration = 60 * 60
 | 
						|
                elif duration == 'd':
 | 
						|
                    duration = 60 * 60 * 24
 | 
						|
                elif duration == 'w':
 | 
						|
                    duration = 60 * 60 * 24 * 7
 | 
						|
                else:
 | 
						|
                    await e.reply(f'Invalid duration {duration}, please use m, h, d or w', parse_mode=None)
 | 
						|
                    return
 | 
						|
                end_date = int(time.time() + (int(end_date[:-1]) or 1) * duration)
 | 
						|
                ban_reason = match.group(2)
 | 
						|
                await ban_user(user, end_date, ban_reason)
 | 
						|
                await e.reply('Banned')
 | 
						|
                return
 | 
						|
            await e.reply('Regex failed to match')
 | 
						|
            return
 | 
						|
        if e.text.lower().startswith('/unban'):
 | 
						|
            if user.user_id not in banned_data:
 | 
						|
                await e.reply('Never banned')
 | 
						|
                return
 | 
						|
            async with banned_data_lock:
 | 
						|
                banned_data.pop(user.user_id)
 | 
						|
            await save_banned_data()
 | 
						|
            text = 'You have been unbanned'
 | 
						|
            if unban_reason := e.text[6:].strip():
 | 
						|
                text += f' for {unban_reason}'
 | 
						|
            try:
 | 
						|
                await client.send_message(user, text)
 | 
						|
            except BaseException:
 | 
						|
                logging.exception('Exception occured while informing %s of their unban', user)
 | 
						|
            await e.reply('Unbanned')
 | 
						|
            return
 | 
						|
        try:
 | 
						|
            response_data[e.id] = (user, (await client.send_message(user, e.message)).id)
 | 
						|
        except UserIsBlockedError:
 | 
						|
            await e.reply('The bot was blocked by the user')
 | 
						|
        except BaseException:
 | 
						|
            await e.reply(traceback.format_exc(), parse_mode=None)
 | 
						|
            raise
 | 
						|
 | 
						|
    @client.on(events.MessageEdited(receive_chat, func=lambda e: e.id in response_data))
 | 
						|
    async def response_edit_handler(e):
 | 
						|
        user, message_id = response_data[e.id]
 | 
						|
        try:
 | 
						|
            await client.edit_message(user, message_id, e.text, file=None if isinstance(e.media, (MessageMediaWebPage, MessageMediaUnsupported)) else e.media, link_preview=isinstance(e.media, MessageMediaWebPage))
 | 
						|
        except UserIsBlockedError:
 | 
						|
            await e.reply('The bot was blocked by the user')
 | 
						|
        except BaseException:
 | 
						|
            await e.reply(traceback.format_exc(), parse_mode=None)
 | 
						|
            raise
 | 
						|
 | 
						|
    await client.run_until_disconnected()
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
    client.loop.run_until_complete(main())
 |