func.py
import json
import re
import time
from datetime import datetime
from urllib.parse import unquote, quote

import requests
from bs4 import BeautifulSoup
import discord
# ------ Load a JSON file ------
def load_file(file_name: str) -> dict:
    with open(file_name, 'r', encoding='utf-8') as file:
        loaded_file = json.load(file)
    return loaded_file
# ------ Save a JSON file ------
def save_file(file_name: str, data) -> None:
    with open(file_name, 'w', encoding='utf-8') as file:
        json.dump(data, file, indent=4, ensure_ascii=False)
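# Illustrative usage of the two JSON helpers (assumes a writable 'settings.json'
# in the working directory; the key edited below is just a placeholder):
#   config = load_file('settings.json')
#   config['url_root'] = 'https://example.com/'
#   save_file('settings.json', config)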
# ------ Settings ------
setting = load_file('settings.json')
SHORT_URL_KEY = setting['key']
URL_ROOT = setting['url_root']
unichr = chr  # Python 3 has no unichr(); chr() covers the full Unicode range
# ------ Built-in URL shortener ------
def short_repl_it_url(url, key):
    # Reject anything that does not look like a valid URL
    if not is_string_an_url(url):
        return 'error'
    api_url = f'{URL_ROOT}shorturl?key={key}&url={url}'
    response = requests.post(api_url).text
    return response
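# Illustrative usage (assumes the shortener endpoint under URL_ROOT is reachable
# and returns the shortened URL as plain text, or a string ending in 'error' on
# failure; the long URL below is a placeholder):
#   short = short_repl_it_url('https://example.com/some/long/path', SHORT_URL_KEY)
#   if not short.endswith('error'):
#       print(short)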
# ------ Link formatting ------
def shorts_url(url, filename, image=None):
    if image is None:
        # Plain attachment: show the file name next to the link as-is
        return f'`{filename}` | {url}'
    # image == 'image': try to shorten the link first
    shortened = short_repl_it_url(url, SHORT_URL_KEY)
    if not shortened.endswith('error'):
        return shortened
    print('無法連上api')
    return f'`{filename}` | [連結點我]({url})'
# ------ Decode %uXXXX escapes in a URL-encoded string ------
def unquote_unicode(string, _cache={}):
    string = unquote(string)  # handle two-digit %hh components first
    parts = string.split(u'%u')
    if len(parts) == 1:
        return string
    r = [parts[0]]
    append = r.append
    for part in parts[1:]:
        try:
            digits = part[:4].lower()
            if len(digits) < 4:
                raise ValueError
            ch = _cache.get(digits)
            if ch is None:
                ch = _cache[digits] = unichr(int(digits, 16))
            if (not r[-1] and u'\uDC00' <= ch <= u'\uDFFF'
                    and u'\uD800' <= r[-2] <= u'\uDBFF'):
                # UTF-16 surrogate pair, replace with single non-BMP codepoint
                r[-2] = (r[-2] + ch).encode('utf-16', 'surrogatepass').decode('utf-16')
            else:
                append(ch)
            append(part[4:])
        except ValueError:
            append(u'%u')
            append(part)
    return u''.join(r)
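# Illustrative example: ordinary %hh escapes are handled by unquote() first,
# then %uXXXX escapes are decoded, e.g.:
#   unquote_unicode('file%20name%u4e2d%u6587.pdf')  # -> 'file name中文.pdf'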
# ------ Check whether a string is a valid URL ------
def is_string_an_url(url_string: str) -> bool:
    regex = re.compile(
        r'^(?:http|ftp)s?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|'  # domain...
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)$',
        re.IGNORECASE)
    return re.match(regex, url_string) is not None
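# Illustrative checks (based on the regex above):
#   is_string_an_url('https://example.com/path?q=1')  # True
#   is_string_an_url('not a url')                     # False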
# ------ Increment an announcement's click count ------
async def update_news_count(news_id):
    url = f"https://www.hchs.hc.edu.tw/ischool/widget/site_news/update_news_clicks.php?newsId={news_id}"
    try:
        # Fetch a fresh HTTP proxy from the proxy API
        proxy_url = "https://gimmeproxy.com/api/getProxy?supportsHttps=true&maxCheckPeriod=10000&protocol=http"
        response = requests.get(proxy_url)
        response_json = response.json()
        if response_json == {"error": "no more proxies left"}:
            print(response_json)
            return 'error'
        proxy = f"{response_json['ip']}:{response_json['port']}"
        proxies = {"http": proxy}
        response = requests.get(url, timeout=7, proxies=proxies)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print("error:", e)
        return 'error'
# ------ Detect and resolve duplicate announcement titles ------
def detect_and_resolve_duplicates():
    news = load_file('news.json')
    value_counts = {}
    resolved_data = {}
    for key, value in news.items():
        if value != "None":
            if value in value_counts:
                # Duplicate title: append a running count to keep it unique
                value_counts[value] += 1
                resolved_data[key] = f"{value}{value_counts[value]}"
            else:
                value_counts[value] = 1
                resolved_data[key] = value
        else:
            resolved_data[key] = value
    # Sort by numeric id, newest first, before saving
    resolved_data = {
        str(k): v
        for k, v in sorted(
            resolved_data.items(), key=lambda item: int(item[0]), reverse=True)
    }
    save_file('news.json', resolved_data)
    return
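# Illustrative effect on news.json (hypothetical contents): duplicate titles get
# a running number appended and keys are re-sorted newest-first, e.g.
#   {"101": "運動會", "102": "運動會", "100": "段考"}
# becomes
#   {"102": "運動會2", "101": "運動會", "100": "段考"}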
# ------ Percent-encode spaces (and other unsafe characters) in a URL ------
async def encode_spaces_in_url(url):
    encoded_url = quote(url, safe=':/')
    return encoded_url
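# Illustrative example (':' and '/' are kept, everything else unsafe is encoded):
#   await encode_spaces_in_url('https://example.com/a file.pdf')
#   # -> 'https://example.com/a%20file.pdf'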
# ------ Fetch an announcement ------
async def get_anc(news_id, auto=None):
    try:
        news = load_file('news.json')
        if auto is None:
            # Manual lookup: map a stored title (or 100-char title prefix) back to its id
            for key, value in news.items():
                if len(value) == 100 and value.startswith(news_id):
                    news_id = key
                elif len(value) < 100 and value == news_id:
                    news_id = key
        url = f"https://www.hchs.hc.edu.tw/ischool/public/news_view/show.php?nid={news_id}"
        # GET request
        response = requests.get(url)
        html = response.text
        # Parse the HTML with BeautifulSoup
        soup = BeautifulSoup(html, 'lxml')
        # Locate the inline JavaScript blocks
        js_code = soup.find_all('script', type='text/javascript')
        try:
            # Extract the attachment metadata embedded in the JavaScript
            attached_file_data = re.search(
                r'var g_attached_file_json_data = \'(.*?)\'', str(js_code)).group(1)
            news_unique_id = re.search(r'var g_news_unique_id = "(.*?)"',
                                       str(js_code)).group(1)
            resource_folder = re.search(r'var g_resource_folder = "(.*?)"',
                                        str(js_code)).group(1)
            attached_file_json_data = json.loads(attached_file_data)
        except (AttributeError, json.JSONDecodeError):
            raise ValueError('公告ID錯誤: 找不到該ID')
        attachments = []
        for file_data in attached_file_json_data:
            file_name = unquote_unicode(file_data[2])
            if isinstance(file_name, list):
                file_name = file_name[0]
            file_link = await encode_spaces_in_url(
                f'{URL_ROOT}?id={news_id}&news_unique_id={news_unique_id}&res_folder={resource_folder}&res_name={file_name}')
            shorted_url = shorts_url(short_repl_it_url(file_link, SHORT_URL_KEY),
                                     file_name, None)
            attachments.append(str(shorted_url))
            time.sleep(0.2)
        # Extract the title
        title_element = soup.find('h4')
        title = title_element.text.strip()
        if auto is not None:
            news[str(news_id)] = title
        # Extract the posting info
        info_unit = soup.find(id='info_unit').text.strip()
        info_person = soup.find(id='info_person').text.strip()
        info_time = soup.find(id='info_time').text.strip()

        def html_to_text(content):
            # Prefer a table layout when the announcement body contains one
            table = content.find('table')
            tbody = table.find('tbody') if table is not None else None
            if tbody is not None:
                rows = tbody.find_all('tr')
                text_list = []
                for row in rows:
                    cells = row.find_all('td')
                    row_text = '\t'.join([cell.get_text(strip=True) for cell in cells])
                    text_list.append(row_text)
                return '\n'.join(text_list)
            # Otherwise fall back to paragraphs, then to plain divs
            paragraphs = content.find_all('p')
            text = '\n'.join([
                p.get_text(strip=True) for p in paragraphs[1:]
                if p.get_text(strip=True)
            ])
            if text == "" or text == "\n":
                divs = content.find_all('div')
                div_text = '\n'.join([
                    div.get_text(strip=True) for div in divs[1:]
                    if div.get_text(strip=True)
                ])
                return div_text
            return text

        content = soup.find('div', id='content')
        formatted_text = html_to_text(content)
        # Regex to find all links in the text, stopping before CJK characters
        pattern = r'(http|ftp|https):\/\/([\w_-]+(?:(?:\.[\w_-]+)+))([\w.,@?^=%&:\/~+#-]*[\w@?^=%&\/~+#-])(?![\u4e00-\u9fa5])'
        compiled_pattern = re.compile(pattern, re.MULTILINE | re.ASCII)
        links = compiled_pattern.findall(formatted_text)
        # Reassemble each link and add a trailing space
        formatted_links = [
            '{}://{}{} '.format(link[0], link[1], link[2]) for link in links
        ]
        # Put the formatted links back into formatted_text
        for link in formatted_links:
            formatted_text = formatted_text.replace(link.strip(), link)
        # Collect all image links
        image_links = []
        img_tags = soup.find_all('img')
        for img_tag in img_tags:
            if 'src' in img_tag.attrs:
                if '/ischool/resources/WID' not in img_tag['src']:
                    # Images outside the school resource folder are marked with a
                    # trailing '+' so they are shortened directly below
                    image_links.append(f"{img_tag['src']}+")
                else:
                    image_links.append(img_tag['src'])
        if auto is not None:
            save_file('news.json', news)
embed = discord.Embed(title="爬蟲結果",
url=url,
description=f'新聞ID: {news_id}',
colour=0x00b0f4,
timestamp=datetime.now())
embed.add_field(name="標題", value=title, inline=False)
embed.add_field(name="單位", value=info_unit, inline=False)
embed.add_field(name="張貼人", value=info_person, inline=False)
embed.add_field(name="張貼日期", value=info_time, inline=False)
if formatted_text != '' and formatted_text != None and formatted_text != "\n":
embed.add_field(name="內容", value=formatted_text[:1024], inline=False)
else:
embed.add_field(name='內容', value='無', inline=False)
# attachments
if attachments:
attachments_formatted = "\n".join(attachment
for attachment in attachments)
embed.add_field(name="附件檔案", value=attachments_formatted, inline=False)
else:
embed.add_field(name="附件檔案", value="無", inline=False)
# pic links
if image_links:
image_links_formatted = "\n".join([
f"`圖片{id}` | {shorts_url(f'{URL_ROOT}images?id={news_id}&name={image_filename}', image_filename, 'image')}"
if not image_filename.endswith('+')
else f"`圖片{id}` | {shorts_url(image_filename.rstrip('+'), '', 'image')}"
for id, image_filename in enumerate(image_links, start=1)
])
embed.add_field(name="圖片", value=image_links_formatted, inline=False)
else:
embed.add_field(name="圖片", value="無", inline=False)
embed.set_footer(
text="黑色麻中",
icon_url=
"https://cdn.discordapp.com/avatars/1146008422144290826/13051e7a68067c42c417f3aa04de2ffa.webp"
)
return embed
    except Exception as e:
        return f'error: {e}'
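# Illustrative usage from an async context (the announcement id below is a
# placeholder; in the bot this is normally awaited inside a Discord command):
#   import asyncio
#   result = asyncio.run(get_anc('12345'))
#   if isinstance(result, discord.Embed):
#       pass  # send the embed, e.g. await ctx.send(embed=result)
#   else:
#       print(result)  # 'error...' string on failure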