forked from 39bit/spoilerobot
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathhandlers_pm.py
196 lines (163 loc) · 6.63 KB
/
handlers_pm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
import asyncio
from telethon import custom, events
from telethon.errors import UserIsBlockedError
from telethon.types import PeerChannel
from telethon.tl.functions.channels import LeaveChannelRequest
import database
from config import MAX_DESCRIPTION_LENGTH, CONTACT_TEXT
from database import UUID
from message_serializer import deserialize_to_params, serialize_messages
from proxy import client, logger
from util import suppress_exceptions
# Basic conversation handler based on Telethon's (now deprecated) custom.Conversation
class Conversation:
def __init__(self, chat_id):
self.chat_id = chat_id
self._responses = asyncio.Queue()
def put_response(self, response):
self._responses.put_nowait(response)
async def get_response(self, timeout):
return await asyncio.wait_for(self._responses.get(), timeout)
def __enter__(self):
if self.chat_id in active_conversations:
raise RuntimeError(f'Already waiting for response in chat {self.chat_id}')
active_conversations[self.chat_id] = self
return self
def __exit__(self, exc_type, exc_val, exc_tb):
del active_conversations[self.chat_id]
active_conversations: dict[int, Conversation] = {}
# This has to come before the conversation handler so it works while we're in a conversation
@client.on(events.NewMessage(incoming=True, forwards=False, pattern='^/start (.{64})$'))
@suppress_exceptions(logger, UserIsBlockedError)
async def on_start_id(event: events.NewMessage.Event):
uuid = database.UUID(event.pattern_match.group(1))
spoiler = await database.get_spoiler(uuid)
if not spoiler:
logger.error(f"{event.chat_id} requested missing spoiler uuid={uuid.get_str()} t={uuid.read_timestamp()}")
await event.respond('Spoiler not found. Too old?')
raise events.StopPropagation
logger.info(f"{event.chat_id} requested {spoiler.type}")
params = deserialize_to_params(spoiler)
await event.respond(**params)
raise events.StopPropagation
@client.on(events.NewMessage(incoming=True, func=lambda e: isinstance(e.peer_id, PeerChannel)))
async def on_channel_msg(event: events.NewMessage.Event):
logger.error(f'Leaving channel {event.peer_id}')
await client(LeaveChannelRequest(event.peer_id))
raise events.StopPropagation
@client.on(events.NewMessage(incoming=True))
async def on_conversation_message(event: events.NewMessage.Event):
if event.chat_id in active_conversations:
active_conversations[event.chat_id].put_response(event.message)
raise events.StopPropagation
class CancelledError(Exception):
pass
async def get_cancellable_response(conv: Conversation):
while 1:
response = await conv.get_response(timeout=6*50)
if not response.media and response.raw_text.startswith('/cancel'):
raise CancelledError
if response.raw_text.startswith('/'):
continue
return response
async def wait_for_album(conv: Conversation, message):
"""
Waits for more messages if message is an album, because
telegram sends them separately
Thanks to @lonami for this code
"""
if not message.grouped_id:
return [message]
items = [message]
try:
while 1:
item = await conv.get_response(timeout=0.1)
if item.grouped_id != items[0].grouped_id:
break
items.append(item)
except asyncio.TimeoutError:
pass
return items
@client.on(events.NewMessage(incoming=True, forwards=False, pattern='^/start( inline)?$'))
@client.on(events.NewMessage(incoming=True, func=lambda e: e.media))
@suppress_exceptions(logger, UserIsBlockedError)
async def on_media_or_start(event: events.NewMessage.Event):
try:
with Conversation(event.chat_id) as conv:
content_msg = event.message
if event.pattern_match:
await event.respond(
'Preparing a spoiler. To cancel, type /cancel.\n\n'
'First send the content to be spoiled. It can be text, photo, or any other media.'
)
content_msg = await get_cancellable_response(conv)
spoiler = serialize_messages(await wait_for_album(conv, content_msg))
content_msg = None
await event.respond(
f'Now send a title for the spoiler (up to {MAX_DESCRIPTION_LENGTH} characters). '
'It will be immediately visible and can be used to add a small description '
'for your spoiler.\n'
'Type a dash (-) now if you do not want a title for your spoiler.'
)
description = ''
while not description:
response = await get_cancellable_response(conv)
if response.fwd_from or response.media:
continue
description = response.raw_text
if description == '-':
description = ''
break
if len(description) > MAX_DESCRIPTION_LENGTH:
await event.respond(
f'The given title is too long (up to {MAX_DESCRIPTION_LENGTH} characters).\n'
'Please try again.'
)
description = ''
spoiler.description = description
logger.info(f'{event.chat_id} created {spoiler.type}')
uuid = UUID()
await database.insert_spoiler(uuid, spoiler, event.chat_id)
await event.respond(
'Done! Your advanced spoiler is ready.',
buttons=custom.Button.switch_inline(
'Send it!',
f'id:{uuid.get_str()}'
)
)
except (asyncio.TimeoutError, CancelledError):
from_inline = event.pattern_match and event.pattern_match.group(1)
await event.respond(
'The spoiler preparation has been cancelled.',
buttons=custom.Button.switch_inline('OK') if from_inline else None
)
except UserIsBlockedError:
raise
except Exception as e:
logger.exception(f'Error processing media')
await event.respond(
f'There was an error processing the request: {e}\n'
'The spoiler preparation has been cancelled.'
)
@client.on(events.NewMessage(incoming=True, forwards=False, pattern='^/start ignore$'))
async def on_start_ignored(event: events.NewMessage.Event):
await event.delete()
@client.on(events.NewMessage(incoming=True, pattern='^/help$'))
async def on_help(event: events.NewMessage.Event):
await event.respond(
'Send me media or /start to prepare an advanced spoiler with a custom title.\n'
'\n'
'You can type quick spoilers by using @spoilerobot in inline mode:\n'
'<code>@spoilerobot spoiler here…</code>\n'
'\n'
'Custom titles can also be used from inline mode as follows:\n'
'<code>@spoilerobot title for the spoiler:::contents of the spoiler</code>\n'
'\n'
'Note that the title will be immediately visible!\n'
'\n'
+ (CONTACT_TEXT if CONTACT_TEXT else ''),
link_preview=False
)
@client.on(events.NewMessage(incoming=True, pattern='^/ping$'))
async def on_ping(event: events.NewMessage.Event):
await event.reply('pong')