forked from 39bit/spoilerobot
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathfile_id.py
143 lines (120 loc) · 3.79 KB
/
file_id.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import enum
from construct import (
Aligned, BitStruct, Byte, Bytes, BytesInteger, Const, Default, Enum, ExprValidator,
Flag, GreedyBytes, If, IfThenElse, Indexing, Int32sl, Int64sl, OffsettedEnd, Peek, Pointer,
Prefixed, Select, Sequence, StopIf, Struct, ValidationError, obj_, this,
)
from telethon import utils
from telethon.types import InputDocument, InputPhoto
class StrictEnum(Enum):
def _decode(self, obj, context, path):
try:
return self.decmapping[obj]
except KeyError:
pass
raise ValidationError(f'{obj} is not a valid enum value for {path}')
class MajorVersion(enum.IntEnum):
OLD = 2
NEW = 4
# https://github.com/tdlib/td/blob/master/td/telegram/files/FileType.h
class FileType(enum.IntEnum):
Thumbnail = 0
ProfilePhoto = enum.auto()
Photo = enum.auto()
VoiceNote = enum.auto()
Video = enum.auto()
Document = enum.auto()
Encrypted = enum.auto()
Temp = enum.auto()
Sticker = enum.auto()
Audio = enum.auto()
Animation = enum.auto()
EncryptedThumbnail = enum.auto()
Wallpaper = enum.auto()
VideoNote = enum.auto()
SecureDecrypted = enum.auto()
SecureEncrypted = enum.auto()
Background = enum.auto()
DocumentAsFile = enum.auto()
Ringtone = enum.auto()
CallLog = enum.auto()
PhotoStory = enum.auto()
VideoStory = enum.auto()
Size = enum.auto()
None_ = enum.auto()
# https://github.com/tdlib/td/blob/master/td/telegram/Version.h
class TDVersion(enum.IntEnum):
SupportRepliesInOtherChats = 51
Next = enum.auto()
PhotoFileTypes = {
FileType.Photo,
FileType.ProfilePhoto,
FileType.Thumbnail,
FileType.EncryptedThumbnail,
FileType.Wallpaper,
FileType.PhotoStory
}
Int24sl = BytesInteger(3, signed=True, swapped=True)
TLString = Aligned(4, Prefixed(
Select(
ExprValidator(Byte, obj_ < 254),
Indexing(Sequence(Const(254, Byte), Int24sl), 2, 1, empty=254),
),
GreedyBytes
))
_WebLocation = Struct(
'url' / TLString,
'access_hash' / Int64sl
)
_FileIDData = Struct(
'version' / Default(Pointer(-1, Byte), TDVersion.Next - 1),
# Flags only occur in 4th byte, assume type is intended to be 3 bytes
'type' / StrictEnum(Int24sl, FileType),
'flags' / BitStruct(
'_unused_1' / Default(Flag, False),
'_unused_2' / Default(Flag, False),
'_unused_3' / Default(Flag, False),
'_unused_4' / Default(Flag, False),
'_unused_5' / Default(Flag, False),
'_unused_6' / Default(Flag, False),
'file_reference' / Default(Flag, False),
'web_location' / Default(Flag, False),
),
'dc_id' / Int32sl,
'file_reference' / IfThenElse(this.flags.file_reference, TLString, Bytes(0)),
'web_location' / If(this.flags.web_location, _WebLocation),
StopIf(this.web_location),
'id' / Int64sl,
'access_hash' / Int64sl,
# TODO: parse photo_size
'photo_size' / If(
lambda this: int(this.type) in PhotoFileTypes,
OffsettedEnd(-1, GreedyBytes)
),
'version' / Default(Byte, TDVersion.Next - 1)
)
FileID = Struct(
'major_version' / Default(Pointer(-1, StrictEnum(Byte, MajorVersion)), MajorVersion.NEW),
'data' / IfThenElse(
lambda this: int(this.major_version) == MajorVersion.NEW,
OffsettedEnd(-1, _FileIDData),
_FileIDData,
),
'major_version' / If(this.major_version == MajorVersion.NEW, Byte)
)
def file_id_to_input_media(file_id):
data = utils._rle_decode(utils._decode_telegram_base64(file_id))
parsed = FileID.parse(data)
if parsed.data.web_location:
raise ValueError(f'Can\'t get input media from web location file_id: {file_id} url={parsed.data.web_location.url}')
if parsed.data.photo_size:
return InputPhoto(
id=parsed.data.id,
access_hash=parsed.data.access_hash,
file_reference=parsed.data.file_reference
)
return InputDocument(
id=parsed.data.id,
access_hash=parsed.data.access_hash,
file_reference=parsed.data.file_reference
)