#! /usr/bin/python3
# usage: calibrate map measurement-dir output-dir

import os
import sys
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), 'lib')))

import collections
import csv
import datetime
import itertools
import glob
import gzip
import multiprocessing
import pickle
import pickletools
import random
import shlex
import time

import numpy as np
import ageo
from pyproj import Geod

_time_0 = time.monotonic()
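
# Status reporting: progress messages are prefixed with the elapsed time
# since program start, warnings are indented and flagged.  Both go to
# stderr so they don't interfere with redirected output.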
def progress(message, *args):
    global _time_0
    sys.stderr.write(
        ("{}: " + message + "\n").format(
            datetime.timedelta(seconds = time.monotonic() - _time_0),
            *args))

def warning(message, *args):
    sys.stderr.write(
        ("\t*** " + message + "\n").format(*args))
_WGS84dist = Geod(ellps='WGS84').inv

def WGS84dist(lon1, lat1, lon2, lat2):
    _, _, dist = _WGS84dist(lon1, lat1, lon2, lat2)
    return dist

def WGS84daz(lon1, lat1, lon2, lat2):
    az, _, dist = _WGS84dist(lon1, lat1, lon2, lat2)
    return dist, az
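
# Maintain a bounded sample of RTT observations per (src, dst) pair.
# This is reservoir sampling with a twist: new minima and maxima are
# always admitted, and the current extremes are never evicted, so the
# sample preserves the full observed range.  'count' is the number of
# observations seen so far, which keeps the inclusion probability for
# non-extreme values at roughly DESIRED_SAMPLES/count.  Caveat: if the
# sample ever consists entirely of values equal to its min or max, the
# eviction loop below cannot find a victim and will spin; the data
# presumably never degenerates that far.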
DESIRED_SAMPLES = 200

def update_sample(sample, count, obs):
    n = len(sample)
    if n < DESIRED_SAMPLES:
        sample.append(obs)
        return
    lo = min(sample)
    hi = max(sample)
    if obs < lo or obs > hi or random.random() < DESIRED_SAMPLES/(count+1):
        lo = min(obs, lo)
        hi = max(obs, hi)
        replace = random.randint(0, n-1)
        while sample[replace] <= lo or sample[replace] >= hi:
            replace = random.randint(0, n-1)
        sample[replace] = obs
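
# Ground truth.  anchor-index.csv gives each anchor's advertised
# latitude/longitude, IPv4 address, ASN, and country code.  Rows with
# off-globe coordinates, or coordinates suspiciously close to (0, 0)
# ("null island", a common geolocation-database failure mode), are
# dropped with a warning.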
TruePosition = collections.namedtuple("TruePosition",
                                      ("lat", "lon", "ipv4", "asn", "cc"))

def load_true_positions(fname, positions, cache_ts):
    with open(fname) as fp:
        if os.fstat(fp.fileno()).st_mtime < cache_ts:
            return False
        positions.clear()
        rd = csv.DictReader(fp)
        if 'id' in rd.fieldnames:
            idf = 'id'
        elif 'pid' in rd.fieldnames:
            idf = 'pid'
        else:
            raise RuntimeError("{}: can't find id column among {!r}"
                               .format(fname, rd.fieldnames))
        for row in rd:
            pos = TruePosition(
                float(row['latitude']), float(row['longitude']),
                row['address_v4'], row['asn_v4'], row['country_code'].lower())
            # sanity check
            if not (-90 <= pos.lat < 90) or not (-180 < pos.lon < 180):
                warning("{} ({}): position off globe: {}, {}",
                        row[idf], pos.ipv4, pos.lat, pos.lon)
            elif (-1 < pos.lat < 1) and (-1 < pos.lon < 1):
                warning("{} ({}): null island: {}, {}",
                        row[idf], pos.ipv4, pos.lat, pos.lon)
            else:
                positions[int(row[idf])] = pos
        return True
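
# Precompute the true geodesic distance (in meters) between every pair
# of anchors; these are the reference distances the calibration curves
# are fitted against.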
def compute_true_distances(positions):
    distances = {}
    for did, dpos in positions.items():
        distances[did] = {}
        for sid, spos in positions.items():
            distances[did][sid] = WGS84dist(dpos.lon, dpos.lat,
                                            spos.lon, spos.lat)
    return distances
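
# Ping measurements.  Each pingtimes-*.csv row is (d.id, s.id, rtt);
# RTTs outside [0, 4000) (presumably milliseconds) are discarded as
# bogus, as are measurements involving anchors with no known position.
# 'already' suppresses duplicate missing-position warnings across files.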
def load_measurements(fname, measurements, counts,
                      cache_ts, positions, already):
    with open(fname) as fp:
        if os.fstat(fp.fileno()).st_mtime < cache_ts:
            return False
        progress("loading {!r}", shlex.quote(fname))
        rd = csv.DictReader(fp)
        for row in rd:
            d_id = int(row['d.id'])
            s_id = int(row['s.id'])
            rtt = float(row['rtt'])
            if not (0 <= rtt < 4000):
                warning("out of range: {} -> {}: {}", s_id, d_id, rtt)
                continue
            if d_id not in positions:
                if d_id not in already:
                    warning("no position for dest {}", d_id)
                    already.add(d_id)
                continue
            if s_id not in positions:
                if s_id not in already:
                    warning("no position for src {}", s_id)
                    already.add(s_id)
                continue
            if d_id not in measurements:
                measurements[d_id] = {}
                counts[d_id] = {}
            if s_id not in measurements[d_id]:
                measurements[d_id][s_id] = []
                counts[d_id][s_id] = 0
            update_sample(measurements[d_id][s_id], counts[d_id][s_id], rtt)
            counts[d_id][s_id] += 1
        return True
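
# Load everything from the measurement directory, using a gzipped
# pickle as a cache.  A data file is reparsed only if it is newer than
# the cache; the cache is rewritten whenever anything changed.
# pickletools.optimize shrinks the pickle before compression.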
def load_raw_data(mdir):
    positions_f = mdir + '/anchor-index.csv'
    pingtimes_g = mdir + '/pingtimes-*.csv'
    cache_f = mdir + '/cache.pickle.gz'
    try:
        progress("Loading measurement cache...")
        with gzip.open(cache_f, "rb") as mcache:
            positions, distances, measurements, counts = pickle.load(mcache)
            cache_ts = os.fstat(mcache.fileno()).st_mtime
        cache_dirty = False
    except (OSError, EOFError, pickle.UnpicklingError) as e:
        warning("Failed to load measurement cache: {}", e)
        positions = {}
        distances = None
        measurements = {}
        counts = {}
        cache_ts = 0
        cache_dirty = True

    progress("Loading positions...")
    positions_changed = load_true_positions(positions_f, positions, cache_ts)
    if positions_changed or distances is None:
        cache_dirty = True
        distances = compute_true_distances(positions)

    progress("Loading measurements...")
    already = set()
    for f in glob.glob(pingtimes_g):
        cache_dirty |= load_measurements(f, measurements, counts,
                                         cache_ts, positions, already)

    if cache_dirty:
        progress("Updating measurement cache...")
        with gzip.open(cache_f, "wb") as mcache:
            mcache.write(
                pickletools.optimize(
                    pickle.dumps(
                        (positions, distances, measurements, counts),
                        pickle.HIGHEST_PROTOCOL)))

    return positions, distances, measurements, cache_ts
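
# Fit all three calibration models to one observation matrix (each row
# a (true distance, RTT) pair).  CBG, QuasiOctant, and Spotter are the
# ageo library's implementations of, presumably, the Constraint-Based
# Geolocation, Octant, and Spotter delay-to-distance models from the
# network-geolocation literature.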
def make_calibration_obs(did, obs):
    cbg = ageo.calibration.CBG(obs)
    progress("{}: CBG", did)
    octant = ageo.calibration.QuasiOctant(obs)
    progress("{}: Octant", did)
    spotter = ageo.calibration.Spotter(obs)
    progress("{}: Spotter", did)
    return did, obs, cbg, octant, spotter
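
# Build the observation matrix for a single destination: one row per
# retained RTT, pairing the RTT with the true distance to the source
# that measured it.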
def make_calibration_1(did, srcs, distances):
    progress("Calibrating {}...", did)
    obs = np.ndarray(shape=(0,2))
    for sid, rts in srcs.items():
        if sid == did:
            continue
        tdist = distances[did][sid]
        rts = np.array(rts)
        o1 = np.column_stack((np.full_like(rts, tdist), rts))
        obs = np.vstack((obs, o1))
    return make_calibration_obs(did, obs)

def call_make_calibration_1(args):
    return make_calibration_1(*args)
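
# Calibrate every destination in parallel, then fit one combined
# calibration over all observations pooled together.  The combined
# calibration is stored under key 0, which presumably cannot collide
# with a real anchor id.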
def make_calibrations(measurements, distances, pool):
    cbg = {}
    octant = {}
    spotter = {}
    obs_all = np.ndarray(shape=(0,2))
    progress("Calibrating...")
    for did, o1, cbg1, oct1, spo1 in pool.imap_unordered(
            call_make_calibration_1,
            ((d, s, distances) for d, s in measurements.items()),
            chunksize=1):
        cbg[did] = cbg1
        octant[did] = oct1
        spotter[did] = spo1
        obs_all = np.vstack((obs_all, o1))
    progress("Calibrating combined...")
    _, _, cbgA, octA, spoA = make_calibration_obs("combined", obs_all)
    cbg[0] = cbgA
    octant[0] = octA
    spotter[0] = spoA
    return cbg, octant, spotter
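
# Calibration is expensive, so it gets its own cache, keyed off the
# measurement cache's timestamp: if the measurements are newer than the
# calibration cache, recalibrate.  Note the file is gzip-compressed
# despite its ".pickle" (rather than ".pickle.gz") name.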
def cached_make_calibrations(measurements, distances, pool, mdir, mcache_ts):
    cal_cache_f = mdir + "/calibration.pickle"
    try:
        progress("Loading calibration cache...")
        with gzip.open(cal_cache_f, "rb") as calcache:
            cache_ts = os.fstat(calcache.fileno()).st_mtime
            if cache_ts > mcache_ts:
                calibrations = pickle.load(calcache)
                return calibrations
            else:
                warning("Calibration cache out of date, regenerating.")
    except (OSError, EOFError, pickle.UnpicklingError) as e:
        warning("Failed to load calibration cache: {}", e)

    calibrations = make_calibrations(measurements, distances, pool)
    with gzip.open(cal_cache_f, "wb") as calcache:
        calcache.write(
            pickletools.optimize(
                pickle.dumps(calibrations, pickle.HIGHEST_PROTOCOL)))
    return calibrations
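
# Estimate one destination's location under one mode: convert each
# source's RTT sample into an ageo.Observation (a region the target
# could lie in), then intersect all of those regions with each other
# and with the base map.  'use_all' selects the pooled calibration
# (key 0) instead of the per-source one; sources with fewer than two
# retained RTTs are skipped.  The result is saved as <tag>-<id>.h5.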
def crunch_obs_1(odir, positions, distances, basemap,
                 did, srcs,
                 tag, cals, ranging, use_all):
    bnd = basemap.bounds
    obsv = []
    for sid, rtts in srcs.items():
        if (sid == did
                or sid not in cals or sid not in positions
                or len(rtts) < 2):
            continue
        spos = positions[sid]
        obs = ageo.Observation(
            basemap=basemap,
            ref_lat=spos.lat,
            ref_lon=spos.lon,
            range_fn=ranging,
            calibration=cals[0] if use_all else cals[sid],
            rtts=rtts)
        obsv.append(obs)
        bnd = bnd.intersection(obs.bounds)
        assert not bnd.is_empty
    loc = obsv[0]
    for obs in obsv[1:]:
        loc = loc.intersection(obs, bnd)
    loc = loc.intersection(basemap, bnd)
    loc.save(os.path.join(odir, tag + "-" + str(did) + ".h5"))
    return tag, did

def call_crunch_obs_1(args):
    return crunch_obs_1(*args)
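
# Driver.  Every destination is located under eight modes; each tag
# encodes the calibration model (cbg/oct/spo), the ranging function
# (m = min-max, g = Gaussian), and the calibration scope (1 =
# per-source, a = pooled over all sources).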
def main():
    basemap = ageo.Map(sys.argv[1])
    positions, distances, measurements, cache_ts = \
        load_raw_data(sys.argv[2])

    with multiprocessing.Pool() as pool:
        cal_cbg, cal_oct, cal_spo = cached_make_calibrations(
            measurements, distances, pool, sys.argv[2], cache_ts)

        minmax = ageo.ranging.MinMax
        gaussn = ageo.ranging.Gaussian
        crunch_modes = [
            ("cbg-m-1", cal_cbg, minmax, False),
            ("cbg-m-a", cal_cbg, minmax, True),
            ("oct-m-1", cal_oct, minmax, False),
            ("oct-m-a", cal_oct, minmax, True),
            ("spo-m-1", cal_spo, minmax, False),
            ("spo-m-a", cal_spo, minmax, True),
            ("spo-g-1", cal_spo, gaussn, False),
            ("spo-g-a", cal_spo, gaussn, True)
        ]

        progress("Crunching observations...")
        os.makedirs(sys.argv[3], exist_ok=True)
        for tag, did in pool.imap_unordered(
                call_crunch_obs_1,
                ((sys.argv[3], positions, distances, basemap,
                  did, srcs,
                  tag, cals, ranging, use_all)
                 for did, srcs in measurements.items()
                 for tag, cals, ranging, use_all in crunch_modes),
                chunksize=1):
            # All the actual work has been done in crunch_obs_1; we
            # just issue status reports here.
            progress("{}: {}", did, tag)

if __name__ == '__main__':
    main()