diff --git a/src/WebSDL/settings/base.py b/src/WebSDL/settings/base.py index 8720e93e..7e9acf5d 100644 --- a/src/WebSDL/settings/base.py +++ b/src/WebSDL/settings/base.py @@ -75,8 +75,11 @@ 'django.middleware.clickjacking.XFrameOptionsMiddleware', 'hydroshare_util.middleware.AuthMiddleware', # 'debug_toolbar.middleware.DebugToolbarMiddleware', + 'django_cprofile_middleware.middleware.ProfilerMiddleware', ] +DJANGO_CPROFILE_MIDDLEWARE_REQUIRE_STAFF = False + REST_FRAMEWORK = { 'DEFAULT_RENDERER_CLASSES': ( 'rest_framework.renderers.JSONRenderer', @@ -120,6 +123,7 @@ 'HOST': database['host'] if 'host' in database else '', 'PORT': database['port'] if 'port' in database else '', 'OPTIONS': database['options'] if 'options' in database else {}, + 'CONN_MAX_AGE': 0, 'TEST': database['test'] if 'test' in database else {}, } diff --git a/src/dataloaderinterface/ajax.py b/src/dataloaderinterface/ajax.py index 77ed2740..947c637b 100644 --- a/src/dataloaderinterface/ajax.py +++ b/src/dataloaderinterface/ajax.py @@ -13,7 +13,7 @@ _dbsettings = settings.DATABASES['odm2'] _connection_str = f"postgresql://{_dbsettings['USER']}:{_dbsettings['PASSWORD']}@{_dbsettings['HOST']}:{_dbsettings['PORT']}/{_dbsettings['NAME']}" -_db_engine = sqlalchemy.create_engine(_connection_str) +_db_engine = sqlalchemy.create_engine(_connection_str, pool_size=30) def get_result_timeseries_recent(request_data:Dict[str,Any]) -> str: result_id = int(request_data['resultid']) diff --git a/src/dataloaderinterface/models.py b/src/dataloaderinterface/models.py index bb26006d..47da30b9 100644 --- a/src/dataloaderinterface/models.py +++ b/src/dataloaderinterface/models.py @@ -13,7 +13,6 @@ Unit, Medium, Organization from dataloaderinterface.querysets import SiteRegistrationQuerySet, SensorOutputQuerySet - class SiteRegistration(models.Model): registration_id = models.AutoField(primary_key=True, db_column='RegistrationID') registration_token = models.CharField(max_length=64, editable=False, db_column='RegistrationToken', unique=True, default=uuid4) @@ -60,7 +59,8 @@ def latest_measurement(self): return try: last_updated_sensor = [sensor for sensor in self.sensors.all() if sensor.last_measurement.pk == self.latest_measurement_id].pop() - except IndexError: + #except IndexError: + except: return None return last_updated_sensor.last_measurement diff --git a/src/dataloaderinterface/signals.py b/src/dataloaderinterface/signals.py index 158721ff..ffa006ec 100644 --- a/src/dataloaderinterface/signals.py +++ b/src/dataloaderinterface/signals.py @@ -7,7 +7,9 @@ from dataloader.models import SamplingFeature, Site, Annotation, SamplingFeatureAnnotation, SpatialReference, Action, \ Method, Result, ProcessingLevel, TimeSeriesResult, Unit from dataloaderinterface.models import SiteRegistration, SiteSensor -from tsa.helpers import TimeSeriesAnalystHelper + +#PRT - deprecated +#from tsa.helpers import TimeSeriesAnalystHelper @receiver(pre_save, sender=SiteRegistration) @@ -78,12 +80,13 @@ def handle_site_registration_post_save(sender, instance, created, update_fields= sampling_feature.annotations.filter(annotation_code='closest_town').update(annotation_text=instance.closest_town or '') -@receiver(post_save, sender=SiteRegistration) -def handle_site_registration_tsa_post_save(sender, instance, created, update_fields=None, **kwargs): - if created: - return - helper = TimeSeriesAnalystHelper() - helper.update_series_from_site(instance) +#PRT - deprecated +#@receiver(post_save, sender=SiteRegistration) +#def handle_site_registration_tsa_post_save(sender, 
instance, created, update_fields=None, **kwargs): +# if created: +# return +# helper = TimeSeriesAnalystHelper() +# helper.update_series_from_site(instance) @receiver(post_delete, sender=SiteRegistration) @@ -144,13 +147,14 @@ def handle_sensor_post_save(sender, instance, created, update_fields=None, **kwa TimeSeriesResult.objects.filter(result_id=instance.result_id).update(z_location=instance.height) -@receiver(post_save, sender=SiteSensor) -def handle_sensor_tsa_post_save(sender, instance, created, update_fields=None, **kwargs): - helper = TimeSeriesAnalystHelper() - if created: - helper.create_series_from_sensor(instance) - else: - helper.update_series_from_sensor(instance) +#PRT - deprecated +#@receiver(post_save, sender=SiteSensor) +#def handle_sensor_tsa_post_save(sender, instance, created, update_fields=None, **kwargs): +# helper = TimeSeriesAnalystHelper() +# if created: +# helper.create_series_from_sensor(instance) +# else: +# helper.update_series_from_sensor(instance) @receiver(post_delete, sender=SiteSensor) @@ -159,7 +163,8 @@ def handle_sensor_post_delete(sender, instance, **kwargs): result and result.feature_action.action.delete() -@receiver(post_delete, sender=SiteSensor) -def handle_sensor_tsa_post_delete(sender, instance, **kwargs): - helper = TimeSeriesAnalystHelper() - helper.delete_series_for_sensor(instance) +#PRT - deprecated +#@receiver(post_delete, sender=SiteSensor) +#def handle_sensor_tsa_post_delete(sender, instance, **kwargs): +# helper = TimeSeriesAnalystHelper() +# helper.delete_series_for_sensor(instance) diff --git a/src/dataloaderinterface/static/timeseries_visualization/js/visualization.js b/src/dataloaderinterface/static/timeseries_visualization/js/visualization.js index 2270aa42..3475a6b9 100644 --- a/src/dataloaderinterface/static/timeseries_visualization/js/visualization.js +++ b/src/dataloaderinterface/static/timeseries_visualization/js/visualization.js @@ -108,7 +108,7 @@ function updatePlotDateRange(min, max) { } if (max != null) { $('#dpd2').val(dateToString(max)); - max = max.getTime(); + max = max.getTime() + 86399; //select end of day as end point } _chart.xAxis[0].update({'min':min, 'max':max}); _chart.xAxis[0].setExtremes(); diff --git a/src/dataloaderinterface/templates/dataloaderinterface/site_details.html b/src/dataloaderinterface/templates/dataloaderinterface/site_details.html index d80c713f..35ead80c 100644 --- a/src/dataloaderinterface/templates/dataloaderinterface/site_details.html +++ b/src/dataloaderinterface/templates/dataloaderinterface/site_details.html @@ -316,7 +316,7 @@
show_chart
- +

View data for this site.

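The ajax.py and views.py hunks in this patch create module-level SQLAlchemy engines with explicit pool sizes (pool_size=30 and pool_size=10) built from the Django DATABASES settings. A minimal standalone sketch of that pattern, assuming SQLAlchemy with the psycopg2 PostgreSQL driver; the max_overflow and pool_pre_ping keywords are illustrative extras, not part of this patch:

```python
# Sketch of the pooled-engine pattern used in ajax.py / views.py.
# Assumes settings.DATABASES['odm2'] holds USER/PASSWORD/HOST/PORT/NAME keys,
# as in this repository; max_overflow and pool_pre_ping are optional extras.
import sqlalchemy
from django.conf import settings

_db = settings.DATABASES['odm2']
_url = (
    f"postgresql://{_db['USER']}:{_db['PASSWORD']}"
    f"@{_db['HOST']}:{_db['PORT']}/{_db['NAME']}"
)

# pool_size caps the persistent connections held open; max_overflow allows
# short bursts beyond that, and pool_pre_ping drops stale connections before
# they are handed out to a request.
engine = sqlalchemy.create_engine(
    _url,
    pool_size=30,
    max_overflow=10,
    pool_pre_ping=True,
)

with engine.connect() as connection:
    connection.execute(sqlalchemy.text("SELECT 1"))
```

Because the engine is module-level, its pool is shared across requests within a worker process; the CONN_MAX_AGE=0 setting added above only governs Django's own ORM connections, which are managed separately from these SQLAlchemy pools.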
diff --git a/src/dataloaderinterface/templatetags/helpers.py b/src/dataloaderinterface/templatetags/helpers.py index 6584a4e7..33468d19 100644 --- a/src/dataloaderinterface/templatetags/helpers.py +++ b/src/dataloaderinterface/templatetags/helpers.py @@ -54,7 +54,7 @@ def is_stale(value, default): def divide(value, arg): try: return int(value) / int(arg) if int(arg) != 0 else 0 - except (ValueError, ZeroDivisionError): + except (TypeError, ValueError, ZeroDivisionError): return None diff --git a/src/dataloaderservices/views.py b/src/dataloaderservices/views.py index 62e7e66d..7ab1ba7d 100644 --- a/src/dataloaderservices/views.py +++ b/src/dataloaderservices/views.py @@ -2,6 +2,7 @@ import os from collections import OrderedDict from datetime import time, timedelta, datetime +from typing import Union, Dict, Any, final from io import StringIO from django.utils import encoding @@ -17,7 +18,7 @@ from django.shortcuts import reverse from rest_framework.generics import GenericAPIView -from dataloader.models import SamplingFeature, TimeSeriesResultValue, Unit, EquipmentModel, TimeSeriesResult, Result +from dataloader.models import ProfileResultValue, SamplingFeature, TimeSeriesResultValue, Unit, EquipmentModel, TimeSeriesResult, Result from django.db.models.expressions import F from django.utils.dateparse import parse_datetime from rest_framework import exceptions @@ -41,14 +42,23 @@ #PRT - temporary work around after replacing InfluxDB but not replacement models import sqlalchemy from sqlalchemy.sql import text +import psycopg2 from django.conf import settings + _dbsettings = settings.DATABASES['odm2'] _connection_str = f"postgresql://{_dbsettings['USER']}:{_dbsettings['PASSWORD']}@{_dbsettings['HOST']}:{_dbsettings['PORT']}/{_dbsettings['NAME']}" -_db_engine = sqlalchemy.create_engine(_connection_str) +_db_engine = sqlalchemy.create_engine(_connection_str, pool_size=10) + +_dbsettings_loader = settings.DATABASES['default'] +_connection_str_loader = f"postgresql://{_dbsettings_loader['USER']}:{_dbsettings_loader['PASSWORD']}@{_dbsettings_loader['HOST']}:{_dbsettings_loader['PORT']}/{_dbsettings_loader['NAME']}" +_db_engine_loader = sqlalchemy.create_engine(_connection_str_loader, pool_size=10) + # TODO: Check user permissions to edit, add, or remove stuff with a permissions class. # TODO: Use generic api views for create, edit, delete, and list. 
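For the helpers.py hunk above: int(None) raises TypeError rather than ValueError, so a missing value passed to the divide filter previously escaped the except clause and surfaced in the template. A small standalone sketch of the hardened filter and the cases it now covers (same logic as the diff, shown outside the template tag registration):

```python
# Standalone version of the hardened `divide` filter logic from
# dataloaderinterface/templatetags/helpers.py.
def divide(value, arg):
    try:
        return int(value) / int(arg) if int(arg) != 0 else 0
    except (TypeError, ValueError, ZeroDivisionError):
        # int(None) raises TypeError, int('abc') raises ValueError;
        # both now fall through to None instead of raising in the template.
        return None

assert divide(10, 2) == 5
assert divide(10, 0) == 0       # guarded division by zero
assert divide(None, 2) is None  # previously raised TypeError
assert divide('abc', 2) is None
```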
+from concurrent.futures import ThreadPoolExecutor, as_completed + class ModelVariablesApi(APIView): authentication_classes = (SessionAuthentication, ) @@ -289,6 +299,7 @@ def post(self, request, *args, **kwargs): data_value = row[results_mapping['results'][uuid]['index']] result_value = TimeseriesResultValueTechDebt( result_id=sensor.result_id, + result_uuid=uuid, data_value=data_value, utc_offset=results_mapping['utc_offset'], value_datetime=measurement_datetime, @@ -298,7 +309,7 @@ def post(self, request, *args, **kwargs): time_aggregation_interval_unit=data_value_units.unit_id, ) try: - result = InsertTimeseriesResultValues(result_value) + result = insert_timeseries_result_values(result_value) except Exception as e: warnings.append(f"Error inserting value '{data_value}'"\ f"at datetime '{measurement_datetime}' for result uuid '{uuid}'") @@ -575,7 +586,6 @@ class TimeSeriesValuesApi(APIView): def post(self, request, format=None): if not all(key in request.data for key in ('timestamp', 'sampling_feature')): raise exceptions.ParseError("Required data not found in request.") - try: measurement_datetime = parse_datetime(request.data['timestamp']) except ValueError: @@ -590,78 +600,70 @@ def post(self, request, format=None): sampling_feature = SamplingFeature.objects.filter(sampling_feature_uuid__exact=request.data['sampling_feature']).first() if not sampling_feature: raise exceptions.ParseError('Sampling Feature code does not match any existing site.') - feature_actions = sampling_feature.feature_actions.prefetch_related('results__variable', 'action').all() - errors = [] - for feature_action in feature_actions: - result = feature_action.results.all().first() - if str(result.result_uuid) not in request.data: - continue + + result_uuids = get_result_UUIDs(sampling_feature.sampling_feature_id) + if not result_uuids: + raise exceptions.ParseError(f"No results_uuids matched to sampling_feature '{request.data['sampling_feature']}'") - result_value = TimeseriesResultValueTechDebt( - result_id=result.result_id, - data_value=request.data[str(result.result_uuid)], - value_datetime=measurement_datetime, - utc_offset=utc_offset, - censor_code='Not censored', - quality_code='None', - time_aggregation_interval=1, - time_aggregation_interval_unit=(Unit.objects.get(unit_name='hour minute')).unit_id) - - try: - query_result = InsertTimeseriesResultValues(result_value) - except Exception as e: - errors.append(f"Failed to INSERT data for uuid('{result.result_uuid}')") - - # PRT - long term we would like to remove dataloader database but for now - # this block of code keeps dataloaderinterface_sensormeasurement table in sync - result.value_count = F('value_count') + 1 - result.result_datetime = measurement_datetime - result.result_datetime_utc_offset = utc_offset - site_sensor = SiteSensor.objects.filter(result_id=result.result_id).first() - last_measurement = SensorMeasurement.objects.filter(sensor=site_sensor).first() - if not last_measurement: - SensorMeasurement.objects.create( - sensor=site_sensor, - value_datetime=result_value.value_datetime, - value_datetime_utc_offset=timedelta(hours=result_value.utc_offset), - data_value=result_value.data_value - ) - elif last_measurement and result_value.value_datetime > last_measurement.value_datetime: - last_measurement and last_measurement.delete() - SensorMeasurement.objects.create( - sensor=site_sensor, - value_datetime=result_value.value_datetime, - value_datetime_utc_offset=timedelta(hours=result_value.utc_offset), - data_value=result_value.data_value - ) + 
#dataloader table related + try: + set_deployment_date(sampling_feature.sampling_feature_id, measurement_datetime) + except Exception as e: + pass - if result.value_count == 0: - result.valid_datetime = measurement_datetime - result.valid_datetime_utc_offset = utc_offset - - if not site_sensor.registration.deployment_date: - site_sensor.registration.deployment_date = measurement_datetime - #site_sensor.registration.deployment_date_utc_offset = utc_offset - site_sensor.registration.save(update_fields=['deployment_date']) - - try: - result.save(update_fields=[ - 'result_datetime', 'value_count', 'result_datetime_utc_offset', - 'valid_datetime', 'valid_datetime_utc_offset' - ]) - except Exception as e: - #PRT - An exception here means the dataloaderinterface data tables will not in sync - # for this sensor, but that is better than a fail state where data is lost so pass - # expection for now. Long term plan is to remove this whole block of code. - pass - # End dataloaderinterface_sensormeasurement sync block + futures = {} + unit_id = Unit.objects.get(unit_name='hour minute').unit_id + + with ThreadPoolExecutor(max_workers=8) as executor: + for key in request.data: + try: + result_id = result_uuids[key] + except KeyError: + continue + + result_value = TimeseriesResultValueTechDebt( + result_id=result_id, + result_uuid=key, + data_value=request.data[str(key)], + value_datetime=measurement_datetime, + utc_offset=utc_offset, + censor_code='Not censored', + quality_code='None', + time_aggregation_interval=1, + time_aggregation_interval_unit=unit_id) + futures[executor.submit(process_result_value, result_value)] = None + + errors = [] + for future in as_completed(futures): + if future.result() is not None: errors.append(future.result()) + if errors: return Response(errors, status=status.HTTP_500_INTERNAL_SERVER_ERROR) - return Response({}, status.HTTP_201_CREATED) +####################################################### +### Temporary HOT fix to address model performance +####################################################### +#PRT - the code in this block is meant as a hot fix to address poor model performance +#the long term goal is to refactor the application models to make them more performant. 
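The rewritten TimeSeriesValuesApi.post above submits one task per incoming result value to a ThreadPoolExecutor and gathers per-value error strings with as_completed. A minimal sketch of that fan-out pattern in isolation; process_one and items below are stand-ins for process_result_value and the request payload, introduced only for illustration:

```python
# Sketch of the fan-out/collect pattern used by TimeSeriesValuesApi.post.
# process_one() stands in for process_result_value(): it returns None on
# success and an error string on failure, matching the view's convention.
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Optional

def process_one(item: Dict) -> Optional[str]:
    try:
        # insert the value and sync the bookkeeping tables here
        return None
    except Exception:
        return f"Failed to INSERT data for uuid('{item['uuid']}')"

def process_all(items: List[Dict]) -> List[str]:
    errors: List[str] = []
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(process_one, item) for item in items]
        for future in as_completed(futures):
            error = future.result()
            if error is not None:
                errors.append(error)
    return errors
```

Each worker thread checks out its own database connection, which is presumably why max_workers=8 is kept below the pool_size=10 configured for the engines above.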
+ +def get_result_UUIDs(sampling_feature_id:str) -> Union[Dict[str, str],None]: + try: + with _db_engine.connect() as connection: + query = text("SELECT r.resultid, r.resultuuid FROM odm2.results AS r " \ + "JOIN odm2.featureactions AS fa ON r.featureactionid = fa.featureactionid "\ + "WHERE fa.samplingfeatureid = ':sampling_feature_id';") + df = pd.read_sql(query, connection, params={'sampling_feature_id': sampling_feature_id}) + df['resultuuid'] = df['resultuuid'].astype(str) + df = df.set_index('resultuuid') + results = df['resultid'].to_dict() + return results + except: + return None + class TimeseriesResultValueTechDebt(): def __init__(self, - result_id:str, + result_id:str, + result_uuid:str, data_value:float, value_datetime:datetime, utc_offset:int, @@ -670,6 +672,7 @@ def __init__(self, time_aggregation_interval:int, time_aggregation_interval_unit:int) -> None: self.result_id = result_id + self.result_uuid = result_uuid self.data_value = data_value self.utc_offset = utc_offset self.value_datetime = value_datetime @@ -678,29 +681,120 @@ def __init__(self, self.time_aggregation_interval = time_aggregation_interval self.time_aggregation_interval_unit = time_aggregation_interval_unit -def InsertTimeseriesResultValues(result_value : TimeseriesResultValueTechDebt) -> None: +def process_result_value(result_value:TimeseriesResultValueTechDebt) -> Union[str,None]: + result = insert_timeseries_result_values(result_value) + if result is not None: + return result + # PRT - long term we would like to remove dataloader database but for now + # this block of code keeps dataloaderinterface_sensormeasurement table in sync + try: + query_result = sync_dataloader_tables(result_value) + query_result = sync_result_table(result_value) + return None + except Exception as e: + return None + +#dataloader utility function +def get_site_sensor(resultid:str) -> Union[Dict[str, Any],None]: + with _db_engine_loader.connect() as connection: + query = text('SELECT * FROM dataloaderinterface_sitesensor ' \ + 'WHERE "ResultID"=:resultid;' + ) + df = pd.read_sql(query, connection, params={'resultid':resultid}) + return df.to_dict(orient='records')[0] + +#dataloader utility function +def update_sensormeasurement(sensor_id:str, result_value:TimeseriesResultValueTechDebt) -> None: + with _db_engine_loader.connect() as connection: + query = text('UPDATE dataloaderinterface_sensormeasurement ' \ + "SET value_datetime=:datetime, " \ + "value_datetime_utc_offset = :utc_offset, " \ + 'data_value = :data_value ' \ + 'WHERE sensor_id=:sensor_id; ') + result = connection.execute(query, + sensor_id=sensor_id, + datetime=result_value.value_datetime, + utc_offset=timedelta(hours=result_value.utc_offset), + data_value=result_value.data_value + ) + if result.rowcount < 1: + query = text('INSERT INTO dataloaderinterface_sensormeasurement ' \ + "VALUES (:sensor_id,:datetime,':utc_offset',:data_value); ") + result = connection.execute(query, + sensor_id=sensor_id, + datetime=result_value.value_datetime, + utc_offset=timedelta(hours=result_value.utc_offset), + data_value=result_value.data_value + ) + return result + +#dataloader utility function +def sync_dataloader_tables(result_value: TimeseriesResultValueTechDebt) -> None: + site_sensor = get_site_sensor(result_value.result_id) + if not site_sensor: return None + result = update_sensormeasurement(site_sensor['id'], result_value) + return None + +#dataloader utility function +def set_deployment_date(sample_feature_id:int, date_time:datetime) -> None: + with 
_db_engine_loader.connect() as connection: + query = text('UPDATE dataloaderinterface_siteregistration '\ + 'SET "DeploymentDate"=:date_time '\ + 'WHERE "DeploymentDate" IS NULL AND ' \ + '"SamplingFeatureID"=:sample_feature_id' ) + result = connection.execute(query, + sample_feature_id=sample_feature_id, + date_time=date_time + ) + return None + + +def sync_result_table(result_value: TimeseriesResultValueTechDebt) -> None: with _db_engine.connect() as connection: - query = text("INSERT INTO odm2.timeseriesresultvalues " \ - "(valueid, resultid, datavalue, valuedatetime, valuedatetimeutcoffset, " \ - "censorcodecv, qualitycodecv, timeaggregationinterval, timeaggregationintervalunitsid) " \ - "VALUES ( " \ - "(SELECT nextval('odm2.\"timeseriesresultvalues_valueid_seq\"'))," \ - ":result_id, " \ - ":data_value, " \ - ":value_datetime, " \ - ":utc_offset, " \ - ":censor_code, " \ - ":quality_code, " \ - ":time_aggregation_interval, " \ - ":time_aggregation_interval_unit);") + query = text("UPDATE odm2.results SET valuecount = valuecount + 1, " \ + "resultdatetime = GREATEST(:result_datetime, resultdatetime)" \ + "WHERE resultid=:result_id; ") result = connection.execute(query, result_id=result_value.result_id, - data_value=result_value.data_value, - value_datetime=result_value.value_datetime, - utc_offset=result_value.utc_offset, - censor_code=result_value.censor_code, - quality_code=result_value.quality_code, - time_aggregation_interval=result_value.time_aggregation_interval, - time_aggregation_interval_unit=result_value.time_aggregation_interval_unit, + result_datetime=result_value.value_datetime, + ) + return result + +def insert_timeseries_result_values(result_value : TimeseriesResultValueTechDebt) -> None: + try: + with _db_engine.connect() as connection: + query = text("INSERT INTO odm2.timeseriesresultvalues " \ + "(valueid, resultid, datavalue, valuedatetime, valuedatetimeutcoffset, " \ + "censorcodecv, qualitycodecv, timeaggregationinterval, timeaggregationintervalunitsid) " \ + "VALUES ( " \ + "(SELECT nextval('odm2.\"timeseriesresultvalues_valueid_seq\"'))," \ + ":result_id, " \ + ":data_value, " \ + ":value_datetime, " \ + ":utc_offset, " \ + ":censor_code, " \ + ":quality_code, " \ + ":time_aggregation_interval, " \ + ":time_aggregation_interval_unit);") + result = connection.execute(query, + result_id=result_value.result_id, + data_value=result_value.data_value, + value_datetime=result_value.value_datetime, + utc_offset=result_value.utc_offset, + censor_code=result_value.censor_code, + quality_code=result_value.quality_code, + time_aggregation_interval=result_value.time_aggregation_interval, + time_aggregation_interval_unit=result_value.time_aggregation_interval_unit, ) - return result \ No newline at end of file + return None + except sqlalchemy.exc.IntegrityError as e: + if hasattr(e, 'orig'): + if isinstance(e.orig, psycopg2.errors.UniqueViolation): + #data is already in database + return None + else: + return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')") + else: + return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')") + except Exception as e: + return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')") \ No newline at end of file diff --git a/src/tsa/helpers.py b/src/tsa/helpers_deprecated.py similarity index 100% rename from src/tsa/helpers.py rename to src/tsa/helpers_deprecated.py
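insert_timeseries_result_values above treats a duplicate-key violation as data that is already present and any other failure as a reportable error string. A minimal sketch of that idiom, assuming SQLAlchemy 1.x-style execute() with keyword bind parameters (as used throughout this patch) and psycopg2 >= 2.8 for the UniqueViolation class; the column list is abbreviated for illustration:

```python
# Sketch of the duplicate-tolerant INSERT used by insert_timeseries_result_values.
# The column list is abbreviated; the real query also sets the value id,
# datetime, UTC offset, censor/quality codes and aggregation interval.
from typing import Optional
import sqlalchemy
from sqlalchemy.sql import text
from psycopg2 import errors as pg_errors  # psycopg2 >= 2.8

def insert_value(engine, result_id: int, data_value: float) -> Optional[str]:
    query = text(
        "INSERT INTO odm2.timeseriesresultvalues (resultid, datavalue) "
        "VALUES (:result_id, :data_value);"
    )
    try:
        with engine.connect() as connection:
            connection.execute(query, result_id=result_id, data_value=data_value)
        return None
    except sqlalchemy.exc.IntegrityError as e:
        if isinstance(getattr(e, 'orig', None), pg_errors.UniqueViolation):
            return None  # the row already exists, so the duplicate is not an error
        return f"Failed to INSERT data for result '{result_id}'"
    except Exception:
        return f"Failed to INSERT data for result '{result_id}'"
```

Returning None on UniqueViolation keeps retried device posts idempotent, while every other failure surfaces to the caller as an error string rather than an exception.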