rias/rias.py

219 lines
9.1 KiB
Python

import logging
logging.basicConfig(level=logging.INFO)
import os
import re
import time
import html
import yaml
import asyncio
import traceback
from datetime import datetime
from telethon import TelegramClient, events
from telethon.utils import get_display_name
from telethon.errors.rpcerrorlist import UserIsBlockedError
from telethon.tl.types import InputPeerUser, MessageMediaWebPage, MessageMediaUnsupported
config = yaml.safe_load(open('config.yaml'))
api_id = config['telegram']['api_id']
api_hash = config['telegram']['api_hash']
bot_token = config['telegram'].get('bot_token')
receive_chat = config['receive_chat']
storage_chat_id = storage_message_id = None
if config.get('storage'):
storage_chat_id = config['storage'].get('storage_chat_id')
storage_message_id = config['storage'].get('storage_message_id')
hidden_fwd_regex = re.compile(r'^Hidden User: .+ \(([0-9]+),(-?[0-9]+)\)$')
response_data = dict()
banned_data = dict()
flood_data = dict()
def is_flood(user_id):
if user_id not in flood_data:
flood_data[user_id] = []
uflood_data = flood_data[user_id]
current_time = int(time.time())
uflood_data.append(current_time)
flood_data[user_id] = uflood_data = list(filter(lambda i: current_time - i < 3, uflood_data))
return len(uflood_data) > 3
banned_data_lock = asyncio.Lock()
async def save_banned_data():
async with banned_data_lock:
with open('rias.data', 'w') as file:
file.write('0\n')
for (user_id, (end_date, reason)) in banned_data.items():
if not end_date:
end_date = ''
if not reason:
reason = ''
file.write(f'{user_id},{end_date},{reason.encode("unicode_escape").decode()}\n')
if storage_chat_id and storage_message_id:
try:
await client.edit_message(storage_chat_id, storage_message_id, file='rias.data')
except BaseException:
logging.exception('Error while uploading rias data')
async def ban_user(user, end_date, reason):
user_id = user.user_id
async with banned_data_lock:
banned_data[user_id] = (end_date, reason)
await save_banned_data()
if end_date:
text = f'You have been banned until {datetime.fromtimestamp(end_date)}'
else:
text = 'You have been banned indefinitely'
if reason:
text += f' for {reason}'
try:
await client.send_message(user, text)
except BaseException:
logging.exception('Exception occured while informing %s of their ban', user)
client = TelegramClient('rias', api_id, api_hash)
client.parse_mode = 'html'
async def main():
await client.start(bot_token=bot_token)
me = await client.get_me()
tban_regex = re.compile(f'(?i)/tban(?:@{me.username})?\\s+([0-9]+[a-z])(?:\\s+([\\s\\S]+))?')
if storage_chat_id and storage_message_id:
try:
m = await client.get_messages(storage_chat_id, ids=storage_message_id)
await m.download_media('rias.data')
except BaseException:
logging.exception('Error while downloading rias data')
if os.path.exists('rias.data'):
save_it = False
try:
with open('rias.data', 'r') as file:
version = file.readline().strip()
if version != '0':
logging.error('Unsupported rias data version %s', version)
else:
while True:
data = file.readline().strip()
if not data:
break
user_id, end_date, reason = data.split(',', 2)
if end_date and int(time.time()) > int(end_date):
save_it = True
continue
banned_data[int(user_id)] = (int(end_date) if end_date else None, reason.encode().decode('unicode_escape'))
except BaseException:
logging.exception('Error while parsing rias data')
if save_it:
await save_banned_data()
@client.on(events.NewMessage)
async def flood_handler(e):
if e.out or (e.chat_id != await client.get_peer_id(receive_chat) and is_flood(e.chat_id)):
raise events.StopPropagation
@client.on(events.NewMessage(pattern=f'(?i)/start(?:@{me.username})?(?:\\s|$)', func=lambda e: e.is_private))
async def start(e):
await e.reply('Welcome to blankie contact bot!\n\nSource: https://gitlab.com/blankX/rias')
raise events.StopPropagation
@client.on(events.NewMessage(func=lambda e: e.is_private))
async def to_receive_handler(e):
if data := banned_data.get(e.chat_id):
end_date, reason = data
if end_date and int(time.time()) > end_date:
async with banned_data_lock:
banned_data.pop(e.chat_id)
await save_banned_data()
else:
if end_date:
text = f'You have been banned until {datetime.fromtimestamp(end_date)}'
else:
text = 'You have been banned indefinitely'
if reason:
text += f' for {reason}'
await e.reply(text)
return
fwd = await e.forward_to(receive_chat)
if not fwd.forward or fwd.forward.sender_id != e.chat_id:
user = await e.get_chat()
await fwd.reply(f'Hidden User: <a href="tg://user?id={e.chat_id}">{html.escape(get_display_name(user))}</a> (<code>{e.chat_id}</code>,<code>{user.access_hash}</code>)')
@client.on(events.MessageEdited(func=lambda e: e.is_private))
async def no_edits_handler(e):
if not is_flood(e.chat_id):
await e.reply('Edits are currently not supported, sorry. Please resend your message')
@client.on(events.NewMessage(receive_chat, func=lambda e: e.mentioned and e.reply_to_msg_id))
async def response_handler(e):
reply = await e.get_reply_message()
if not reply or reply.sender_id != me.id:
return
if reply.forward and reply.forward.sender_id:
user = await reply.forward.get_input_sender()
elif match := hidden_fwd_regex.match(reply.raw_text):
user = InputPeerUser(int(match.group(1)), int(match.group(2)))
else:
return
if e.text.lower().startswith('/ban'):
ban_reason = e.text[4:].strip()
end_date = None
await ban_user(user, end_date, ban_reason)
await e.reply('Banned')
return
if e.text.lower().startswith('/tban'):
if match := tban_regex.match(e.text):
end_date = match.group(1)
duration = end_date[-1].lower()
if duration == 'm':
duration = 60
elif duration == 'd':
duration = 60 * 60
elif duration == 'd':
duration = 60 * 60 * 24
elif duration == 'w':
duration = 60 * 60 * 24 * 7
else:
await e.reply(f'Invalid duration {duration}, please use m, h, d or w', parse_mode=None)
return
end_date = int(time.time() + (int(end_date[:-1]) or 1) * duration)
ban_reason = match.group(2)
await ban_user(user, end_date, ban_reason)
await e.reply('Banned')
return
await e.reply('Regex failed to match')
return
if e.text.lower().startswith('/unban'):
if user.user_id not in banned_data:
await e.reply('Never banned')
return
async with banned_data_lock:
banned_data.pop(user.user_id)
await save_banned_data()
text = 'You have been unbanned'
if unban_reason := e.text[6:].strip():
text += f' for {unban_reason}'
try:
await client.send_message(user, text)
except BaseException:
logging.exception('Exception occured while informing %s of their unban', user)
await e.reply('Unbanned')
return
try:
response_data[e.id] = (user, (await client.send_message(user, e.message)).id)
except UserIsBlockedError:
await e.reply('The bot was blocked by the user')
except BaseException:
await e.reply(traceback.format_exc(), parse_mode=None)
raise
@client.on(events.MessageEdited(receive_chat, func=lambda e: e.id in response_data))
async def response_edit_handler(e):
user, message_id = response_data[e.id]
try:
await client.edit_message(user, message_id, e.text, file=None if isinstance(e.media, (MessageMediaWebPage, MessageMediaUnsupported)) else e.media, link_preview=isinstance(e.media, MessageMediaWebPage))
except UserIsBlockedError:
await e.reply('The bot was blocked by the user')
except BaseException:
await e.reply(traceback.format_exc(), parse_mode=None)
raise
await client.run_until_disconnected()
if __name__ == '__main__':
client.loop.run_until_complete(main())