Skip to content

Commit db082ed

Browse files
authored
Updates to support the CAC data (#14)
* Cleaned up the ingester to work better for CAC data * Added support for ingesting logging intervals * Support for ingesting instrument and status * Added flag ingest method * Fixed bug in the flag ingest process Flags with null notes where not being matched * Added the start of some testing files The test_flags script is not an automated test yet but I thought the data files and process would still be helpful to have committed * Flagging updates and hourly data rollup fixes * Some cleanup and bug fixes Added some methods that help in the testing/dev environments
1 parent b9fab84 commit db082ed

9 files changed

+699
-77
lines changed

ingest/etl_process_measurements.sql

+28-8
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,16 @@ FROM staging_measurements;
5252
-- that duplicate sensors with the same ingest/source id are created
5353
-- this is a short term fix
5454
-- a long term fix would not allow duplicate source_id's
55-
WITH ranked_sensors AS (
55+
WITH staged_sensors AS (
56+
-- this first part signficantly speeds it up on slow machines
57+
SELECT DISTINCT ingest_id
58+
FROM staging_measurements
59+
), ranked_sensors AS (
5660
SELECT s.sensors_id
5761
, s.source_id
5862
, RANK() OVER (PARTITION BY s.source_id ORDER BY added_on ASC) as rnk
5963
FROM sensors s
60-
JOIN staging_measurements m ON (s.source_id = m.ingest_id)
64+
JOIN staged_sensors m ON (s.source_id = m.ingest_id)
6165
), active_sensors AS (
6266
SELECT source_id
6367
, sensors_id
@@ -68,6 +72,7 @@ WITH ranked_sensors AS (
6872
FROM active_sensors s
6973
WHERE s.source_id=ingest_id;
7074

75+
7176
-- Now we have to fill in any missing information
7277
-- first add the nodes and systems that dont exist
7378
-- add just the bare minimum amount of data to the system
@@ -285,6 +290,7 @@ INSERT INTO sensors_rollup (
285290
, value_latest
286291
, value_count
287292
, value_avg
293+
, value_sd
288294
, value_min
289295
, value_max
290296
, geom_latest
@@ -299,6 +305,7 @@ WITH numbered AS (
299305
, sum(1) OVER (PARTITION BY sensors_id) as value_count
300306
, min(datetime) OVER (PARTITION BY sensors_id) as datetime_min
301307
, avg(value) OVER (PARTITION BY sensors_id) as value_avg
308+
, stddev(value) OVER (PARTITION BY sensors_id) as value_sd
302309
, row_number() OVER (PARTITION BY sensors_id ORDER BY datetime DESC) as rn
303310
FROM staging_inserted_measurements
304311
), latest AS (
@@ -308,6 +315,7 @@ WITH numbered AS (
308315
, value
309316
, value_count
310317
, value_avg
318+
, value_sd
311319
, datetime_min
312320
, lat
313321
, lon
@@ -320,6 +328,7 @@ SELECT l.sensors_id
320328
, l.value -- last value
321329
, l.value_count
322330
, l.value_avg
331+
, l.value_sd
323332
, l.value -- min
324333
, l.value -- max
325334
, public.pt3857(lon, lat)
@@ -348,12 +357,23 @@ SET datetime_last = GREATEST(sensors_rollup.datetime_last, EXCLUDED.datetime_las
348357

349358

350359
-- Update the table that will help to track hourly rollups
351-
INSERT INTO hourly_stats (datetime)
352-
SELECT date_trunc('hour', datetime)
353-
FROM staging_inserted_measurements
354-
GROUP BY 1
355-
ON CONFLICT (datetime) DO UPDATE
356-
SET modified_on = now();
360+
-- this is a replacement to the hourly stats table
361+
WITH inserted_hours AS (
362+
-- first we group things, adding an hour to make it time-ending after truncating
363+
SELECT datetime + '1h'::interval as datetime
364+
, utc_offset(datetime + '1h'::interval, tz.tzid) as tz_offset
365+
FROM staging_inserted_measurements m
366+
JOIN sensors s ON (s.sensors_id = m.sensors_id)
367+
JOIN sensor_systems sy ON (s.sensor_systems_id = sy.sensor_systems_id)
368+
JOIN sensor_nodes sn ON (sy.sensor_nodes_id = sn.sensor_nodes_id)
369+
JOIN timezones tz ON (sn.timezones_id = tz.timezones_id)
370+
GROUP BY 1, 2
371+
)
372+
INSERT INTO hourly_data_queue (datetime, tz_offset)
373+
SELECT as_utc_hour(datetime, tz_offset), tz_offset
374+
FROM inserted_hours
375+
ON CONFLICT (datetime, tz_offset) DO UPDATE
376+
SET modified_on = now();
357377

358378

359379
--Update the export queue/logs to export these records

ingest/etl_process_nodes.sql

+92-14
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ WHERE units IN ('µg/m��','��g/m³', 'ug/m3');
3535

3636

3737

38-
-- match the locations to the nodes using the source_name/id combo
38+
-- match the locations to existing nodes using the source_name/id combo
3939
UPDATE staging_sensornodes
4040
SET sensor_nodes_id = s.sensor_nodes_id
4141
, timezones_id = s.timezones_id
@@ -109,7 +109,9 @@ SELECT site_name
109109
, metadata
110110
, source_id
111111
, timezones_id
112-
, get_providers_id(source_name)
112+
-- default to the unknown provider
113+
-- just to make sure we have one set
114+
, COALESCE(get_providers_id(source_name), 1)
113115
, countries_id
114116
FROM staging_sensornodes
115117
WHERE sensor_nodes_id IS NULL
@@ -156,17 +158,20 @@ FROM r;
156158
-- Sensor Systems --
157159
--------------------
158160

161+
159162
-- make sure that we have a system entry for every ingest_id
160163
-- this is to deal with fetchers that do not add these data
161164
INSERT INTO staging_sensorsystems (sensor_nodes_id, ingest_id, fetchlogs_id, metadata)
162165
SELECT sensor_nodes_id
163-
, source_id -- the ingest_id has the source_name in it and we dont need/want that
166+
--, source_id -- the ingest_id has the source_name in it and we dont need/want that
167+
, ingest_id
164168
, fetchlogs_id
165169
, '{"note":"automatically added for sensor node"}'
166170
FROM staging_sensornodes
167-
WHERE is_new
171+
WHERE is_new AND ingest_id NOT IN (SELECT ingest_sensor_nodes_id FROM staging_sensorsystems)
168172
ON CONFLICT (ingest_id) DO UPDATE
169-
SET sensor_nodes_id = EXCLUDED.sensor_nodes_id;
173+
SET sensor_nodes_id = EXCLUDED.sensor_nodes_id
174+
;
170175

171176
-- Now match the sensor nodes to the system
172177
UPDATE staging_sensorsystems
@@ -197,15 +202,18 @@ SELECT COUNT(1) INTO __rejected_systems
197202
FROM r;
198203

199204
-- And finally we add/update the sensor systems
200-
INSERT INTO sensor_systems (sensor_nodes_id, source_id, metadata)
205+
INSERT INTO sensor_systems (sensor_nodes_id, source_id, instruments_id, metadata)
201206
SELECT sensor_nodes_id
202-
, ingest_id
207+
, s.ingest_id
208+
, i.instruments_id
203209
, metadata
204-
FROM staging_sensorsystems
210+
FROM staging_sensorsystems s
211+
LEFT JOIN instruments i ON (s.instrument_ingest_id = i.ingest_id)
205212
WHERE sensor_nodes_id IS NOT NULL
206-
GROUP BY sensor_nodes_id, ingest_id, metadata
213+
GROUP BY sensor_nodes_id, s.ingest_id, instruments_id, metadata
207214
ON CONFLICT (sensor_nodes_id, source_id) DO UPDATE SET
208215
metadata=COALESCE(sensor_systems.metadata, '{}') || COALESCE(EXCLUDED.metadata, '{}')
216+
, instruments_id = EXCLUDED.instruments_id
209217
, modified_on = now();
210218

211219
----------------------------
@@ -266,10 +274,11 @@ AND sensors.source_id = staging_sensors.ingest_id;
266274

267275

268276
UPDATE staging_sensors
269-
SET measurands_id = measurands.measurands_id
270-
from measurands
271-
WHERE staging_sensors.measurand=measurands.measurand
272-
and staging_sensors.units=measurands.units;
277+
SET measurands_id = m.measurands_id
278+
FROM (SELECT measurand, MIN(measurands_id) AS measurands_id FROM measurands GROUP BY measurand) as m
279+
WHERE staging_sensors.measurand=m.measurand
280+
--AND staging_sensors.units=measurands.units
281+
;
273282

274283

275284
WITH r AS (
@@ -290,20 +299,34 @@ INSERT INTO sensors (
290299
source_id
291300
, sensor_systems_id
292301
, measurands_id
302+
, data_logging_period_seconds
303+
, data_averaging_period_seconds
304+
, sensor_statuses_id
293305
, metadata)
294306
SELECT ingest_id
295307
, sensor_systems_id
296308
, measurands_id
309+
, logging_interval_seconds
310+
, averaging_interval_seconds
311+
, COALESCE(ss.sensor_statuses_id, 1)
297312
, metadata
298-
FROM staging_sensors
313+
FROM staging_sensors s
314+
LEFT JOIN sensor_statuses ss ON (ss.short_code = s.status)
299315
WHERE measurands_id is not null
300316
AND sensor_systems_id is not null
301317
GROUP BY ingest_id
302318
, sensor_systems_id
303319
, measurands_id
320+
, logging_interval_seconds
321+
, averaging_interval_seconds
322+
, ss.sensor_statuses_id
304323
, metadata
305324
ON CONFLICT (sensor_systems_id, measurands_id, source_id) DO UPDATE
306325
SET metadata = COALESCE(sensors.metadata, '{}') || COALESCE(EXCLUDED.metadata, '{}')
326+
, data_logging_period_seconds = EXCLUDED.data_logging_period_seconds
327+
, data_averaging_period_seconds = EXCLUDED.data_averaging_period_seconds
328+
, sensor_statuses_id = EXCLUDED.sensor_statuses_id
329+
, modified_on = now()
307330
RETURNING 1)
308331
SELECT COUNT(1) INTO __inserted_sensors
309332
FROM inserts;
@@ -327,6 +350,61 @@ RETURNING 1)
327350
SELECT COUNT(1) INTO __rejected_sensors
328351
FROM r;
329352

353+
354+
-- update the period so that we dont have to keep doing it later
355+
-- we could do this on import as well if we feel this is slowing us down
356+
UPDATE staging_flags
357+
SET period = tstzrange(COALESCE(datetime_from, '-infinity'::timestamptz),COALESCE(datetime_to, 'infinity'::timestamptz), '[]');
358+
359+
-- Now we have to match things
360+
-- get the right node id and sensors id for the flags
361+
UPDATE staging_flags
362+
SET sensors_id = s.sensors_id
363+
, sensor_nodes_id = sy.sensor_nodes_id
364+
FROM sensors s
365+
JOIN sensor_systems sy ON (s.sensor_systems_id = sy.sensor_systems_id)
366+
WHERE staging_flags.sensor_ingest_id = s.source_id;
367+
368+
-- and then get the right flags_id
369+
UPDATE staging_flags
370+
SET flag_types_id = ft.flag_types_id
371+
FROM flag_types ft
372+
WHERE split_part(staging_flags.ingest_id, '::', 1) = ft.ingest_id;
373+
374+
-- now we should look to see if we should be just extending a flag
375+
UPDATE staging_flags sf
376+
SET flags_id = fm.flags_id
377+
FROM flags fm
378+
-- where the core information is the same (exactly)
379+
WHERE sf.sensor_nodes_id = fm.sensor_nodes_id
380+
AND sf.flag_types_id = fm.flag_types_id
381+
AND ((sf.note = fm.note) OR (sf.note IS NULL AND fm.note IS NULL))
382+
-- the periods touch or overlap
383+
AND fm.period && sf.period
384+
-- and the flagged record sensors contains the current sensors
385+
AND fm.sensors_ids @> ARRAY[sf.sensors_id];
386+
387+
-- and finally we will insert the new flags
388+
INSERT INTO flags (flag_types_id, sensor_nodes_id, sensors_ids, period, note)
389+
SELECT flag_types_id
390+
, sensor_nodes_id
391+
, CASE WHEN sensors_id IS NOT NULL THEN ARRAY[sensors_id] ELSE NULL END
392+
, period
393+
, note
394+
FROM staging_flags
395+
WHERE flag_types_id IS NOT NULL
396+
AND sensor_nodes_id IS NOT NULL
397+
AND flags_id IS NULL;
398+
399+
-- And then update any that need to be updated
400+
UPDATE flags fm
401+
SET period = sf.period + fm.period
402+
, note = sf.note
403+
, modified_on = now()
404+
FROM staging_flags sf
405+
WHERE sf.flags_id = fm.flags_id;
406+
407+
330408
------------------
331409
-- Return stats --
332410
------------------

0 commit comments

Comments
 (0)