# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.
import abc
import time
import datetime
import importlib
from pathlib import Path
from typing import Type, Iterable, Union
from concurrent.futures import ProcessPoolExecutor
import pandas as pd
from tqdm import tqdm
from loguru import logger
from joblib import Parallel, delayed
from qlib.utils import code_to_fname
class BaseCollector(abc.ABC):
CACHE_FLAG = "CACHED"
NORMAL_FLAG = "NORMAL"
DEFAULT_START_DATETIME_1D = pd.Timestamp("2000-01-01")
DEFAULT_START_DATETIME_1MIN = pd.Timestamp(datetime.datetime.now() - pd.Timedelta(days=5 * 6 - 1)).date()
DEFAULT_END_DATETIME_1D = pd.Timestamp(datetime.datetime.now() + pd.Timedelta(days=1)).date()
DEFAULT_END_DATETIME_1MIN = DEFAULT_END_DATETIME_1D
INTERVAL_1min = "1min"
INTERVAL_1d = "1d"
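    # NOTE: the DEFAULT_START/END_DATETIME_<INTERVAL> attributes are looked up
    # dynamically via the upper-cased interval string (see normalize_start_datetime
    # and normalize_end_datetime below), so their names must match the INTERVAL_* values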
def __init__(
self,
        save_dir: Union[str, Path],
start=None,
end=None,
interval="1d",
max_workers=1,
max_collector_count=2,
delay=0,
check_data_length: int = None,
limit_nums: int = None,
):
"""
Parameters
----------
save_dir: str
instrument save dir
max_workers: int
            Concurrent number of workers, default is 1; when collecting data, it is recommended to set max_workers to 1
max_collector_count: int
default 2
delay: float
time.sleep(delay), default 0
interval: str
freq, value from [1min, 1d], default 1d
start: str
start datetime, default None
end: str
end datetime, default None
check_data_length: int
            check data length; if not None and greater than 0, a symbol's data is considered complete once its length is greater than or equal to this value, otherwise the symbol will be fetched again, up to max_collector_count times. By default None.
limit_nums: int
            used for debugging, by default None
"""
self.save_dir = Path(save_dir).expanduser().resolve()
self.save_dir.mkdir(parents=True, exist_ok=True)
self.delay = delay
self.max_workers = max_workers
self.max_collector_count = max_collector_count
self.mini_symbol_map = {}
self.interval = interval
self.check_data_length = max(int(check_data_length) if check_data_length is not None else 0, 0)
self.start_datetime = self.normalize_start_datetime(start)
self.end_datetime = self.normalize_end_datetime(end)
self.instrument_list = sorted(set(self.get_instrument_list()))
if limit_nums is not None:
try:
self.instrument_list = self.instrument_list[: int(limit_nums)]
except Exception as e:
logger.warning(f"Cannot use limit_nums={limit_nums}, the parameter will be ignored")
    def normalize_start_datetime(self, start_datetime: Union[str, pd.Timestamp] = None):
return (
pd.Timestamp(str(start_datetime))
if start_datetime
else getattr(self, f"DEFAULT_START_DATETIME_{self.interval.upper()}")
)
    def normalize_end_datetime(self, end_datetime: Union[str, pd.Timestamp] = None):
return (
pd.Timestamp(str(end_datetime))
if end_datetime
else getattr(self, f"DEFAULT_END_DATETIME_{self.interval.upper()}")
)
@abc.abstractmethod
def get_instrument_list(self):
raise NotImplementedError("rewrite get_instrument_list")
@abc.abstractmethod
def normalize_symbol(self, symbol: str):
"""normalize symbol"""
raise NotImplementedError("rewrite normalize_symbol")
@abc.abstractmethod
def get_data(
self, symbol: str, interval: str, start_datetime: pd.Timestamp, end_datetime: pd.Timestamp
) -> pd.DataFrame:
"""get data with symbol
Parameters
----------
symbol: str
interval: str
value from [1min, 1d]
start_datetime: pd.Timestamp
end_datetime: pd.Timestamp
Returns
---------
pd.DataFrame, "symbol" and "date"in pd.columns
"""
raise NotImplementedError("rewrite get_timezone")
def sleep(self):
time.sleep(self.delay)
def _simple_collector(self, symbol: str):
"""
Parameters
----------
symbol: str
"""
self.sleep()
df = self.get_data(symbol, self.interval, self.start_datetime, self.end_datetime)
_result = self.NORMAL_FLAG
if self.check_data_length > 0:
_result = self.cache_small_data(symbol, df)
if _result == self.NORMAL_FLAG:
self.save_instrument(symbol, df)
return _result
def save_instrument(self, symbol, df: pd.DataFrame):
"""save instrument data to file
Parameters
----------
symbol: str
instrument code
df : pd.DataFrame
df.columns must contain "symbol" and "datetime"
"""
if df is None or df.empty:
logger.warning(f"{symbol} is empty")
return
symbol = self.normalize_symbol(symbol)
symbol = code_to_fname(symbol)
instrument_path = self.save_dir.joinpath(f"{symbol}.csv")
df["symbol"] = symbol
if instrument_path.exists():
_old_df = pd.read_csv(instrument_path)
df = pd.concat([_old_df, df], sort=False)
df.to_csv(instrument_path, index=False)
def cache_small_data(self, symbol, df):
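        # dataframes shorter than check_data_length are cached here instead of being
        # saved; collector_data() merges and saves them once all retries are exhausted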
if len(df) < self.check_data_length:
logger.warning(f"the number of trading days of {symbol} is less than {self.check_data_length}!")
_temp = self.mini_symbol_map.setdefault(symbol, [])
_temp.append(df.copy())
return self.CACHE_FLAG
else:
if symbol in self.mini_symbol_map:
self.mini_symbol_map.pop(symbol)
return self.NORMAL_FLAG
def _collector(self, instrument_list):
error_symbol = []
res = Parallel(n_jobs=self.max_workers)(
delayed(self._simple_collector)(_inst) for _inst in tqdm(instrument_list)
)
for _symbol, _result in zip(instrument_list, res):
if _result != self.NORMAL_FLAG:
error_symbol.append(_symbol)
        logger.warning(f"error symbols: {error_symbol}")
        logger.info(f"number of error symbols: {len(error_symbol)}")
        logger.info(f"number of symbols fetched this round: {len(instrument_list)}")
error_symbol.extend(self.mini_symbol_map.keys())
return sorted(set(error_symbol))
def collector_data(self):
"""collector data"""
logger.info("start collector data......")
instrument_list = self.instrument_list
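        # retry loop: _collector returns the symbols that failed (or whose data was
        # cached as too short), and those are re-fetched up to max_collector_count times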
for i in range(self.max_collector_count):
if not instrument_list:
break
logger.info(f"getting data: {i+1}")
instrument_list = self._collector(instrument_list)
logger.info(f"{i+1} finish.")
for _symbol, _df_list in self.mini_symbol_map.items():
_df = pd.concat(_df_list, sort=False)
if not _df.empty:
self.save_instrument(_symbol, _df.drop_duplicates(["date"]).sort_values(["date"]))
if self.mini_symbol_map:
logger.warning(f"less than {self.check_data_length} instrument list: {list(self.mini_symbol_map.keys())}")
logger.info(f"total {len(self.instrument_list)}, error: {len(set(instrument_list))}")
class BaseNormalize(abc.ABC):
def __init__(self, date_field_name: str = "date", symbol_field_name: str = "symbol", **kwargs):
"""
Parameters
----------
date_field_name: str
date field name, default is date
symbol_field_name: str
symbol field name, default is symbol
"""
self._date_field_name = date_field_name
self._symbol_field_name = symbol_field_name
self.kwargs = kwargs
self._calendar_list = self._get_calendar_list()
@abc.abstractmethod
def normalize(self, df: pd.DataFrame) -> pd.DataFrame:
# normalize
raise NotImplementedError("")
@abc.abstractmethod
def _get_calendar_list(self) -> Iterable[pd.Timestamp]:
"""Get benchmark calendar"""
raise NotImplementedError("")
class Normalize:
def __init__(
self,
        source_dir: Union[str, Path],
        target_dir: Union[str, Path],
normalize_class: Type[BaseNormalize],
max_workers: int = 16,
date_field_name: str = "date",
symbol_field_name: str = "symbol",
**kwargs,
):
"""
Parameters
----------
source_dir: str or Path
The directory where the raw data collected from the Internet is saved
target_dir: str or Path
            Directory for normalized data
        normalize_class: Type[BaseNormalize]
normalize class
max_workers: int
Concurrent number, default is 16
date_field_name: str
date field name, default is date
symbol_field_name: str
symbol field name, default is symbol
"""
if not (source_dir and target_dir):
raise ValueError("source_dir and target_dir cannot be None")
self._source_dir = Path(source_dir).expanduser()
self._target_dir = Path(target_dir).expanduser()
self._target_dir.mkdir(parents=True, exist_ok=True)
self._date_field_name = date_field_name
self._symbol_field_name = symbol_field_name
self._end_date = kwargs.get("end_date", None)
self._max_workers = max_workers
self._normalize_obj = normalize_class(
date_field_name=date_field_name, symbol_field_name=symbol_field_name, **kwargs
)
def _executor(self, file_path: Path):
file_path = Path(file_path)
        # some symbol_field values, such as TRUE and NA, are parsed as True (bool) and
        # NaN (float) by pandas' default CSV parsing, so the dtype and na_values of the
        # symbol_field are defined manually.
default_na = pd._libs.parsers.STR_NA_VALUES # pylint: disable=I1101
symbol_na = default_na.copy()
symbol_na.remove("NA")
columns = pd.read_csv(file_path, nrows=0).columns
df = pd.read_csv(
file_path,
dtype={self._symbol_field_name: str},
keep_default_na=False,
na_values={col: symbol_na if col == self._symbol_field_name else default_na for col in columns},
)
        # NOTE: problems have been reported here; the specific issues will be addressed once they are identified.
df = self._normalize_obj.normalize(df)
if df is not None and not df.empty:
if self._end_date is not None:
_mask = pd.to_datetime(df[self._date_field_name]) <= pd.Timestamp(self._end_date)
df = df[_mask]
df.to_csv(self._target_dir.joinpath(file_path.name), index=False)
def normalize(self):
logger.info("normalize data......")
with ProcessPoolExecutor(max_workers=self._max_workers) as worker:
file_list = list(self._source_dir.glob("*.csv"))
with tqdm(total=len(file_list)) as p_bar:
for _ in worker.map(self._executor, file_list):
p_bar.update()
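# Typical wiring of the pieces above (the paths and ExampleNormalize come from the
# illustrative sketches in this file, not fixed qlib names):
#
#     Normalize(
#         source_dir="~/.qlib/example/source",
#         target_dir="~/.qlib/example/normalize",
#         normalize_class=ExampleNormalize,
#         max_workers=4,
#     ).normalize()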
class BaseRun(abc.ABC):
def __init__(self, source_dir=None, normalize_dir=None, max_workers=1, interval="1d"):
"""
Parameters
----------
source_dir: str
The directory where the raw data collected from the Internet is saved, default "Path(__file__).parent/source"
normalize_dir: str
            Directory for normalized data, default "Path(__file__).parent/normalize"
        max_workers: int
            Concurrent number, default is 1; when collecting data, it is recommended that max_workers be set to 1
interval: str
freq, value from [1min, 1d], default 1d
"""
if source_dir is None:
source_dir = Path(self.default_base_dir).joinpath("source")
self.source_dir = Path(source_dir).expanduser().resolve()
self.source_dir.mkdir(parents=True, exist_ok=True)
if normalize_dir is None:
normalize_dir = Path(self.default_base_dir).joinpath("normalize")
self.normalize_dir = Path(normalize_dir).expanduser().resolve()
self.normalize_dir.mkdir(parents=True, exist_ok=True)
self._cur_module = importlib.import_module("collector")
self.max_workers = max_workers
self.interval = interval
@property
@abc.abstractmethod
def collector_class_name(self):
raise NotImplementedError("rewrite collector_class_name")
@property
@abc.abstractmethod
def normalize_class_name(self):
raise NotImplementedError("rewrite normalize_class_name")
@property
@abc.abstractmethod
    def default_base_dir(self) -> Union[str, Path]:
raise NotImplementedError("rewrite default_base_dir")
def download_data(
self,
max_collector_count=2,
delay=0,
start=None,
end=None,
check_data_length: int = None,
limit_nums=None,
**kwargs,
):
"""download data from Internet
Parameters
----------
max_collector_count: int
default 2
delay: float
time.sleep(delay), default 0
start: str
start datetime, default "2000-01-01"
end: str
end datetime, default ``pd.Timestamp(datetime.datetime.now() + pd.Timedelta(days=1))``
check_data_length: int
            check data length; if not None and greater than 0, a symbol's data is considered complete once its length is greater than or equal to this value, otherwise the symbol will be fetched again, up to max_collector_count times. By default None.
        limit_nums: int
            used for debugging, by default None
Examples
---------
# get daily data
$ python collector.py download_data --source_dir ~/.qlib/instrument_data/source --region CN --start 2020-11-01 --end 2020-11-10 --delay 0.1 --interval 1d
        # get 1min data
        $ python collector.py download_data --source_dir ~/.qlib/instrument_data/source --region CN --start 2020-11-01 --end 2020-11-10 --delay 0.1 --interval 1min
"""
_class = getattr(self._cur_module, self.collector_class_name) # type: Type[BaseCollector]
_class(
self.source_dir,
max_workers=self.max_workers,
max_collector_count=max_collector_count,
delay=delay,
start=start,
end=end,
interval=self.interval,
check_data_length=check_data_length,
limit_nums=limit_nums,
**kwargs,
).collector_data()
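    # direct (non-CLI) use, with illustrative paths and the hypothetical ExampleRun
    # subclass sketched at the end of this file:
    #     ExampleRun(source_dir="~/.qlib/example/source").download_data(start="2020-11-01", end="2020-11-10")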
def normalize_data(self, date_field_name: str = "date", symbol_field_name: str = "symbol", **kwargs):
"""normalize data
Parameters
----------
date_field_name: str
date field name, default date
symbol_field_name: str
symbol field name, default symbol
Examples
---------
$ python collector.py normalize_data --source_dir ~/.qlib/instrument_data/source --normalize_dir ~/.qlib/instrument_data/normalize --region CN --interval 1d
"""
_class = getattr(self._cur_module, self.normalize_class_name)
yc = Normalize(
source_dir=self.source_dir,
target_dir=self.normalize_dir,
normalize_class=_class,
max_workers=self.max_workers,
date_field_name=date_field_name,
symbol_field_name=symbol_field_name,
**kwargs,
)
yc.normalize()
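# --- Example: a minimal concrete runner ---------------------------------------
# A hedged sketch. Concrete runners in this repo normally live in a file named
# ``collector.py`` (note ``importlib.import_module("collector")`` in BaseRun) and
# are exposed on the command line via ``fire.Fire``, which is what the
# ``python collector.py download_data ...`` examples in the docstrings assume.
# ExampleRun and the class names it returns are illustrative.
class ExampleRun(BaseRun):
    @property
    def collector_class_name(self):
        return "ExampleCollector"

    @property
    def normalize_class_name(self):
        return "ExampleNormalize"

    @property
    def default_base_dir(self) -> Path:
        return Path("~/.qlib/example").expanduser()

# if __name__ == "__main__":
#     import fire
#     fire.Fire(ExampleRun)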