in tools/doms-data-tools/update_doms_data_pk.py [0:0]
def main():
    """Rebuild the ``doms_data`` table in the ``doms`` keyspace with a new primary key.

    Parses the command line, connects to Cassandra and creates
    ``doms_data_temp`` with the primary key
    ``((execution_id, is_primary), primary_value_id, id)``. Every row of
    ``doms_data`` is copied into it; rows whose ``primary_value_id`` is null
    get the placeholder ``'PRIMARY'``.

    With ``--action move`` (the default) the original table is then dropped,
    recreated with the same schema as the temp table, refilled from the temp
    table, and the temp table is dropped. With ``--action copy`` the original
    table is left untouched and the copied data stays in ``doms_data_temp``.

    Exits the process with status 1 if no Cassandra host can be reached.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument(
        '-u', '--cassandra-username',
        help='The username used to connect to Cassandra.',
        dest='username',
        required=False,
        default='cassandra',
        metavar='USERNAME'
    )

    parser.add_argument(
        '-p', '--cassandra-password',
        dest='password',
        help='The password used to connect to Cassandra.',
        required=False,
        default='cassandra',
        metavar='PASSWORD'
    )

    parser.add_argument(
        '--cassandra',
        help='The hostname(s) or IP(s) of the Cassandra server(s).',
        required=False,
        default=['localhost'],
        dest='hosts',
        nargs='+',
        metavar=('localhost', '127.0.0.101')
    )

    parser.add_argument(
        '--cassandraPort',
        help='The port used to connect to Cassandra.',
        dest='port',
        required=False,
        default=9042,
        type=int
    )

    parser.add_argument(
        '--action',
        help='Copy or move',
        dest='action',
        required=False,
        default='move',
        choices=['move', 'copy']
    )

    args = parser.parse_args()

    log.info('Connecting to Cassandra cluster')

    dc_policy = RoundRobinPolicy()
    token_policy = TokenAwarePolicy(dc_policy)

    # Only authenticate when both credentials are non-empty.
    if args.username and args.password:
        auth_provider = PlainTextAuthProvider(username=args.username, password=args.password)
    else:
        auth_provider = None

    # Each --cassandra value may itself be a comma-separated list of hosts.
    contact_points = []
    for host_list in args.hosts:
        contact_points.extend(host_list.split(','))

    try:
        with Cluster(contact_points,
                     port=args.port,  # already an int: argparse applies type=int
                     execution_profiles={
                         EXEC_PROFILE_DEFAULT: ExecutionProfile(
                             load_balancing_policy=token_policy,
                             request_timeout=60.0,
                         )
                     },
                     protocol_version=4,
                     auth_provider=auth_provider) as cluster:

            session = cluster.connect('doms')

            log.info('Connected successfully to Cassandra')

            cql = """
                CREATE TABLE IF NOT EXISTS doms_data_temp (
                    id uuid,
                    execution_id uuid,
                    value_id text,
                    primary_value_id text,
                    is_primary boolean,
                    x decimal,
                    y decimal,
                    source_dataset text,
                    measurement_time timestamp,
                    platform text,
                    device text,
                    measurement_values_json text,
                    depth decimal,
                    file_url text,
                    PRIMARY KEY ((execution_id, is_primary), primary_value_id, id)
                );
            """

            log.info('Creating temp data table')

            session.execute(cql)

            def move_table(src_t, dst_t, can_delete=True):
                """Copy every row of table ``src_t`` into table ``dst_t``.

                Rows are read with ``SELECT *`` and written in batches of
                ``BATCH_SIZE`` through a prepared INSERT. A null
                ``primary_value_id`` is replaced with ``'PRIMARY'``.

                ``can_delete`` is accepted for compatibility with existing
                call sites but is not used.
                """
                move_rows = []

                cql = f"""
                    SELECT * FROM {src_t};
                """

                data_rows = session.execute(cql)

                insert_cql = f"""
                    INSERT INTO {dst_t}
                        (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, platform, device, measurement_values_json, is_primary, depth, file_url)
                    VALUES
                        (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """

                insert_statement = session.prepare(insert_cql)

                n_moved = 0

                def do_move(rows):
                    """Insert ``rows`` into ``dst_t``, retrying failures until none remain.

                    Returns the number of rows in the batch.
                    """
                    remaining_rows = rows

                    log.info(f'Inserting {len(rows):,} rows to {dst_t}')

                    while remaining_rows:
                        # Submit only the rows still outstanding, and start
                        # from a fresh futures list on every pass. Reusing the
                        # old futures would re-raise their stored errors
                        # forever, and resubmitting the whole batch would
                        # repeat inserts that already succeeded.
                        futures = [
                            (entry, session.execute_async(insert_statement, entry))
                            for entry in remaining_rows
                        ]

                        failed = []

                        for entry, future in futures:
                            try:
                                future.result()
                            except Exception:
                                # Deliberately broad: any failed insert is
                                # queued for another attempt.
                                failed.append(entry)

                        if failed:
                            log.warning(f'Need to retry {len(failed):,} inserts')

                        remaining_rows = failed

                    return len(rows)

                for row in data_rows:
                    pvid = row.primary_value_id

                    # primary_value_id is a clustering column in the new key,
                    # so a null value is replaced with a placeholder.
                    if pvid is None:
                        pvid = 'PRIMARY'

                    # Tuple order must match the column list in insert_cql.
                    move_rows.append(
                        (
                            row.id,
                            row.execution_id,
                            row.value_id,
                            pvid,
                            row.x,
                            row.y,
                            row.source_dataset,
                            row.measurement_time,
                            row.platform,
                            row.device,
                            row.measurement_values_json,
                            row.is_primary,
                            row.depth,
                            row.file_url
                        )
                    )

                    if len(move_rows) >= BATCH_SIZE:
                        n_moved += do_move(move_rows)
                        log.info(f'Moved {n_moved:,} rows so far')
                        move_rows = []

                # Flush the final, partially filled batch.
                if move_rows:
                    n_moved += do_move(move_rows)
                    log.info(f'Moved {n_moved:,} rows so far')

            log.info('Copying data to temp table')

            move_table('doms_data', 'doms_data_temp')

            if args.action == 'move':
                cql = """
                    DROP TABLE doms_data;
                """

                log.info('Dropping old table')

                session.execute(cql)

                cql = """
                    CREATE TABLE doms_data (
                        id uuid,
                        execution_id uuid,
                        value_id text,
                        primary_value_id text,
                        is_primary boolean,
                        x decimal,
                        y decimal,
                        source_dataset text,
                        measurement_time timestamp,
                        platform text,
                        device text,
                        measurement_values_json text,
                        depth decimal,
                        file_url text,
                        PRIMARY KEY ((execution_id, is_primary), primary_value_id, id)
                    );
                """

                log.info('Creating data table with corrected schema')

                session.execute(cql)

                log.info('Copying data back')

                move_table('doms_data_temp', 'doms_data', False)

                cql = """
                    DROP TABLE doms_data_temp;
                """

                log.info('Dropping temp table')

                session.execute(cql)

            log.info('Disconnecting from Cassandra')
            session.shutdown()

        log.info('Done')
    except NoHostAvailable as ne:
        log.exception(ne)
        # SystemExit directly: the ``exit`` builtin is only present when the
        # ``site`` module has been loaded.
        raise SystemExit(1)