# cses_forest_X.py
# Part of a repository forked from wagner-d/TimeSeAD.
from torch.utils.data import DataLoader
from timesead.data.cses_dataset_flatX0 import CsesDataset
from timesead.models.baselines.iforest import IForestAD
import matplotlib.pyplot as plot
import os
import numpy
import torch
from precision_stats import Stats
# NOTE: the library source files were modified locally to remove the
# deprecated np.float32 usage, and unneeded imports were removed from the
# package __init__ files so that this script can be executed directly.
def main():
    """Fit an isolation forest on the CSES dataset, plot every item and print metrics.

    The function performs, in order:

    1. Builds a ``CsesDataset`` and wraps it in a ``DataLoader`` with default
       arguments, then fits an ``IForestAD`` detector on it.
    2. Iterates over the same loader again, computes the anomaly score of each
       item and saves one figure per item to ``Plots_iforest_polar/<index>.png``.
    3. Stacks all scores and saves them to ``tot_scores_forest_X.npy``.
    4. Loads the reference labels from ``anomalies.npy`` and prints precision,
       recall, accuracy and F1 score as computed by ``Stats``.

    Returns:
        None. All results are written to disk or printed.
    """
    cses_train_ds = CsesDataset()
    cses_train_dl = DataLoader(cses_train_ds)

    forest = IForestAD()
    # Configure the wrapped estimator before fitting so the settings apply.
    forest.model.contamination = 0.4
    forest.model.n_jobs = -1
    forest.fit(cses_train_dl)

    # The decision threshold is fixed once the model is fitted; read it once.
    threshold = forest.model.threshold_

    # NOTE: scoring below reuses the training loader. Evaluation on a separate
    # split is currently disabled:
    # cses_test_ds = CsesDataset(training=False)
    # cses_test_dl = DataLoader(cses_test_ds)

    plots_dir = 'Plots_iforest_polar'
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(plots_dir, exist_ok=True)

    tot_scores = []
    for i, (item, _) in enumerate(cses_train_dl):
        scores = forest.compute_online_anomaly_score(item).numpy()
        tot_scores.append(scores)

        # First channel of the single item in this batch.
        # NOTE(review): assumes item is indexed as (batch, time, ., .) - confirm
        # against CsesDataset.
        signal = item[0, :, 0, 0].numpy()
        # Positions whose score exceeds the fitted threshold.
        anomalies = numpy.where(scores > threshold)[0]

        plot.figure(figsize=(12, 6))
        plot.plot(signal, label='E_X_normalized', color='blue', alpha=0.6)
        # NOTE(review): the threshold is a score-space value drawn on the signal
        # axis - confirm this overlay is intended.
        plot.axhline(y=threshold, color='red', linestyle='--', label='Threshold')
        plot.scatter(anomalies, signal[anomalies], color='red', label='Anomalies', s=15)
        plot.title('Test')
        plot.xlabel('Time')
        plot.ylabel('E_X_res')
        # The artists above carry labels; without a legend they are never shown.
        plot.legend()
        plot.savefig(os.path.join(plots_dir, f'{i}.png'))
        plot.close()

    # Stacking requires every item to yield the same number of scores.
    tot_scores = numpy.array(tot_scores, dtype=float)
    numpy.save("tot_scores_forest_X.npy", tot_scores)

    labels = numpy.load("anomalies.npy").flatten()
    stats = Stats(tot_scores.flatten(), labels, threshold)

    # A dict keeps each metric attached to its name. The previous set literal
    # had no defined order and merged metrics that happened to be equal.
    out = {
        'precision': stats.precision(),
        'recall': stats.recall(),
        'accuracy': stats.accuracy(),
        'F1score': stats.F1score(),
    }
    print(out)
# Run the experiment only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == '__main__':
    main()