analysis/webservice/algorithms/doms/ResultsStorage.py (301 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import configparser import json import logging import uuid from datetime import datetime from time import sleep import pkg_resources from cassandra.auth import PlainTextAuthProvider from cassandra.cluster import Cluster from cassandra.concurrent import execute_concurrent_with_args from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy from pytz import UTC from webservice.algorithms.doms.BaseDomsHandler import DomsEncoder from webservice.webmodel import NexusProcessingException BATCH_SIZE = 1024 logger = logging.getLogger(__name__) class ResultInsertException(IOError): pass class AbstractResultsContainer: def __init__(self, config=None): self._log = logging.getLogger(__name__) self._log.info("Creating DOMS Results Storage Instance") self._session = None self._config = configparser.RawConfigParser() self._config.read(AbstractResultsContainer._get_config_files('domsconfig.ini')) if config: self.override_config(config) else: print('Config NOT provided from params...') def __enter__(self): cassHost = self._config.get("cassandra", "host") cassKeyspace = self._config.get("cassandra", "keyspace") cassDatacenter = self._config.get("cassandra", "local_datacenter") cassVersion = int(self._config.get("cassandra", "protocol_version")) cassUsername = self._config.get("cassandra", "username") cassPassword = self._config.get("cassandra", "password") auth_provider = PlainTextAuthProvider(username=cassUsername, password=cassPassword) dc_policy = DCAwareRoundRobinPolicy(cassDatacenter) token_policy = TokenAwarePolicy(dc_policy) logger.info(f'Connecting to Cassandra cluster @ {[host for host in cassHost.split(",")]}; datacenter: ' f'{cassDatacenter}; protocol version: {cassVersion}') self._cluster = Cluster([host for host in cassHost.split(',')], load_balancing_policy=token_policy, protocol_version=cassVersion, auth_provider=auth_provider) self._session = self._cluster.connect(cassKeyspace) return self def __exit__(self, exc_type, exc_val, exc_tb): self._cluster.shutdown() def _parseDatetime(self, dtString): dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ") epoch = datetime.utcfromtimestamp(0) time = (dt - epoch).total_seconds() * 1000.0 return int(time) def override_config(self, config): for section in config.sections(): if self._config.has_section(section): for option in config.options(section): if config.get(section, option) is not None: self._config.set(section, option, config.get(section, option)) @staticmethod def _get_config_files(filename): log = logging.getLogger(__name__) candidates = [] extensions = ['.default', ''] for extension in extensions: try: candidate = pkg_resources.resource_filename(__name__, filename + extension) log.info('use config file {}'.format(filename + extension)) candidates.append(candidate) except KeyError as ke: log.warning('configuration file {} not found'.format(filename + extension)) return candidates class ResultsStorage(AbstractResultsContainer): def __init__(self, config=None): AbstractResultsContainer.__init__(self, config) def insertInitialExecution(self, params, startTime, status, userEmail='', execution_id=None): """ Initial insert into database for CDMS matchup request. This populates the execution and params table. """ if isinstance(execution_id, str): execution_id = uuid.UUID(execution_id) execution_id = self.__insertExecution(execution_id, startTime, None, userEmail, status) self.__insertParams(execution_id, params) return execution_id def updateExecution(self, execution_id, completeTime, status, message, stats, results): if stats: self.__insertStats(execution_id, stats) if results: self.__insertResults(execution_id, results) self.__updateExecution(execution_id, completeTime, status, message) def __insertExecution(self, execution_id, startTime, completeTime, userEmail, status): """ Insert new entry into execution table """ if execution_id is None: execution_id = uuid.uuid4() cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email, status) VALUES (%s, %s, %s, %s, %s)" self._session.execute(cql, (execution_id, startTime, completeTime, userEmail, status)) return execution_id def __updateExecution(self, execution_id, complete_time, status, message=None): # Only update the status if it's "running". Any other state is immutable. cql = "UPDATE doms_executions SET time_completed = %s, status = %s, message = %s WHERE id=%s IF status = 'running'" self._session.execute(cql, (complete_time, status, message, execution_id)) return execution_id def __insertParams(self, execution_id, params): cql = """INSERT INTO doms_params (execution_id, primary_dataset, matchup_datasets, depth_min, depth_max, time_tolerance, radius_tolerance, start_time, end_time, platforms, bounding_box, parameter) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ self._session.execute(cql, (execution_id, params["primary"], ",".join(params["matchup"]) if type(params["matchup"]) == list else params[ "matchup"], params["depthMin"] if "depthMin" in list(params.keys()) else None, params["depthMax"] if "depthMax" in list(params.keys()) else None, int(params["timeTolerance"]), params["radiusTolerance"], params["startTime"], params["endTime"], params["platforms"], params["bbox"], params["parameter"] )) def __insertStats(self, execution_id, stats): cql = """ INSERT INTO doms_execution_stats (execution_id, num_gridded_matched, num_gridded_checked, num_insitu_matched, num_insitu_checked, time_to_complete, num_unique_secondaries) VALUES (%s, %s, %s, %s, %s, %s, %s) """ self._session.execute(cql, ( execution_id, stats['numPrimaryMatched'], None, stats['numSecondaryMatched'], None, stats['timeToComplete'], stats['numUniqueSecondaries'] )) def __insertResults(self, execution_id, results): cql = """ INSERT INTO doms_data (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, platform, device, measurement_values_json, is_primary, depth, file_url) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """ insertStatement = self._session.prepare(cql) inserts = [] for result in results: # 'PRIMARY' arg since primary values cannot have primary_value_id be null anymore # Secondary matches are prepped recursively from this call inserts.extend(self.__prepare_result(execution_id, 'PRIMARY', result, insertStatement)) for i in range(5): success, failed_entries = self.__insert_result_batches(inserts, insertStatement) if not success: if i < 4: self._log.warning('Some write attempts failed; retrying') inserts = failed_entries sleep(10) else: self._log.error('Some write attempts failed; max retries exceeded') raise ResultInsertException('Some result inserts failed') else: break def __insert_result_batches(self, insert_params, insertStatement): query_batches = [insert_params[i:i + BATCH_SIZE] for i in range(0, len(insert_params), BATCH_SIZE)] move_successful = True n_inserts = len(insert_params) writing = 0 failed = [] self._log.info(f'Inserting {n_inserts} matchup entries in JSON format') for batch in query_batches: futures = [] writing += len(batch) self._log.info( f'Writing batch of {len(batch)} matchup entries | ({writing}/{n_inserts}) [{writing / n_inserts * 100:7.3f}%]') for entry in batch: futures.append((entry, self._session.execute_async(insertStatement, entry))) for entry, future in futures: try: future.result() except Exception: move_successful = False failed.append(entry) self._log.info('Result data write attempt completed') return move_successful, failed def __prepare_result(self, execution_id, primaryId, result, insertStatement): if 'primary' in result: data = result['primary'] elif 'secondary' in result: data = result['secondary'] else: data = [] result_id = uuid.uuid4() insert_params = ( result_id, execution_id, result["id"], primaryId, result["lon"], result["lat"], result["source"], result["time"], result["platform"] if "platform" in result else None, result["device"] if "device" in result else None, json.dumps(data, cls=DomsEncoder), 1 if primaryId is 'PRIMARY' else 0, result["depth"], result['fileurl'] ) params_list = [insert_params] if "matches" in result: for match in result["matches"]: params_list.extend(self.__prepare_result(execution_id, result["id"], match, insertStatement)) return params_list class ResultsRetrieval(AbstractResultsContainer): def __init__(self, config=None): AbstractResultsContainer.__init__(self, config) def retrieveResults(self, execution_id, trim_data=False, page_num=1, page_size=1000): if isinstance(execution_id, str): execution_id = uuid.UUID(execution_id) params = self.retrieveParams(execution_id) stats = self.retrieveStats(execution_id) data = self.__retrieveData(execution_id, trim_data=trim_data, page_num=page_num, page_size=page_size) return params, stats, data def __retrieveData(self, id, trim_data=False, page_num=1, page_size=1000): dataMap = self.__retrievePrimaryData(id, trim_data=trim_data, page_num=page_num, page_size=page_size) self.__enrichPrimaryDataWithMatches(id, dataMap, trim_data=trim_data) data = [dataMap[name] for name in dataMap] return data def __enrichPrimaryDataWithMatches(self, id, dataMap, trim_data=False): cql = f"SELECT * FROM doms_data where execution_id = {str(id)} and is_primary = false and primary_value_id = ?" statement = self._session.prepare(cql) primary_ids = list(dataMap.keys()) logger.info(f'Getting secondary data for {len(primary_ids)} primaries of {str(id)}') for (success, rows) in execute_concurrent_with_args( self._session, statement, [(i,) for i in primary_ids], concurrency=50, results_generator=True ): for row in rows: entry = self.__rowToDataEntry(row, trim_data=trim_data) if row.primary_value_id in dataMap: if not "matches" in dataMap[row.primary_value_id]: dataMap[row.primary_value_id]["matches"] = [] dataMap[row.primary_value_id]["matches"].append(entry) def __retrievePrimaryData(self, id, trim_data=False, page_num=2, page_size=10): cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = true limit %s" rows = self._session.execute(cql, [id, page_num * page_size]) dataMap = {} for row in rows[(page_num-1)*page_size:page_num*page_size]: entry = self.__rowToDataEntry(row, trim_data=trim_data) dataMap[row.value_id] = entry return dataMap def __rowToDataEntry(self, row, trim_data=False): if trim_data: entry = { "lon": float(row.x), "lat": float(row.y), "source": row.source_dataset, "time": row.measurement_time.replace(tzinfo=UTC) } else: entry = { "platform": row.platform, "device": row.device, "lon": str(row.x), "lat": str(row.y), "point": f"Point({float(row.x):.3f} {float(row.y):.3f})", "time": row.measurement_time.replace(tzinfo=UTC), "depth": float(row.depth) if row.depth is not None else None, "fileurl": row.file_url if hasattr(row, 'file_url') else None, "id": row.value_id, "source": row.source_dataset, } # If doms_data uses the old schema, default to original behavior try: entry['primary' if row.is_primary else 'secondary'] = json.loads(row.measurement_values_json) except AttributeError: for key in row.measurement_values: value = float(row.measurement_values[key]) entry[key] = value return entry def retrieveStats(self, id): cql = "SELECT num_gridded_matched, num_insitu_matched, time_to_complete, num_unique_secondaries FROM doms_execution_stats where execution_id = %s limit 1" rows = self._session.execute(cql, (id,)) for row in rows: stats = { 'timeToComplete': row.time_to_complete, 'numSecondaryMatched': row.num_insitu_matched, 'numPrimaryMatched': row.num_gridded_matched, 'numUniqueSecondaries': row.num_unique_secondaries } return stats def retrieveParams(self, id): cql = "SELECT * FROM doms_params where execution_id = %s limit 1" rows = self._session.execute(cql, (id,)) for row in rows: matchup = row.matchup_datasets.split(",") if len(matchup) == 1: matchup = matchup[0] params = { "primary": row.primary_dataset, "matchup": matchup, "startTime": row.start_time.replace(tzinfo=UTC), "endTime": row.end_time.replace(tzinfo=UTC), "bbox": row.bounding_box, "timeTolerance": int(row.time_tolerance) if row.time_tolerance is not None else None, "radiusTolerance": float(row.radius_tolerance) if row.radius_tolerance is not None else None, "platforms": row.platforms, "parameter": row.parameter, "depthMin": float(row.depth_min) if row.depth_min is not None else None, "depthMax": float(row.depth_max) if row.depth_max is not None else None, } return params raise NexusProcessingException(reason=f'No params found for id {str(id)}', code=404) def retrieveExecution(self, execution_id): """ Retrieve execution details from database. :param execution_id: Execution ID :return: execution status dictionary """ cql = "SELECT * FROM doms_executions where id = %s limit 1" rows = self._session.execute(cql, (execution_id,)) for row in rows: return { 'status': row.status, 'message': row.message, 'timeCompleted': row.time_completed, 'timeStarted': row.time_started } raise NexusProcessingException(reason=f'Execution not found with id {str(execution_id)}', code=404)