tools/update-doms-data-schema/update.py (159 lines of code) (raw):

import argparse
import configparser
import decimal
import json
import logging
import sys

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import (DCAwareRoundRobinPolicy, TokenAwarePolicy,
                                WhiteListRoundRobinPolicy)

# The NexusHandler logger is silenced *before* the webservice import below;
# presumably that import is noisy -- the ordering is kept on purpose.
try:
    logging.getLogger('webservice.NexusHandler').setLevel(logging.CRITICAL)
except Exception:
    pass

from webservice.algorithms.doms.DomsInitialization import DomsInitializer

# Number of UPDATE statements issued concurrently before waiting for results.
BATCH_SIZE = 1024

# How many times main() calls move_data() before giving up.
MAX_MOVE_ATTEMPTS = 5

log = logging.getLogger(__name__)


class Encoder(json.JSONEncoder):
    """JSON encoder that serializes ``decimal.Decimal`` values as floats."""

    def default(self, obj):
        """Return a JSON-serializable stand-in for *obj*.

        ``Decimal`` instances become ``float``; anything else is delegated to
        the base class, which raises ``TypeError`` for unsupported types.
        """
        if isinstance(obj, decimal.Decimal):
            return float(obj)
        return super().default(obj)


def _normalize_hosts(hosts):
    """Return a flat list of host names.

    *hosts* is either a single string (the config default, possibly
    comma-separated) or a list of strings (what argparse produces for
    ``nargs='+'``). Each entry is split on commas; surrounding whitespace and
    empty items are dropped.
    """
    if isinstance(hosts, str):
        hosts = [hosts]
    return [
        host.strip()
        for entry in hosts
        for host in entry.split(',')
        if host.strip()
    ]


def _add_text_column(session, column):
    """Add a ``text`` column named *column* to ``doms_data`` (best effort).

    A failure is only logged as a warning, because the most likely cause is
    that the column already exists from a previous run.
    """
    log.info(f'Creating {column} column')
    try:
        session.execute(f'alter table doms_data add {column} text;')
    except Exception:
        log.warning(f'{column} column creation failed; perhaps it already exists')


def main():
    """Migrate the ``doms_data`` table to the JSON measurement schema.

    Reads defaults from ``domsconfig.ini`` (section ``cassandra``), parses the
    command line, connects to Cassandra, adds the ``measurement_values_json``
    and ``file_url`` columns, copies the data over via :func:`move_data`
    (retried up to ``MAX_MOVE_ATTEMPTS`` times) and finally drops the old
    ``measurement_values`` column.

    Exits with status 1 when the data move keeps failing and with status 2 on
    any other unexpected exception.

    Raises:
        ValueError: if the configured ``dc_policy`` is not supported.
    """
    domsconfig = configparser.ConfigParser()
    domsconfig.read(DomsInitializer._get_config_files('domsconfig.ini'))

    parser = argparse.ArgumentParser()

    parser.add_argument('-u', '--cassandra-username',
                        dest='username',
                        help='The username used to connect to Cassandra.',
                        required=True,
                        metavar='USERNAME')

    parser.add_argument('-p', '--cassandra-password',
                        dest='password',
                        help='The password used to connect to Cassandra.',
                        required=True,
                        metavar='PASSWORD')

    parser.add_argument('--cassandra',
                        help='The hostname(s) or IP(s) of the Cassandra server(s).',
                        required=False,
                        default=domsconfig.get("cassandra", "host"),
                        dest='hosts',
                        nargs='+',
                        metavar=('localhost', '127.0.0.101'))

    parser.add_argument('--cassandraPort',
                        help='The port used to connect to Cassandra.',
                        dest='port',
                        required=False,
                        default=domsconfig.get("cassandra", "port"))

    args = parser.parse_args()

    # args.hosts is a list when --cassandra is given and a string when the
    # config default is used; normalize once so both paths behave the same.
    cass_hosts = _normalize_hosts(args.hosts)
    cass_port = args.port
    cass_username = args.username
    cass_password = args.password
    cass_keyspace = domsconfig.get("cassandra", "keyspace")
    cass_datacenter = domsconfig.get("cassandra", "local_datacenter")
    cass_version = int(domsconfig.get("cassandra", "protocol_version"))
    cass_policy = domsconfig.get("cassandra", "dc_policy")

    log.info("Cassandra Host(s): %s", ','.join(cass_hosts))
    log.info("Cassandra Keyspace: %s", cass_keyspace)
    log.info("Cassandra Datacenter: %s", cass_datacenter)
    log.info("Cassandra Protocol Version: %s", cass_version)
    log.info("Cassandra DC Policy: %s", cass_policy)
    # Never write the password itself to the log.
    log.info("Cassandra Auth: %s : %s", cass_username, '********' if cass_password else '<none>')

    if cass_policy == 'DCAwareRoundRobinPolicy':
        load_balancing_policy = TokenAwarePolicy(DCAwareRoundRobinPolicy(cass_datacenter))
    elif cass_policy == 'WhiteListRoundRobinPolicy':
        load_balancing_policy = WhiteListRoundRobinPolicy(cass_hosts)
    else:
        raise ValueError(f'Unsupported Cassandra dc_policy: {cass_policy}')

    if cass_username and cass_password:
        auth_provider = PlainTextAuthProvider(username=cass_username, password=cass_password)
    else:
        auth_provider = None

    try:
        with Cluster(cass_hosts,
                     port=int(cass_port),
                     execution_profiles={
                         EXEC_PROFILE_DEFAULT: ExecutionProfile(load_balancing_policy=load_balancing_policy)
                     },
                     protocol_version=cass_version,
                     auth_provider=auth_provider) as cluster:
            session = cluster.connect(cass_keyspace)

            _add_text_column(session, 'measurement_values_json')
            _add_text_column(session, 'file_url')

            # move_data() nulls the old column for every row it migrates, so a
            # retry only re-processes the rows that failed previously.
            for attempt in range(1, MAX_MOVE_ATTEMPTS + 1):
                if move_data(session):
                    break
                if attempt < MAX_MOVE_ATTEMPTS:
                    log.warning('Some move attempts failed; retrying')
                else:
                    log.critical('Some move attempts failed; max retries exceeded')
                    # SystemExit is not an Exception, so the handlers below
                    # do not intercept it.
                    sys.exit(1)

            log.info('Dropping old measurement_values column')
            session.execute('alter table doms_data drop measurement_values;')
    except NoHostAvailable as e:
        # NOTE(review): the process still exits with status 0 here, as in the
        # original script -- confirm whether a non-zero status is wanted.
        log.error("Unable to connect to Cassandra, Nexus will not be able to access local data: %s", e)
    except Exception as e:
        log.critical('An uncaught exception occurred')
        log.exception(e)
        sys.exit(2)


