# 2021-02-11 10:52:33 +00:00  (revision timestamp, apparently left over from extraction; not code)
import logging
logging . basicConfig ( level = logging . INFO )
import os
import re
import time
import html
import yaml
import asyncio
import traceback
from datetime import datetime
from telethon import TelegramClient , events
from telethon . utils import get_display_name
from telethon . errors . rpcerrorlist import UserIsBlockedError
from telethon . tl . types import InputPeerUser , MessageMediaWebPage , MessageMediaUnsupported
# Load the bot configuration once at import time.  The file is opened in a
# ``with`` block so the handle is closed as soon as parsing has finished.
with open('config.yaml') as config_file:
    config = yaml.safe_load(config_file)

# Telegram API credentials.  ``bot_token`` is read with ``.get`` and is
# therefore ``None`` when the key is absent.
api_id = config['telegram']['api_id']
api_hash = config['telegram']['api_hash']
bot_token = config['telegram'].get('bot_token')

# Chat that incoming private messages are forwarded to.
receive_chat = config['receive_chat']

# Optional location of a Telegram message holding a copy of ``rias.data``.
# Both values stay ``None`` when there is no ``storage`` section.
storage_chat_id = storage_message_id = None
if config.get('storage'):
    storage_chat_id = config['storage'].get('storage_chat_id')
    storage_message_id = config['storage'].get('storage_message_id')
# Matches the plain text of the "Hidden User: <name> (<id>,<access_hash>)"
# notice that the bot posts under a forwarded message.  Group 1 is the user
# id and group 2 the (possibly negative) access hash; both are fed to
# InputPeerUser by the response handler.
hidden_fwd_regex = re.compile(r'^Hidden User: .+ \(([0-9]+),(-?[0-9]+)\)$')

# Maps the id of a message sent in the receive chat to a
# ``(user, relayed_message_id)`` pair so later edits can be mirrored.
response_data = {}

# Maps a banned user id to ``(end_date, reason)``; ``end_date`` is a Unix
# timestamp, or ``None`` for a ban without an expiry.
banned_data = {}
# Per-user list of recent event times (whole seconds since the epoch),
# keyed by user id and maintained by is_flood().
flood_data = {}


def is_flood(user_id):
    """Record an event for ``user_id`` and report whether they are flooding.

    Every call appends the current time (truncated to whole seconds) to the
    user's history and then drops entries that are three or more seconds
    old.

    Returns:
        ``True`` when more than three events, including this one, remain in
        the retained window; ``False`` otherwise.
    """
    now = int(time.time())
    history = flood_data.setdefault(user_id, [])
    history.append(now)
    # Keep only the events younger than three seconds.
    recent = [stamp for stamp in history if now - stamp < 3]
    flood_data[user_id] = recent
    return len(recent) > 3
# Guards mutations of ``banned_data`` and rewrites of ``rias.data``.
# asyncio.Lock is not reentrant, so save_banned_data() must never be awaited
# while this lock is already held by the caller.
banned_data_lock = asyncio.Lock()


async def save_banned_data():
    """Write ``banned_data`` to ``rias.data`` and upload it if configured.

    File format: the first line is the version (``0``), followed by one
    ``user_id,end_date,reason`` line per ban.  A missing end date or reason
    is written as an empty field.  The reason is passed through
    ``unicode_escape`` so that newlines and non-ASCII characters cannot
    break the one-record-per-line layout.

    When both ``storage_chat_id`` and ``storage_message_id`` are set, the
    file is also attached to that message through ``client.edit_message``.
    An upload failure is logged and otherwise ignored, so the local file
    stays authoritative.
    """
    async with banned_data_lock:
        with open('rias.data', 'w') as file:
            file.write('0\n')
            for user_id, (end_date, reason) in banned_data.items():
                end_field = end_date or ''
                reason_field = (reason or '').encode('unicode_escape').decode()
                file.write(f'{user_id},{end_field},{reason_field}\n')
        if storage_chat_id and storage_message_id:
            try:
                await client.edit_message(storage_chat_id, storage_message_id, file='rias.data')
            except Exception:
                # Best effort only.  ``Exception`` (not ``BaseException``) is
                # caught so task cancellation and KeyboardInterrupt still
                # propagate.
                logging.exception('Error while uploading rias data')
