-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.py
92 lines (81 loc) · 3.31 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
#!/usr/bin/env python
# _*_coding:utf-8 _*_
# @Time :2022/7/22 21:40
# @Author :Abner Wong
# @Software: PyCharm
import os
import torch
import numpy as np
import pickle as pkl
from tqdm import tqdm
import time
from datetime import timedelta
# Upper bound on the number of corpus tokens kept when a vocabulary is built
# (passed as ``max_size`` to build_vocab by build_dataset).
MAX_VOCAB_SIZE = 10000
# Special tokens: UNK is the fallback for tokens missing from the vocabulary,
# PAD is appended to sequences shorter than the padding length.
UNK, PAD = '<UNK>', '<PAD>'
def build_vocab(file_path, tokenizer, max_size, min_freq):
    """Count tokens in a tab-separated corpus and map the most frequent ones to ids.

    Only the text before the first tab of each non-blank line is tokenized.
    Tokens seen fewer than ``min_freq`` times are dropped; the rest are ranked
    by descending count (ties keep first-seen order) and the top ``max_size``
    receive ids ``0..k-1``. ``UNK`` and ``PAD`` are then assigned the next two
    ids, in that order.

    Args:
        file_path: Path of the UTF-8 text file to read.
        tokenizer: Callable turning a text string into an iterable of tokens.
        max_size: Maximum number of corpus tokens to keep.
        min_freq: Minimum count a token needs in order to be kept.

    Returns:
        dict mapping each token to its integer id.
    """
    counts = {}
    with open(file_path, 'r', encoding='UTF-8') as handle:
        for raw_line in tqdm(handle):
            stripped = raw_line.strip()
            if stripped:
                text = stripped.split('\t')[0]
                for token in tokenizer(text):
                    counts[token] = counts.get(token, 0) + 1

    # Keep sufficiently frequent tokens; the stable sort preserves first-seen
    # order among tokens with equal counts.
    frequent = [(token, freq) for token, freq in counts.items() if freq >= min_freq]
    frequent.sort(key=lambda pair: pair[1], reverse=True)

    token_to_id = {}
    for position, (token, _) in enumerate(frequent[:max_size]):
        token_to_id[token] = position

    # Both special ids are derived from the size *before* either is inserted.
    base = len(token_to_id)
    token_to_id[UNK] = base
    token_to_id[PAD] = base + 1
    return token_to_id
def build_dataset(config, ues_word):
    """Load or build the vocabulary, then encode the train/dev/test files.

    If ``config.vocab_path`` exists the vocabulary is unpickled from it;
    otherwise it is built from ``config.train_path`` with ``build_vocab`` and
    pickled to ``config.vocab_path``.

    Args:
        config: Object providing ``vocab_path``, ``train_path``, ``dev_path``,
            ``test_path``, ``pad_size`` and ``n_gram_vocab``.
        ues_word: If truthy, text is split on single spaces (word level);
            otherwise every character is a token (char level).

    Returns:
        Tuple ``(vocab, train, dev, test)``. Each dataset is a list of
        ``(token_ids, label, seq_len, bigram, trigram)`` tuples.
    """
    def split_words(text):
        # Word level: tokens are separated by single spaces.
        return text.split(' ')

    def split_chars(text):
        # Char level: every character is its own token.
        return list(text)

    tokenizer = split_words if ues_word else split_chars

    if os.path.exists(config.vocab_path):
        # NOTE(review): unpickling executes arbitrary code if the file is
        # tampered with; only point vocab_path at files this project wrote.
        with open(config.vocab_path, 'rb') as vocab_file:
            vocab = pkl.load(vocab_file)
    else:
        vocab = build_vocab(config.train_path, tokenizer=tokenizer, max_size=MAX_VOCAB_SIZE, min_freq=1)
        with open(config.vocab_path, 'wb') as vocab_file:
            pkl.dump(vocab, vocab_file)
    print(f"Vocab size: {len(vocab)}")

    def biGramHash(sequence, t, buckets):
        """Hash the id at position ``t - 1`` (0 before the start) into ``buckets``."""
        t1 = sequence[t - 1] if t - 1 >= 0 else 0
        return (t1 * 14918087) % buckets

    def triGramHash(sequence, t, buckets):
        """Hash the ids at positions ``t - 2`` and ``t - 1`` (0 before the start) into ``buckets``."""
        t1 = sequence[t - 1] if t - 1 >= 0 else 0
        t2 = sequence[t - 2] if t - 2 >= 0 else 0
        return (t2 * 14918087 * 18408749 + t1 * 14918087) % buckets

    def load_dataset(path, pad_size=32):
        """Encode every non-blank ``text<TAB>label`` line of ``path``.

        When ``pad_size`` is truthy, sequences are padded with ``PAD`` or
        truncated to exactly ``pad_size`` tokens; otherwise they keep their
        natural length.
        """
        buckets = config.n_gram_vocab  # loop-invariant, read once
        unk_id = vocab.get(UNK)
        contents = []
        with open(path, 'r', encoding='UTF-8') as f:
            for line in tqdm(f):
                lin = line.strip()
                if not lin:
                    continue
                content, label = lin.split('\t')
                token = tokenizer(content)
                seq_len = len(token)
                if pad_size:
                    if len(token) < pad_size:
                        token.extend([PAD] * (pad_size - len(token)))
                    else:
                        token = token[:pad_size]
                        seq_len = pad_size
                # word to id
                words_line = [vocab.get(word, unk_id) for word in token]
                # fastText n-gram features: one hash per position of the
                # encoded sequence. Its length equals pad_size whenever padding
                # is enabled, and stays well-defined when pad_size is 0/None.
                positions = range(len(words_line))
                bigram = [biGramHash(words_line, i, buckets) for i in positions]
                trigram = [triGramHash(words_line, i, buckets) for i in positions]
                contents.append((words_line, int(label), seq_len, bigram, trigram))
        return contents  # [([...], 0, ...), ([...], 1, ...), ...]

    train = load_dataset(config.train_path, config.pad_size)
    dev = load_dataset(config.dev_path, config.pad_size)
    test = load_dataset(config.test_path, config.pad_size)
    return vocab, train, dev, test