def move_data(session):
    """Copy ``measurement_values`` into ``measurement_values_json``.

    Selects every row of ``doms_data``; for each row whose
    ``measurement_values`` is not null, the value is converted with
    :func:`translate_values`, serialized to JSON and written to
    ``measurement_values_json`` while ``measurement_values`` is set to null.
    Updates are issued asynchronously in groups of ``BATCH_SIZE``.

    If the initial SELECT fails the process exits with status 0, on the
    assumption that the old column has already been dropped.

    Args:
        session: a connected Cassandra session bound to the DOMS keyspace.

    Returns:
        bool: ``True`` if every update succeeded, ``False`` if at least one
        update raised.
    """
    select_cql = """
        SELECT execution_id, is_primary, id, measurement_values
        FROM doms_data;
    """

    log.info('Fetching execution measurements')

    try:
        rows = session.execute(select_cql)
    except Exception:
        log.warning('SELECT query failed; the measurement_values column may no longer exist')
        sys.exit(0)

    # Parameter order matches the placeholders in update_cql below:
    # (json values, execution_id, is_primary, id).
    update_params = [
        (
            json.dumps(translate_values(dict(row.measurement_values)), indent=4, cls=Encoder),
            row.execution_id,
            row.is_primary,
            row.id,
        )
        for row in rows
        if row.measurement_values is not None
    ]

    update_cql = """
        UPDATE doms_data
        SET measurement_values=null,
            measurement_values_json=?
        WHERE execution_id=? AND
              is_primary=? AND
              id=?;
    """

    update_query = session.prepare(update_cql)

    n_entries = len(update_params)
    move_successful = True
    written = 0

    log.info(f'Writing {n_entries} entries in JSON format')

    for start in range(0, n_entries, BATCH_SIZE):
        batch = update_params[start:start + BATCH_SIZE]
        written += len(batch)

        log.info(f'Writing batch of {len(batch)} entries | ({written}/{n_entries}) [{written/n_entries*100:7.3f}%]')

        # Fire the whole batch first, then wait for each result.
        futures = [session.execute_async(update_query, entry) for entry in batch]

        failed = 0
        last_error = None
        for future in futures:
            try:
                future.result()
            except Exception as e:
                failed += 1
                last_error = e

        if failed:
            move_successful = False
            log.warning('%d of %d updates in this batch failed; last error: %s',
                        failed, len(batch), last_error)

    log.info('Move attempt completed')
    return move_successful


def translate_values(values_dict):
    """Convert a ``{name: value}`` mapping into a list of variable records.

    Each key becomes the record's ``cf_variable_name`` and each value its
    ``variable_value``; ``variable_name`` is always the empty string and
    ``variable_unit`` is always ``None``.
    """
    return [
        {
            "variable_name": "",
            "cf_variable_name": key,
            "variable_value": value,
            "variable_unit": None,
        }
        for key, value in values_dict.items()
    ]


if __name__ == '__main__':
    main()