tools/doms-data-tools/update_doms_data_pk.py (161 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Script to transition the doms.doms_data table to a new schema with a primary key
that enables faster execution retrieval for large matchOnce=false matchups.

Due to the nature of Cassandra, this requires creating a temporary table, copying
the data over, dropping the old table, recreating the table with the adjusted
schema, copying the data back, and dropping the temporary table. This script
performs those steps, with the added option (--action copy) to stop after the
initial copy for testing purposes.
"""

import argparse
import logging
import sys

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import RoundRobinPolicy, TokenAwarePolicy

BATCH_SIZE = 10000

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] [%(name)s::%(lineno)d] %(message)s',
    stream=sys.stdout
)

log = logging.getLogger(__name__)


def main():
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '-u', '--cassandra-username',
        help='The username used to connect to Cassandra.',
        dest='username',
        required=False,
        default='cassandra',
        metavar='USERNAME'
    )

    parser.add_argument(
        '-p', '--cassandra-password',
        dest='password',
        help='The password used to connect to Cassandra.',
        required=False,
        default='cassandra',
        metavar='PASSWORD'
    )

    parser.add_argument(
        '--cassandra',
        help='The hostname(s) or IP(s) of the Cassandra server(s).',
        required=False,
        default=['localhost'],
        dest='hosts',
        nargs='+',
        metavar=('localhost', '127.0.0.101')
    )

    parser.add_argument(
        '--cassandraPort',
        help='The port used to connect to Cassandra.',
        dest='port',
        required=False,
        default=9042,
        type=int
    )

    parser.add_argument(
        '--action',
        help='Either migrate the data to the new schema ("move") or only copy it to the '
             'temporary table and stop ("copy", for testing).',
        dest='action',
        required=False,
        default='move',
        choices=['move', 'copy']
    )

    args = parser.parse_args()

    log.info('Connecting to Cassandra cluster')

    dc_policy = RoundRobinPolicy()
    token_policy = TokenAwarePolicy(dc_policy)

    if args.username and args.password:
        auth_provider = PlainTextAuthProvider(username=args.username, password=args.password)
    else:
        auth_provider = None

    # Allow hosts to be given either space-separated or comma-separated
    contact_points = []

    for host_list in args.hosts:
        contact_points.extend(host_list.split(','))

    try:
        with Cluster(contact_points,
                     port=args.port,
                     execution_profiles={
                         EXEC_PROFILE_DEFAULT: ExecutionProfile(
                             load_balancing_policy=token_policy,
                             request_timeout=60.0,
                         )
                     },
                     protocol_version=4,
                     auth_provider=auth_provider) as cluster:
            session = cluster.connect('doms')

            log.info('Connected successfully to Cassandra')

            cql = """
            CREATE TABLE IF NOT EXISTS doms_data_temp (
                id uuid,
                execution_id uuid,
                value_id text,
                primary_value_id text,
                is_primary boolean,
                x decimal,
                y decimal,
                source_dataset text,
                measurement_time timestamp,
                platform text,
                device text,
                measurement_values_json text,
                depth decimal,
                file_url text,
                PRIMARY KEY ((execution_id, is_primary), primary_value_id, id)
            );
            """

            log.info('Creating temp data table')
            session.execute(cql)

            def move_table(src_t, dst_t, can_delete=True):
                """Copy all rows from src_t to dst_t in batches of BATCH_SIZE."""
                move_rows = []

                cql = f"""
                SELECT * FROM {src_t};
                """

                data_rows = session.execute(cql)

                insert_cql = f"""
                INSERT INTO {dst_t}
                (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time,
                 platform, device, measurement_values_json, is_primary, depth, file_url)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """

                insert_statement = session.prepare(insert_cql)

                n_moved = 0

                def do_move(rows):
                    # Insert the rows asynchronously, retrying any inserts that fail
                    # until the whole batch has been written.
                    remaining_rows = rows

                    log.info(f'Inserting {len(rows):,} rows to {dst_t}')

                    while len(remaining_rows) > 0:
                        failed = []
                        futures = []

                        for entry in remaining_rows:
                            futures.append((entry, session.execute_async(insert_statement, entry)))

                        for entry, future in futures:
                            try:
                                future.result()
                            except Exception:
                                failed.append(entry)

                        if len(failed) > 0:
                            log.warning(f'Need to retry {len(failed):,} inserts')

                        remaining_rows = failed

                    return len(rows)

                for row in data_rows:
                    # primary_value_id is a clustering column in the new key, so it cannot
                    # be null; substitute a sentinel value where it is unset.
                    pvid = row.primary_value_id

                    if pvid is None:
                        pvid = 'PRIMARY'

                    move_rows.append(
                        (
                            row.id, row.execution_id, row.value_id, pvid, row.x, row.y,
                            row.source_dataset, row.measurement_time, row.platform, row.device,
                            row.measurement_values_json, row.is_primary, row.depth, row.file_url
                        )
                    )

                    if len(move_rows) >= BATCH_SIZE:
                        n_moved += do_move(move_rows)
                        log.info(f'Moved {n_moved:,} rows so far')
                        move_rows = []

                if len(move_rows) > 0:
                    n_moved += do_move(move_rows)
                    log.info(f'Moved {n_moved:,} rows so far')

            log.info('Copying data to temp table')
            move_table('doms_data', 'doms_data_temp')

            if args.action == 'move':
                cql = """
                DROP TABLE doms_data;
                """

                log.info('Dropping old table')
                session.execute(cql)

                cql = """
                CREATE TABLE doms_data (
                    id uuid,
                    execution_id uuid,
                    value_id text,
                    primary_value_id text,
                    is_primary boolean,
                    x decimal,
                    y decimal,
                    source_dataset text,
                    measurement_time timestamp,
                    platform text,
                    device text,
                    measurement_values_json text,
                    depth decimal,
                    file_url text,
                    PRIMARY KEY ((execution_id, is_primary), primary_value_id, id)
                );
                """

                log.info('Creating data table with corrected schema')
                session.execute(cql)

                log.info('Copying data back')
                move_table('doms_data_temp', 'doms_data', False)

                cql = """
                DROP TABLE doms_data_temp;
                """

                log.info('Dropping temp table')
                session.execute(cql)

            log.info('Disconnecting from Cassandra')
            session.shutdown()

            log.info('Done')
    except NoHostAvailable as ne:
        log.exception(ne)
        sys.exit(1)


if __name__ == '__main__':
    main()
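

# ---------------------------------------------------------------------------
# Illustrative sketch, not called by this script: the new primary key
# ((execution_id, is_primary), primary_value_id, id) groups every row of an
# execution into at most two partitions (one per is_primary value), so
# retrieving a large matchOnce=false matchup becomes a pair of single-partition
# reads rather than a filtered scan. The function name and arguments below are
# hypothetical; it assumes a session already connected to the 'doms' keyspace,
# as created in main() above.
# ---------------------------------------------------------------------------
def example_fetch_execution_rows(session, execution_id):
    # A prepared statement keyed on the full partition key reads one partition
    # per (execution_id, is_primary) pair.
    select_cql = """
    SELECT * FROM doms_data WHERE execution_id = ? AND is_primary = ?
    """
    select_stmt = session.prepare(select_cql)

    primary_rows = list(session.execute(select_stmt, (execution_id, True)))
    secondary_rows = list(session.execute(select_stmt, (execution_id, False)))

    return primary_rows, secondary_rows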