async def ban_user(user, end_date, reason):
    """Ban ``user``, persist the ban list and tell the user about it.

    Args:
        user: Peer object exposing ``user_id``; also used as the target of
            ``client.send_message``.
        end_date: Unix timestamp at which the ban ends, or a falsy value for
            a ban without an expiry.
        reason: Optional text appended to the notification.

    Failing to deliver the notification is logged and does not undo the ban.
    """
    async with banned_data_lock:
        banned_data[user.user_id] = (end_date, reason)
    # save_banned_data() acquires banned_data_lock itself, and asyncio.Lock
    # is not reentrant, so the save has to happen after the lock is released.
    await save_banned_data()

    if end_date:
        text = f'You have been banned until {datetime.fromtimestamp(end_date)}'
    else:
        text = 'You have been banned indefinitely'
    if reason:
        text += f' for {reason}'

    try:
        await client.send_message(user, text)
    except Exception:
        # ``Exception`` rather than ``BaseException`` so cancellation and
        # KeyboardInterrupt are not swallowed.
        logging.exception('Exception occured while informing %s of their ban', user)
# Telethon client shared by the whole module.  The first argument is the
# session name and the credentials come from config.yaml.
client = TelegramClient('rias', api_id, api_hash)
# Outgoing text is parsed as HTML unless a call passes parse_mode explicitly.
client.parse_mode = 'html'
async def main():
    """Start the bot, restore the ban list, register handlers and run.

    Steps:
      1. Log in with ``bot_token`` and look up the bot's own account.
      2. If remote storage is configured, download ``rias.data`` from it.
      3. Load ``rias.data`` into ``banned_data``, skipping bans that have
         already expired (the file is rewritten if any were skipped).
      4. Register the event handlers defined below.
      5. Block until the client disconnects.

    Commands accepted in the receive chat, as a reply that mentions the bot:
      ``/ban [reason]``, ``/tban <N><m|h|d|w> [reason]``, ``/unban [reason]``.
    Any other such reply is relayed to the user the replied-to message
    belongs to.
    """
    await client.start(bot_token=bot_token)
    me = await client.get_me()
    tban_regex = re.compile(
        f'(?i)/tban(?:@{me.username})?\\s+([0-9]+[a-z])(?:\\s+([\\s\\S]+))?'
    )
    # Seconds per /tban duration suffix.
    tban_unit_seconds = {
        'm': 60,
        'h': 60 * 60,
        'd': 60 * 60 * 24,
        'w': 60 * 60 * 24 * 7,
    }

    if storage_chat_id and storage_message_id:
        try:
            m = await client.get_messages(storage_chat_id, ids=storage_message_id)
            await m.download_media('rias.data')
        except Exception:
            # Best effort: fall back to whatever rias.data exists locally.
            logging.exception('Error while downloading rias data')

    if os.path.exists('rias.data'):
        save_it = False
        try:
            with open('rias.data', 'r') as file:
                version = file.readline().strip()
                if version != '0':
                    logging.error('Unsupported rias data version %s', version)
                else:
                    # Reading stops at the first empty line or at EOF.
                    while data := file.readline().strip():
                        # The reason may itself contain commas, so split at
                        # most twice.
                        user_id, end_date, reason = data.split(',', 2)
                        if end_date and int(time.time()) > int(end_date):
                            # Expired ban: drop it and rewrite the file later.
                            save_it = True
                            continue
                        banned_data[int(user_id)] = (
                            int(end_date) if end_date else None,
                            reason.encode().decode('unicode_escape'),
                        )
        except Exception:
            logging.exception('Error while parsing rias data')
        if save_it:
            await save_banned_data()

    @client.on(events.NewMessage)
    async def flood_handler(e):
        # Stop all further handlers for the bot's own messages, and for
        # flooding chats other than the receive chat.
        if e.out or (e.chat_id != await client.get_peer_id(receive_chat) and is_flood(e.chat_id)):
            raise events.StopPropagation

    @client.on(events.NewMessage(pattern=f'(?i)/start(?:@{me.username})?(?:\\s|$)', func=lambda e: e.is_private))
    async def start(e):
        await e.reply(
            'Welcome to blankie contact bot!\n\nSource: https://gitlab.com/blankX/rias or https://git.nixnet.services/blankie/rias',
            link_preview=False,
        )
        # Keep /start from also being forwarded by to_receive_handler.
        raise events.StopPropagation

    @client.on(events.NewMessage(func=lambda e: e.is_private))
    async def to_receive_handler(e):
        if data := banned_data.get(e.chat_id):
            end_date, reason = data
            if end_date and int(time.time()) > end_date:
                # The ban has expired: lift it and forward the message.
                async with banned_data_lock:
                    # A default avoids KeyError if another message from the
                    # same user already removed the entry.
                    banned_data.pop(e.chat_id, None)
                await save_banned_data()
            else:
                if end_date:
                    text = f'You have been banned until {datetime.fromtimestamp(end_date)}'
                else:
                    text = 'You have been banned indefinitely'
                if reason:
                    text += f' for {reason}'
                await e.reply(text)
                return
        fwd = await e.forward_to(receive_chat)
        if not fwd.forward or fwd.forward.sender_id != e.chat_id:
            # The forward header does not identify the sender, so post the
            # id and access hash in a form hidden_fwd_regex can parse back.
            user = await e.get_chat()
            await fwd.reply(
                f'Hidden User: <a href="tg://user?id={e.chat_id}">{html.escape(get_display_name(user))}</a> (<code>{e.chat_id}</code>,<code>{user.access_hash}</code>)',
                link_preview=False,
            )

    @client.on(events.MessageEdited(func=lambda e: e.is_private))
    async def no_edits_handler(e):
        if not is_flood(e.chat_id):
            await e.reply('Edits are currently not supported, sorry. Please resend your message')

    @client.on(events.NewMessage(receive_chat, func=lambda e: e.mentioned and e.reply_to_msg_id))
    async def response_handler(e):
        reply = await e.get_reply_message()
        if not reply or reply.sender_id != me.id:
            return
        # Work out which user the replied-to message belongs to.
        if reply.forward and reply.forward.sender_id:
            user = await reply.forward.get_input_sender()
        elif (match := hidden_fwd_regex.match(reply.raw_text)) and not reply.forward and not reply.media:
            user = InputPeerUser(int(match.group(1)), int(match.group(2)))
        else:
            return

        command = e.text.lower()

        if command.startswith('/ban'):
            ban_reason = e.text[4:].strip()
            await ban_user(user, None, ban_reason)
            await e.reply('Banned')
            return

        if command.startswith('/tban'):
            match = tban_regex.match(e.text)
            if not match:
                await e.reply('Regex failed to match')
                return
            spec = match.group(1)
            amount, unit = spec[:-1], spec[-1].lower()
            unit_seconds = tban_unit_seconds.get(unit)
            if unit_seconds is None:
                await e.reply(f'Invalid duration {unit}, please use m, h, d or w', parse_mode=None)
                return
            # An amount of 0 is treated as 1.
            end_date = int(time.time() + (int(amount) or 1) * unit_seconds)
            await ban_user(user, end_date, match.group(2))
            await e.reply('Banned')
            return

        if command.startswith('/unban'):
            if user.user_id not in banned_data:
                await e.reply('Never banned')
                return
            async with banned_data_lock:
                banned_data.pop(user.user_id, None)
            await save_banned_data()
            text = 'You have been unbanned'
            if unban_reason := e.text[6:].strip():
                text += f' for {unban_reason}'
            try:
                await client.send_message(user, text)
            except Exception:
                logging.exception('Exception occured while informing %s of their unban', user)
            await e.reply('Unbanned')
            return

        # Not a command: relay the message and remember the relayed copy's
        # id so response_edit_handler can mirror later edits.
        try:
            sent = await client.send_message(user, e.message)
            response_data[e.id] = (user, sent.id)
        except UserIsBlockedError:
            await e.reply('The bot was blocked by the user')
        except Exception:
            await e.reply(traceback.format_exc(), parse_mode=None)
            raise

    @client.on(events.MessageEdited(receive_chat, func=lambda e: e.id in response_data))
    async def response_edit_handler(e):
        user, message_id = response_data[e.id]
        is_web_page = isinstance(e.media, MessageMediaWebPage)
        # Web page previews and unsupported media cannot be sent as a file.
        if isinstance(e.media, (MessageMediaWebPage, MessageMediaUnsupported)):
            media = None
        else:
            media = e.media
        try:
            await client.edit_message(user, message_id, e.text, file=media, link_preview=is_web_page)
        except UserIsBlockedError:
            await e.reply('The bot was blocked by the user')
        except Exception:
            await e.reply(traceback.format_exc(), parse_mode=None)
            raise

    await client.run_until_disconnected()
# Run the bot on the client's own event loop when executed as a script.
if __name__ == '__main__':
    client.loop.run_until_complete(main())