forked from 39bit/spoilerobot
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmessage_serializer.py
141 lines (124 loc) · 4.33 KB
/
message_serializer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""
Handles serializing and deserializing of messages
"""
import json
import logging
from typing import Any, List
from telethon import types
from telethon.extensions import html
from telethon.tl.custom import Message
from structs import ContentGeneric, Spoiler
from util import pack_bot_file_id
from file_id import file_id_to_input_media
logger = logging.getLogger('serializer')
FILE_ID_KEYS = (
'file',
# Old spoilers used these keys to store the file_id
'photo', 'audio', 'document', 'video', 'voice', 'sticker', 'video_note'
)
def serialize_messages(message_list: List[Message]):
"""
Groups the content from multiple spoilers into one album spoiler
"""
if len(message_list) == 1:
return serialize_message(message_list[0])
contents = [serialize_message(msg).content for msg in message_list]
return Spoiler(type='Album', description='', content=contents)
def serialize_message(message: Message) -> Spoiler:
"""
Serializes a message for storing in the database
"""
data = {}
content_type = None
if message.text:
data['text'] = message.text
content_type = 'HTML'
if message.photo:
data['file'] = pack_bot_file_id(message.media)
content_type = 'Photo'
if message.document:
data['file'] = pack_bot_file_id(message.document)
content_type = 'Document'
if message.contact:
content_type = 'Contact'
data['phone_number'] = message.contact.phone_number
data['first_name'] = message.contact.first_name
data['last_name'] = message.contact.last_name
if message.geo:
content_type = 'Location'
# TODO: test empty geopoint and fix venue
data['lat'] = message.geo.lat
data['long'] = message.geo.long
data['rad'] = message.geo.accuracy_radius
if message.poll:
raise ValueError('resending polls is not possible.')
if not content_type:
logger.error(f'Failed to serialise: {message.stringify()}')
raise ValueError('not implemented yet.')
# Turn plain text's content into a string
if content_type == 'HTML':
data = data['text']
return Spoiler(type=content_type, description='', content=data)
def _extract_content(content_type, content) -> ContentGeneric:
"""
Converts a database message into a ContentGeneric
"""
if isinstance(content, str):
is_media = content_type not in {'Text', 'HTML'}
# old spoilers used a json string as the content, so attempt to decode it
# (putting json inside json was not the best idea)
decoded_content = None
if is_media:
try:
decoded_content = json.loads(content)
except json.JSONDecodeError:
logger.error(f'Failed to decode nested JSON content: "{content}"')
# This interprets media json that fails to decode as text
# which is probably not desirable
if not decoded_content:
return ContentGeneric(
html.escape(content) if content_type == 'Text' else content,
file_id=None
)
content = decoded_content
text = content.get('text', '') or content.get('caption', '')
file_id: Any = None
if content_type == 'Contact':
file_id = types.InputMediaContact(
phone_number=content['phone_number'],
first_name=content['first_name'],
last_name=content['last_name'],
vcard=''
)
if content_type == 'Location':
file_id = types.InputMediaGeoPoint(
types.InputGeoPoint(
lat=content['lat'],
long=content['long'],
accuracy_radius=content.get('rad')
)
)
if not file_id:
file_id = next((content[k] for k in FILE_ID_KEYS if k in content), None)
return ContentGeneric(text, file_id)
def extract_contents(spoiler: Spoiler) -> ContentGeneric | List[ContentGeneric]:
"""
Extracts one or more ContentGeneric from a database spoiler
"""
if isinstance(spoiler.content, list):
return [_extract_content(spoiler.type, content) for content in spoiler.content]
return _extract_content(spoiler.type, spoiler.content)
def deserialize_to_params(spoiler: Spoiler):
"""
Deserializes a database spoiler to a dict of params for sending
"""
contents = extract_contents(spoiler)
if isinstance(contents, list):
return {
'message': [content.text for content in contents],
'file': [file_id_to_input_media(content.file_id) for content in contents if content.file_id],
}
return {
'message': contents.text,
'file': file_id_to_input_media(contents.file_id) if contents.file_id else None
}