def main()

in tools/doms-data-tools/update_doms_data_pk.py
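
A minimal sketch of the module-level imports and constants this function depends on, assuming the standard cassandra-driver package; the logger setup and the BATCH_SIZE value are illustrative assumptions, not taken from the file:

import argparse
import logging

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT, NoHostAvailable
from cassandra.policies import RoundRobinPolicy, TokenAwarePolicy

log = logging.getLogger(__name__)  # assumption: module logger; the file may configure logging differently
BATCH_SIZE = 5000                  # assumption: illustrative batch size; the real constant may differ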


def main():
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '-u', '--cassandra-username',
        help='The username used to connect to Cassandra.',
        dest='username',
        required=False,
        default='cassandra',
        metavar='USERNAME'
    )

    parser.add_argument(
        '-p', '--cassandra-password',
        dest='password',
        help='The password used to connect to Cassandra.',
        required=False,
        default='cassandra',
        metavar='PASSWORD'
    )

    parser.add_argument(
        '--cassandra',
        help='The hostname(s) or IP(s) of the Cassandra server(s).',
        required=False,
        default=['localhost'],
        dest='hosts',
        nargs='+',
        metavar=('localhost', '127.0.0.101')
    )

    parser.add_argument(
        '--cassandraPort',
        help='The port used to connect to Cassandra.',
        dest='port',
        required=False,
        default=9042,
        type=int
    )

    parser.add_argument(
        '--action',
        help='Whether to copy or move the data (move drops and recreates doms_data).',
        dest='action',
        required=False,
        default='move',
        choices=['move', 'copy']
    )

    args = parser.parse_args()

    log.info('Connecting to Cassandra cluster')

    dc_policy = RoundRobinPolicy()
    token_policy = TokenAwarePolicy(dc_policy)
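    # Token-aware routing on top of round-robin sends each request to a replica node when possible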

    if args.username and args.password:
        auth_provider = PlainTextAuthProvider(username=args.username, password=args.password)
    else:
        auth_provider = None

    contact_points = []
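    # Each --cassandra value may itself be a comma-separated host list; flatten them all into contact_points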

    for host_list in args.hosts:
        contact_points.extend(host_list.split(','))

    try:
        with Cluster(contact_points,
                     port=int(args.port),
                     execution_profiles={
                         EXEC_PROFILE_DEFAULT: ExecutionProfile(
                             load_balancing_policy=token_policy,
                             request_timeout=60.0,
                         )
                     },
                     protocol_version=4,
                     auth_provider=auth_provider) as cluster:

            session = cluster.connect('doms')

            log.info('Connected successfully to Cassandra')

            cql = """
            CREATE TABLE IF NOT EXISTS doms_data_temp (
              id uuid,
              execution_id uuid,
              value_id text,
              primary_value_id text,
              is_primary boolean,
              x decimal,
              y decimal,
              source_dataset text,
              measurement_time timestamp,
              platform text,
              device text,
              measurement_values_json text,
              depth decimal,
              file_url text,
              PRIMARY KEY ((execution_id, is_primary), primary_value_id, id)
            );
            """

            log.info('Creating temp data table')

            session.execute(cql)

            def move_table(src_t, dst_t, can_delete=True):
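                # Copy every row from src_t into dst_t using batches of asynchronous inserts.
                # Rows with a null primary_value_id are written with the placeholder 'PRIMARY'.
                # Note: can_delete is accepted but unused; nothing is ever deleted from src_t here.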
                move_rows = []

                cql = f"""
                SELECT * FROM {src_t};
                """

                data_rows = session.execute(cql)
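                # The driver pages the result set lazily as it is iterated, so the whole table is not held in memory at once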

                insert_cql = f"""
                INSERT INTO {dst_t}
                    (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, platform, device, measurement_values_json, is_primary, depth, file_url)
                VALUES
                    (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """

                insert_statement = session.prepare(insert_cql)

                n_moved = 0

                def do_move(rows):
                    remaining_rows = rows

                    log.info(f'Inserting {len(rows):,} rows to {dst_t}')

                    while len(remaining_rows) > 0:
                        futures = []
                        failed = []

                        # Issue an async insert for every row that is still pending
                        for entry in remaining_rows:
                            futures.append((entry, session.execute_async(insert_statement, entry)))

                        # Wait for each insert and collect the rows that failed
                        for entry, future in futures:
                            try:
                                future.result()
                            except Exception:
                                failed.append(entry)

                        if len(failed) > 0:
                            # Only the failed rows are retried on the next pass
                            remaining_rows = failed
                            log.warning(f'Need to retry {len(remaining_rows):,} inserts')
                        else:
                            remaining_rows = []

                    return len(rows)

                for row in data_rows:
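                    # Clustering columns cannot be null in Cassandra, so primary rows (which carry no primary_value_id) get a 'PRIMARY' placeholder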
                    pvid = row.primary_value_id

                    if pvid is None:
                        pvid = 'PRIMARY'

                    move_rows.append(
                        (
                            row.id,
                            row.execution_id,
                            row.value_id,
                            pvid,
                            row.x,
                            row.y,
                            row.source_dataset,
                            row.measurement_time,
                            row.platform,
                            row.device,
                            row.measurement_values_json,
                            row.is_primary,
                            row.depth,
                            row.file_url
                        )
                    )

                    if len(move_rows) >= BATCH_SIZE:
                        n_moved += do_move(move_rows)
                        log.info(f'Moved {n_moved:,} rows so far')
                        move_rows = []

                if len(move_rows) > 0:
                    n_moved += do_move(move_rows)
                    log.info(f'Moved {n_moved:,} rows so far')

            log.info('Copying data to temp table')

            move_table('doms_data', 'doms_data_temp')

            if args.action == 'move':
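                # 'move' drops doms_data, recreates it with the corrected key, copies the data back, and drops the temp table; 'copy' leaves both tables in place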
                cql = """
                DROP TABLE doms_data;
                """

                log.info('Dropping old table')

                session.execute(cql)

                cql = """
                            CREATE TABLE doms_data (
                              id uuid,
                              execution_id uuid,
                              value_id text,
                              primary_value_id text,
                              is_primary boolean,
                              x decimal,
                              y decimal,
                              source_dataset text,
                              measurement_time timestamp,
                              platform text,
                              device text,
                              measurement_values_json text,
                              depth decimal,
                              file_url text,
                              PRIMARY KEY ((execution_id, is_primary), primary_value_id, id)
                            );
                            """

                log.info('Creating data table with corrected schema')

                session.execute(cql)

                log.info('Copying data back')

                move_table('doms_data_temp', 'doms_data', False)

                cql = """
                        DROP TABLE doms_data_temp;
                        """

                log.info('Dropping temp table')

                session.execute(cql)

            log.info('Disconnecting from Cassandra')
            session.shutdown()

        log.info('Done')
    except NoHostAvailable as ne:
        log.exception(ne)
        exit(1)
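
Assuming the module ends with the usual if __name__ == '__main__': main() guard (not shown above), a typical invocation might look like:

    python update_doms_data_pk.py --cassandra host1,host2 --cassandraPort 9042 --action move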