219 lines
9.1 KiB
Python
219 lines
9.1 KiB
Python
import logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
|
|
import os
|
|
import re
|
|
import time
|
|
import html
|
|
import yaml
|
|
import asyncio
|
|
import traceback
|
|
from datetime import datetime
|
|
from telethon import TelegramClient, events
|
|
from telethon.utils import get_display_name
|
|
from telethon.errors.rpcerrorlist import UserIsBlockedError
|
|
from telethon.tl.types import InputPeerUser, MessageMediaWebPage, MessageMediaUnsupported
|
|
|
|
config = yaml.safe_load(open('config.yaml'))
|
|
api_id = config['telegram']['api_id']
|
|
api_hash = config['telegram']['api_hash']
|
|
bot_token = config['telegram'].get('bot_token')
|
|
receive_chat = config['receive_chat']
|
|
storage_chat_id = storage_message_id = None
|
|
if config.get('storage'):
|
|
storage_chat_id = config['storage'].get('storage_chat_id')
|
|
storage_message_id = config['storage'].get('storage_message_id')
|
|
hidden_fwd_regex = re.compile(r'^Hidden User: .+ \(([0-9]+),(-?[0-9]+)\)$')
|
|
response_data = dict()
|
|
banned_data = dict()
|
|
flood_data = dict()
|
|
def is_flood(user_id):
|
|
if user_id not in flood_data:
|
|
flood_data[user_id] = []
|
|
uflood_data = flood_data[user_id]
|
|
current_time = int(time.time())
|
|
uflood_data.append(current_time)
|
|
flood_data[user_id] = uflood_data = list(filter(lambda i: current_time - i < 3, uflood_data))
|
|
return len(uflood_data) > 3
|
|
banned_data_lock = asyncio.Lock()
|
|
async def save_banned_data():
|
|
async with banned_data_lock:
|
|
with open('rias.data', 'w') as file:
|
|
file.write('0\n')
|
|
for (user_id, (end_date, reason)) in banned_data.items():
|
|
if not end_date:
|
|
end_date = ''
|
|
if not reason:
|
|
reason = ''
|
|
file.write(f'{user_id},{end_date},{reason.encode("unicode_escape").decode()}\n')
|
|
if storage_chat_id and storage_message_id:
|
|
try:
|
|
await client.edit_message(storage_chat_id, storage_message_id, file='rias.data')
|
|
except BaseException:
|
|
logging.exception('Error while uploading rias data')
|
|
async def ban_user(user, end_date, reason):
|
|
user_id = user.user_id
|
|
async with banned_data_lock:
|
|
banned_data[user_id] = (end_date, reason)
|
|
await save_banned_data()
|
|
if end_date:
|
|
text = f'You have been banned until {datetime.fromtimestamp(end_date)}'
|
|
else:
|
|
text = 'You have been banned indefinitely'
|
|
if reason:
|
|
text += f' for {reason}'
|
|
try:
|
|
await client.send_message(user, text)
|
|
except BaseException:
|
|
logging.exception('Exception occured while informing %s of their ban', user)
|
|
|
|
client = TelegramClient('rias', api_id, api_hash)
|
|
client.parse_mode = 'html'
|
|
async def main():
|
|
await client.start(bot_token=bot_token)
|
|
me = await client.get_me()
|
|
tban_regex = re.compile(f'(?i)/tban(?:@{me.username})?\\s+([0-9]+[a-z])(?:\\s+([\\s\\S]+))?')
|
|
if storage_chat_id and storage_message_id:
|
|
try:
|
|
m = await client.get_messages(storage_chat_id, ids=storage_message_id)
|
|
await m.download_media('rias.data')
|
|
except BaseException:
|
|
logging.exception('Error while downloading rias data')
|
|
if os.path.exists('rias.data'):
|
|
save_it = False
|
|
try:
|
|
with open('rias.data', 'r') as file:
|
|
version = file.readline().strip()
|
|
if version != '0':
|
|
logging.error('Unsupported rias data version %s', version)
|
|
else:
|
|
while True:
|
|
data = file.readline().strip()
|
|
if not data:
|
|
break
|
|
user_id, end_date, reason = data.split(',', 2)
|
|
if end_date and int(time.time()) > int(end_date):
|
|
save_it = True
|
|
continue
|
|
banned_data[int(user_id)] = (int(end_date) if end_date else None, reason.encode().decode('unicode_escape'))
|
|
except BaseException:
|
|
logging.exception('Error while parsing rias data')
|
|
if save_it:
|
|
await save_banned_data()
|
|
|
|
@client.on(events.NewMessage)
|
|
async def flood_handler(e):
|
|
if e.out or (e.chat_id != await client.get_peer_id(receive_chat) and is_flood(e.chat_id)):
|
|
raise events.StopPropagation
|
|
|
|
@client.on(events.NewMessage(pattern=f'(?i)/start(?:@{me.username})?(?:\\s|$)', func=lambda e: e.is_private))
|
|
async def start(e):
|
|
await e.reply('Welcome to blankie contact bot!\n\nSource: https://gitlab.com/blankX/rias')
|
|
raise events.StopPropagation
|
|
|
|
@client.on(events.NewMessage(func=lambda e: e.is_private))
|
|
async def to_receive_handler(e):
|
|
if data := banned_data.get(e.chat_id):
|
|
end_date, reason = data
|
|
if end_date and int(time.time()) > end_date:
|
|
async with banned_data_lock:
|
|
banned_data.pop(e.chat_id)
|
|
await save_banned_data()
|
|
else:
|
|
if end_date:
|
|
text = f'You have been banned until {datetime.fromtimestamp(end_date)}'
|
|
else:
|
|
text = 'You have been banned indefinitely'
|
|
if reason:
|
|
text += f' for {reason}'
|
|
await e.reply(text)
|
|
return
|
|
fwd = await e.forward_to(receive_chat)
|
|
if not fwd.forward or fwd.forward.sender_id != e.chat_id:
|
|
user = await e.get_chat()
|
|
await fwd.reply(f'Hidden User: <a href="tg://user?id={e.chat_id}">{html.escape(get_display_name(user))}</a> (<code>{e.chat_id}</code>,<code>{user.access_hash}</code>)')
|
|
|
|
@client.on(events.MessageEdited(func=lambda e: e.is_private))
|
|
async def no_edits_handler(e):
|
|
if not is_flood(e.chat_id):
|
|
await e.reply('Edits are currently not supported, sorry. Please resend your message')
|
|
|
|
@client.on(events.NewMessage(receive_chat, func=lambda e: e.mentioned and e.reply_to_msg_id))
|
|
async def response_handler(e):
|
|
reply = await e.get_reply_message()
|
|
if not reply or reply.sender_id != me.id:
|
|
return
|
|
if reply.forward and reply.forward.sender_id:
|
|
user = await reply.forward.get_input_sender()
|
|
elif match := hidden_fwd_regex.match(reply.raw_text):
|
|
user = InputPeerUser(int(match.group(1)), int(match.group(2)))
|
|
else:
|
|
return
|
|
if e.text.lower().startswith('/ban'):
|
|
ban_reason = e.text[4:].strip()
|
|
end_date = None
|
|
await ban_user(user, end_date, ban_reason)
|
|
await e.reply('Banned')
|
|
return
|
|
if e.text.lower().startswith('/tban'):
|
|
if match := tban_regex.match(e.text):
|
|
end_date = match.group(1)
|
|
duration = end_date[-1].lower()
|
|
if duration == 'm':
|
|
duration = 60
|
|
elif duration == 'd':
|
|
duration = 60 * 60
|
|
elif duration == 'd':
|
|
duration = 60 * 60 * 24
|
|
elif duration == 'w':
|
|
duration = 60 * 60 * 24 * 7
|
|
else:
|
|
await e.reply(f'Invalid duration {duration}, please use m, h, d or w', parse_mode=None)
|
|
return
|
|
end_date = int(time.time() + (int(end_date[:-1]) or 1) * duration)
|
|
ban_reason = match.group(2)
|
|
await ban_user(user, end_date, ban_reason)
|
|
await e.reply('Banned')
|
|
return
|
|
await e.reply('Regex failed to match')
|
|
return
|
|
if e.text.lower().startswith('/unban'):
|
|
if user.user_id not in banned_data:
|
|
await e.reply('Never banned')
|
|
return
|
|
async with banned_data_lock:
|
|
banned_data.pop(user.user_id)
|
|
await save_banned_data()
|
|
text = 'You have been unbanned'
|
|
if unban_reason := e.text[6:].strip():
|
|
text += f' for {unban_reason}'
|
|
try:
|
|
await client.send_message(user, text)
|
|
except BaseException:
|
|
logging.exception('Exception occured while informing %s of their unban', user)
|
|
await e.reply('Unbanned')
|
|
return
|
|
try:
|
|
response_data[e.id] = (user, (await client.send_message(user, e.message)).id)
|
|
except UserIsBlockedError:
|
|
await e.reply('The bot was blocked by the user')
|
|
except BaseException:
|
|
await e.reply(traceback.format_exc(), parse_mode=None)
|
|
raise
|
|
|
|
@client.on(events.MessageEdited(receive_chat, func=lambda e: e.id in response_data))
|
|
async def response_edit_handler(e):
|
|
user, message_id = response_data[e.id]
|
|
try:
|
|
await client.edit_message(user, message_id, e.text, file=None if isinstance(e.media, (MessageMediaWebPage, MessageMediaUnsupported)) else e.media, link_preview=isinstance(e.media, MessageMediaWebPage))
|
|
except UserIsBlockedError:
|
|
await e.reply('The bot was blocked by the user')
|
|
except BaseException:
|
|
await e.reply(traceback.format_exc(), parse_mode=None)
|
|
raise
|
|
|
|
await client.run_until_disconnected()
|
|
|
|
if __name__ == '__main__':
|
|
client.loop.run_until_complete(main())
|