forked from larksuite/oapi-sdk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathraw.py
115 lines (92 loc) · 3.39 KB
/
raw.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import io
from requests_toolbelt import MultipartEncoder
import lark_oapi as lark
# 通过手机号或邮箱获取用户 ID
def batch_get_id_user():
    """Look up user IDs by e-mail address or mobile number via a raw request.

    Builds a client from ``lark.APP_ID`` / ``lark.APP_SECRET``, sends a POST
    to ``/open-apis/contact/v3/users/batch_get_id`` with a tenant access
    token, and logs the raw response body on success. When the call fails,
    the error code, message and log ID are logged instead.
    """
    # Build the API client.
    api_client = (
        lark.Client.builder()
        .app_id(lark.APP_ID)
        .app_secret(lark.APP_SECRET)
        .log_level(lark.LogLevel.DEBUG)
        .build()
    )

    # Assemble the raw request.
    payload = {"emails": ["[email protected]"], "mobiles": ["15000000000"]}
    req: lark.BaseRequest = (
        lark.BaseRequest.builder()
        .http_method(lark.HttpMethod.POST)
        .uri("/open-apis/contact/v3/users/batch_get_id")
        .token_types({lark.AccessTokenType.TENANT})
        .queries([("user_id_type", "open_id")])
        .body(payload)
        .build()
    )

    # Send the request.
    resp: lark.BaseResponse = api_client.request(req)

    # Success path: log the raw response body decoded as UTF-8.
    if resp.success():
        lark.logger.info(str(resp.raw.content, lark.UTF_8))
        return

    # Failure path: report what the server returned.
    lark.logger.error(
        f"client.request failed, code: {resp.code}, msg: {resp.msg}, log_id: {resp.get_log_id()}")
# 上传文件
def upload_all_file():
    """Upload a local PDF through a raw multipart request.

    Builds a client from ``lark.APP_ID`` / ``lark.APP_SECRET``, sends the file
    as multipart form data in a POST to
    ``/open-apis/drive/v1/files/upload_all`` with a tenant access token, and
    logs the raw response body on success. When the call fails, the error
    code, message and log ID are logged instead.

    The source file is opened in a ``with`` block so the handle is always
    closed, including when building or sending the request raises.
    """
    # Build the API client.
    client = (
        lark.Client.builder()
        .app_id(lark.APP_ID)
        .app_secret(lark.APP_SECRET)
        .log_level(lark.LogLevel.DEBUG)
        .build()
    )

    # Keep the file open until the request has been sent: the multipart
    # encoder may read from the handle lazily while the body is transmitted.
    with open("/Users/bytedance/Desktop/demo.pdf", "rb") as file:
        data = {
            "file_name": "demo.pdf",
            "parent_type": "explorer",
            "parent_node": "Y9LhfoWNZlKxWcdsf2fcPP0SnXc",
            # NOTE(review): hard-coded; presumably this should equal the
            # actual byte size of the uploaded file -- confirm.
            "size": 1199,
            "file": file,
        }
        body = MultipartEncoder(lark.Files.parse_form_data(data))

        # Build the request; the Content-Type header carries the multipart
        # boundary generated by the encoder.
        request: lark.BaseRequest = (
            lark.BaseRequest.builder()
            .http_method(lark.HttpMethod.POST)
            .uri("/open-apis/drive/v1/files/upload_all")
            .headers({"Content-Type": body.content_type})
            .token_types({lark.AccessTokenType.TENANT})
            .body(body)
            .build()
        )

        # Send the request.
        response: lark.BaseResponse = client.request(request)

    # Handle a failed call.
    if not response.success():
        lark.logger.error(
            f"client.request failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}")
        return

    # Handle the result: log the raw response body decoded as UTF-8.
    lark.logger.info(str(response.raw.content, lark.UTF_8))
# 下载文件
def download_file():
    """Download a Drive file through a raw request and save it to disk.

    Builds a client from ``lark.APP_ID`` / ``lark.APP_SECRET``, sends a GET to
    ``/open-apis/drive/v1/files/:file_token/download`` with a tenant access
    token, and writes the response body to the desktop directory under the
    file name parsed from the response headers. When the call fails, the
    error code, message and log ID are logged and nothing is written.
    """
    # Build the API client.
    client = (
        lark.Client.builder()
        .app_id(lark.APP_ID)
        .app_secret(lark.APP_SECRET)
        .log_level(lark.LogLevel.DEBUG)
        .build()
    )

    # Build the request; ``:file_token`` in the URI is filled in from paths().
    request: lark.BaseRequest = (
        lark.BaseRequest.builder()
        .http_method(lark.HttpMethod.GET)
        .uri("/open-apis/drive/v1/files/:file_token/download")
        .paths({"file_token": "TYr6bxn1PoFZb0x63UocDQ5tnBf"})
        .token_types({lark.AccessTokenType.TENANT})
        .build()
    )

    # Send the request.
    response: lark.BaseResponse = client.request(request)

    # Handle a failed call.
    if not response.success():
        lark.logger.error(
            f"client.request failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}")
        return

    # Handle the result: write the body straight to disk. The context manager
    # closes the output file even if the write raises.
    # NOTE(review): the file name comes from the server's response headers and
    # is interpolated into the path unchanged -- consider sanitising it.
    file_name = lark.Files.parse_file_name(response.raw.headers)
    with open(f"/Users/bytedance/Desktop/{file_name}", "wb") as f:
        f.write(response.raw.content)
if __name__ == "__main__":
    # No-op: running this file as a script does not call any of the
    # functions defined in it.
    pass