# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
import json
import logging
from time import sleep
import math
import uuid
from datetime import datetime
import numpy as np
import pkg_resources
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy
from cassandra.query import BatchStatement
from pytz import UTC
from webservice.algorithms.doms.BaseDomsHandler import DomsEncoder
from webservice.webmodel import NexusProcessingException
BATCH_SIZE = 1024
class ResultInsertException(IOError):
pass
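# Minimal usage sketch (the calling matchup handler supplies the real config,
# params, stats and result dictionaries; the variable names below are only
# placeholders):
#
#     with ResultsStorage(config) as storage:
#         execution_id = storage.insertResults(results, params, stats,
#                                              start_time, complete_time, user_email)
#
#     with ResultsRetrieval(config) as retrieval:
#         params, stats, data = retrieval.retrieveResults(execution_id)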
class AbstractResultsContainer:
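    """
    Base class holding the configuration and Cassandra session shared by the
    results writer and reader. Instances are used as context managers: the
    cluster connection is opened in __enter__ and shut down in __exit__.
    """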
def __init__(self, config=None):
self._log = logging.getLogger(__name__)
self._log.info("Creating DOMS Results Storage Instance")
self._session = None
self._config = configparser.RawConfigParser()
self._config.read(AbstractResultsContainer._get_config_files('domsconfig.ini'))
        if config:
            self.override_config(config)
        else:
            self._log.info('No config provided from params; using values from domsconfig.ini')
def __enter__(self):
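        # Read connection settings from the [cassandra] config section and open a
        # token-aware, datacenter-aware session against the configured keyspace.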
cassHost = self._config.get("cassandra", "host")
cassKeyspace = self._config.get("cassandra", "keyspace")
cassDatacenter = self._config.get("cassandra", "local_datacenter")
cassVersion = int(self._config.get("cassandra", "protocol_version"))
cassUsername = self._config.get("cassandra", "username")
cassPassword = self._config.get("cassandra", "password")
auth_provider = PlainTextAuthProvider(username=cassUsername, password=cassPassword)
dc_policy = DCAwareRoundRobinPolicy(cassDatacenter)
token_policy = TokenAwarePolicy(dc_policy)
        self._cluster = Cluster(cassHost.split(','), load_balancing_policy=token_policy,
                                protocol_version=cassVersion, auth_provider=auth_provider)
self._session = self._cluster.connect(cassKeyspace)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._cluster.shutdown()
def _parseDatetime(self, dtString):
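        # Convert an ISO-8601 UTC timestamp string ("%Y-%m-%dT%H:%M:%SZ") to integer
        # milliseconds since the Unix epoch.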
dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ")
epoch = datetime.utcfromtimestamp(0)
time = (dt - epoch).total_seconds() * 1000.0
return int(time)
def override_config(self, config):
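        # Overlay values from the provided ConfigParser onto the defaults loaded from
        # domsconfig.ini, but only for sections that already exist and options whose
        # override value is not None.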
for section in config.sections():
if self._config.has_section(section):
for option in config.options(section):
if config.get(section, option) is not None:
self._config.set(section, option, config.get(section, option))
@staticmethod
def _get_config_files(filename):
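        # Return candidate config file paths: the packaged '<filename>.default' first,
        # then a plain '<filename>' override, so later files win when the list is
        # passed to ConfigParser.read().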
log = logging.getLogger(__name__)
candidates = []
extensions = ['.default', '']
for extension in extensions:
            try:
                candidate = pkg_resources.resource_filename(__name__, filename + extension)
                log.info('Using config file {}'.format(filename + extension))
                candidates.append(candidate)
            except KeyError:
                log.warning('Configuration file {} not found'.format(filename + extension))
return candidates
class ResultsStorage(AbstractResultsContainer):
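    """
    Writes matchup executions, their parameters, statistics and result rows to the
    doms_* Cassandra tables.
    """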
def __init__(self, config=None):
AbstractResultsContainer.__init__(self, config)
def insertResults(self, results, params, stats, startTime, completeTime, userEmail, execution_id=None):
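        """
        Persist a full matchup run: the execution record, its input parameters,
        summary statistics and every primary/secondary result row. Returns the
        execution id as a uuid.UUID, generating one if none was supplied.
        """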
self._log.info('Beginning results write')
if isinstance(execution_id, str):
execution_id = uuid.UUID(execution_id)
execution_id = self.insertExecution(execution_id, startTime, completeTime, userEmail)
self.__insertParams(execution_id, params)
self.__insertStats(execution_id, stats)
self.__insertResults(execution_id, results)
self._log.info('Results write finished')
return execution_id
def insertExecution(self, execution_id, startTime, completeTime, userEmail):
if execution_id is None:
execution_id = uuid.uuid4()
cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email) VALUES (%s, %s, %s, %s)"
self._session.execute(cql, (execution_id, startTime, completeTime, userEmail))
return execution_id
def __insertParams(self, execution_id, params):
cql = """INSERT INTO doms_params
(execution_id, primary_dataset, matchup_datasets, depth_min, depth_max, time_tolerance, radius_tolerance, start_time, end_time, platforms, bounding_box, parameter)
VALUES
(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
self._session.execute(cql, (execution_id,
params["primary"],
",".join(params["matchup"]) if type(params["matchup"]) == list else params[
"matchup"],
params["depthMin"] if "depthMin" in list(params.keys()) else None,
params["depthMax"] if "depthMax" in list(params.keys()) else None,
int(params["timeTolerance"]),
params["radiusTolerance"],
params["startTime"],
params["endTime"],
params["platforms"],
params["bbox"],
params["parameter"]
))
def __insertStats(self, execution_id, stats):
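        # Only the matched counts and completion time are tracked; the *_checked
        # columns are written as NULL.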
cql = """
INSERT INTO doms_execution_stats
(execution_id, num_gridded_matched, num_gridded_checked, num_insitu_matched, num_insitu_checked, time_to_complete)
VALUES
(%s, %s, %s, %s, %s, %s)
"""
self._session.execute(cql, (
execution_id,
stats["numPrimaryMatched"],
None,
stats["numSecondaryMatched"],
None,
stats["timeToComplete"]
))
def __insertResults(self, execution_id, results):
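        # Flatten every primary result and its nested matches into bound-parameter
        # tuples for a single prepared statement, then write them in batches with up
        # to five attempts (10 s apart) before giving up.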
cql = """
INSERT INTO doms_data
(id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, platform, device, measurement_values_json, is_primary, depth, file_url)
VALUES
(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
insertStatement = self._session.prepare(cql)
inserts = []
for result in results:
inserts.extend(self.__prepare_result(execution_id, None, result, insertStatement))
for i in range(5):
if not self.__insert_result_batches(inserts, insertStatement):
if i < 4:
self._log.warning('Some write attempts failed; retrying')
sleep(10)
else:
self._log.error('Some write attempts failed; max retries exceeded')
raise ResultInsertException('Some result inserts failed')
else:
break
def __insert_result_batches(self, insert_params, insertStatement):
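        # Split the bound parameters into chunks of BATCH_SIZE and issue asynchronous
        # inserts per chunk; returns False if any individual write failed.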
query_batches = [insert_params[i:i + BATCH_SIZE] for i in range(0, len(insert_params), BATCH_SIZE)]
move_successful = True
n_inserts = len(insert_params)
writing = 0
self._log.info(f'Inserting {n_inserts} matchup entries in JSON format')
for batch in query_batches:
futures = []
writing += len(batch)
self._log.info(
f'Writing batch of {len(batch)} matchup entries | ({writing}/{n_inserts}) [{writing / n_inserts * 100:7.3f}%]')
for entry in batch:
futures.append(self._session.execute_async(insertStatement, entry))
for future in futures:
try:
future.result()
except Exception:
move_successful = False
self._log.info('Result data write attempt completed')
return move_successful
def __prepare_result(self, execution_id, primaryId, result, insertStatement):
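        # Build the bound-parameter tuple for one result row, then recurse into its
        # 'matches' so secondary entries reference the primary via primary_value_id.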
if 'primary' in result:
data = result['primary']
elif 'secondary' in result:
data = result['secondary']
else:
data = []
result_id = uuid.uuid4()
insert_params = (
result_id,
execution_id,
result["id"],
primaryId,
result["lon"],
result["lat"],
result["source"],
result["time"],
result["platform"] if "platform" in result else None,
result["device"] if "device" in result else None,
json.dumps(data, cls=DomsEncoder),
1 if primaryId is None else 0,
result["depth"],
result['fileurl']
)
params_list = [insert_params]
if "matches" in result:
for match in result["matches"]:
params_list.extend(self.__prepare_result(execution_id, result["id"], match, insertStatement))
return params_list
class ResultsRetrieval(AbstractResultsContainer):
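    """
    Reads a stored matchup execution (parameters, statistics and data rows) back out
    of the doms_* Cassandra tables.
    """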
def __init__(self, config=None):
AbstractResultsContainer.__init__(self, config)
def retrieveResults(self, execution_id, trim_data=False):
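        """
        Return the (params, stats, data) triple for a stored execution. When
        trim_data is True, data entries carry only lon, lat, source and time.
        """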
if isinstance(execution_id, str):
execution_id = uuid.UUID(execution_id)
params = self.__retrieveParams(execution_id)
stats = self.__retrieveStats(execution_id)
data = self.__retrieveData(execution_id, trim_data=trim_data)
return params, stats, data
def __retrieveData(self, id, trim_data=False):
dataMap = self.__retrievePrimaryData(id, trim_data=trim_data)
self.__enrichPrimaryDataWithMatches(id, dataMap, trim_data=trim_data)
        return list(dataMap.values())
def __enrichPrimaryDataWithMatches(self, id, dataMap, trim_data=False):
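        # Attach each non-primary row to the 'matches' list of its primary entry,
        # keyed by primary_value_id; rows without a known primary are only logged.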
cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = false"
rows = self._session.execute(cql, (id,))
for row in rows:
entry = self.__rowToDataEntry(row, trim_data=trim_data)
if row.primary_value_id in dataMap:
if not "matches" in dataMap[row.primary_value_id]:
dataMap[row.primary_value_id]["matches"] = []
dataMap[row.primary_value_id]["matches"].append(entry)
            else:
                self._log.warning('Orphaned matchup row with no primary entry: %s', row)
def __retrievePrimaryData(self, id, trim_data=False):
cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = true"
rows = self._session.execute(cql, (id,))
dataMap = {}
for row in rows:
entry = self.__rowToDataEntry(row, trim_data=trim_data)
dataMap[row.value_id] = entry
return dataMap
def __rowToDataEntry(self, row, trim_data=False):
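        # Convert a doms_data row into a result dictionary; trim_data keeps only the
        # positional fields. Measurement values are parsed from
        # measurement_values_json, falling back to the old measurement_values map.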
if trim_data:
entry = {
"lon": float(row.x),
"lat": float(row.y),
"source": row.source_dataset,
"time": row.measurement_time.replace(tzinfo=UTC)
}
else:
entry = {
"platform": row.platform,
"device": row.device,
"lon": str(row.x),
"lat": str(row.y),
"point": f"Point({float(row.x):.3f} {float(row.y):.3f})",
"time": row.measurement_time.replace(tzinfo=UTC),
"depth": float(row.depth) if row.depth is not None else None,
"fileurl": row.file_url if hasattr(row, 'file_url') else None,
"id": row.value_id,
"source": row.source_dataset,
}
# If doms_data uses the old schema, default to original behavior
try:
entry['primary' if row.is_primary else 'secondary'] = json.loads(row.measurement_values_json)
except AttributeError:
for key in row.measurement_values:
value = float(row.measurement_values[key])
entry[key] = value
return entry
def __retrieveStats(self, id):
cql = "SELECT num_gridded_matched, num_insitu_matched, time_to_complete FROM doms_execution_stats where execution_id = %s limit 1"
rows = self._session.execute(cql, (id,))
for row in rows:
stats = {
"timeToComplete": row.time_to_complete,
"numSecondaryMatched": row.num_insitu_matched,
"numPrimaryMatched": row.num_gridded_matched,
}
return stats
        # reason/code follow the NexusProcessingException convention used elsewhere in the webservice
        raise NexusProcessingException(reason="Execution not found with id '%s'" % id, code=404)
def __retrieveParams(self, id):
cql = "SELECT * FROM doms_params where execution_id = %s limit 1"
rows = self._session.execute(cql, (id,))
for row in rows:
matchup = row.matchup_datasets.split(",")
if len(matchup) == 1:
matchup = matchup[0]
params = {
"primary": row.primary_dataset,
"matchup": matchup,
"startTime": row.start_time.replace(tzinfo=UTC),
"endTime": row.end_time.replace(tzinfo=UTC),
"bbox": row.bounding_box,
"timeTolerance": int(row.time_tolerance) if row.time_tolerance is not None else None,
"radiusTolerance": float(row.radius_tolerance) if row.radius_tolerance is not None else None,
"platforms": row.platforms,
"parameter": row.parameter,
"depthMin": float(row.depth_min) if row.depth_min is not None else None,
"depthMax": float(row.depth_max) if row.depth_max is not None else None,
}
return params
        raise NexusProcessingException(reason="Execution not found with id '%s'" % id, code=404)