tools/domspurge/purge.py (257 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import argparse import json import logging import sys from datetime import datetime from typing import Tuple, List from cassandra.auth import PlainTextAuthProvider from cassandra.cluster import Cluster, NoHostAvailable, ExecutionProfile, EXEC_PROFILE_DEFAULT from cassandra.policies import RoundRobinPolicy, TokenAwarePolicy from dateutil import parser as du_parser from dateutil.relativedelta import relativedelta from six.moves import input from tqdm import tqdm try: logging.getLogger('webservice.NexusHandler').setLevel(logging.CRITICAL) from webservice.algorithms.doms.DomsInitialization import DomsInitializer except ImportError: from DomsInitialization import DomsInitializer logging.basicConfig(level=logging.INFO, stream=sys.stdout) log = logging.getLogger(__name__) dry_run = False non_interactive = False def get_confirmation(prompt_string='Continue? [y]/n: '): if non_interactive: return True do_continue = input(prompt_string) while do_continue not in ['y', 'n', '']: do_continue = input(prompt_string) return do_continue != 'n' def main(args, before, keep_completed, keep_failed, purge_all, recreate): log.info('Connecting to Cassandra cluster') dc_policy = RoundRobinPolicy() token_policy = TokenAwarePolicy(dc_policy) if args.username and args.password: auth_provider = PlainTextAuthProvider(username=args.username, password=args.password) else: auth_provider = None contact_points = [] for host_list in args.hosts: contact_points.extend(host_list.split(',')) try: with Cluster(contact_points, port=int(args.port), execution_profiles={ EXEC_PROFILE_DEFAULT: ExecutionProfile( load_balancing_policy=token_policy, request_timeout=60.0, ) }, protocol_version=int(args.pv), auth_provider=auth_provider) as cluster: session = cluster.connect(args.keyspace) if not recreate else cluster.connect() log.info('Connected successfully to Cassandra') if recreate: log.info('Recreating doms keyspace') create_keyspace(session, args.keyspace) exit(0) log.info('Determining the number of executions that will be dropped') execution_count, ids = count_executions(session, before, keep_completed, keep_failed, purge_all) if dry_run: if execution_count == 0: log.info('No executions will be deleted with the provided criteria') elif purge_all: log.info(f'The \'{args.keyspace}\' keyspace will be dropped then recreated w/ all needed tables') else: log.info(f'The following executions would be deleted: \n' f'{json.dumps([str(rid) for rid in ids], indent=4)} \n' f'Total: {len(ids):,}') exit(0) if execution_count == 0 and not purge_all: log.info('No executions will be deleted with the provided criteria') exit(0) elif execution_count == 0 and purge_all: if not get_confirmation('No executions will be deleted with the provided criteria. Do you still wish ' f'to drop & recreate the \'{args.keyspace}\' keyspace? [y]/n: '): exit(0) else: if not get_confirmation(f'{execution_count:,} executions selected for deletion. Continue? [y]/n: '): exit(0) if purge_all: purge_all_data(session, args.keyspace) else: for row_id in tqdm(ids, ascii=True, desc='Executions deleted', ncols=80, unit='Execution'): delete_execution(session, row_id) log.info(f'Successfully deleted the following executions: \n' f'{json.dumps([str(rid) for rid in ids], indent=4)} \n' f'Total: {len(ids):,}') except NoHostAvailable as ne: log.exception(ne) exit(1) def delete_execution(session, row_id): cql_data_primary = session.prepare(""" DELETE FROM doms_data WHERE execution_id=? AND is_primary = true; """) cql_data_secondary = session.prepare(""" DELETE FROM doms_data WHERE execution_id=? AND is_primary = false; """) cql_execution_stats = session.prepare(""" DELETE FROM doms_execution_stats WHERE execution_id=?; """) cql_params = session.prepare(""" DELETE FROM doms_params WHERE execution_id=?; """) cql_executions = session.prepare(""" DELETE FROM doms_executions WHERE id=?; """) session.execute(cql_data_primary, (row_id,)) session.execute(cql_data_secondary, (row_id,)) session.execute(cql_execution_stats, (row_id,)) session.execute(cql_params, (row_id,)) session.execute(cql_executions, (row_id,)) def create_keyspace(session, keyspace): log.info('*** RUNNING DOMS INITIALIZATION ***') initializer = DomsInitializer() initializer.createKeyspace(session, keyspace) initializer.createTables(session) def purge_all_data(session, keyspace): if not get_confirmation(f'You have selected to purge all data. This will drop and recreate the \'{keyspace}\' ' 'keyspace. Continue? [y]/n: '): exit(0) cql = f""" drop keyspace {keyspace}; """ log.info('Executing keyspace drop') log.info('NOTE: If something goes wrong with keyspace recreation, rerun this utility with just \'--recreate-ks\'' ' and the cassandra auth options') session.execute(cql, timeout=None) log.info(f'\'{keyspace}\' keyspace dropped. Recreating it now.') create_keyspace(session, keyspace) def count_executions(session, before, keep_completed, keep_failed, purge_all) -> Tuple[int, List]: if purge_all: cql = """ SELECT COUNT (id) FROM doms_executions; """ return session.execute(cql).one().system_count_id, [] elif before and not keep_failed: # Drop nulls & all before # Cassandra doesn't allow for selecting null values, so we have to check them all manually log.info(f'Counting executions before {before} including uncompleted executions') cql = """ SELECT * FROM doms_executions; """ to_delete = [] for row in session.execute(cql): if (row.time_completed is None and row.time_started <= before) or \ (row.time_completed is not None and row.time_completed <= before): to_delete.append(row.id) return len(to_delete), to_delete elif before and keep_failed: # Drop all before but not nulls log.info(f'Counting executions before {before} excluding uncompleted executions') cql = """ SELECT id FROM doms_executions WHERE time_completed<=? ALLOW FILTERING ; """ query = session.prepare(cql) to_delete = [] for row in session.execute(query, (before,)): to_delete.append(row.id) return len(to_delete), to_delete elif keep_completed: # Only drop nulls # Cassandra doesn't allow for selecting null values, so we have to check them all manually log.info(f'Counting ALL uncompleted executions') cql = """ SELECT * FROM doms_executions; """ to_delete = [] for row in session.execute(cql): if row.time_completed is None: to_delete.append(row.id) return len(to_delete), to_delete def parse_args(): parser = argparse.ArgumentParser(description='Purge DOMS data from Cassandra') cassandra_args = parser.add_argument_group('Cassandra args') purge_options = parser.add_argument_group('Purge options') cassandra_args.add_argument('--cassandra', help='The hostname(s) or IP(s) of the Cassandra server(s).', required=False, default=['localhost'], dest='hosts', nargs='+', metavar=('localhost', '127.0.0.101')) cassandra_args.add_argument('-k', '--cassandraKeyspace', help='The Cassandra keyspace for DOMS data.', default='doms', dest='keyspace', required=False, metavar='DOMS_KEYSPACE') cassandra_args.add_argument('--cassandraPort', help='The port used to connect to Cassandra.', dest='port', required=False, default='9042') cassandra_args.add_argument('-u', '--cassandra-username', dest='username', help='The username used to connect to Cassandra.', required=True, metavar='USERNAME') cassandra_args.add_argument('-p', '--cassandra-password', dest='password', help='The password used to connect to Cassandra.', required=True, metavar='PASSWORD') cassandra_args.add_argument('--cassandraProtocolVersion', help='The version of the Cassandra protocol the driver should use.', required=False, dest='pv', choices=['1', '2', '3', '4', '5'], default='4') time_before = purge_options.add_mutually_exclusive_group(required=True) time_before.add_argument('--before', help='Date & time before which data will be purged. Time entered should be UTC. Do not ' 'specify timezone.', type=du_parser.parse, dest='before_dt', metavar='DATETIME', default=None) def num_months(s): v = int(s) if v <= 0: raise ValueError('--before-months must be >= 1') return v time_before.add_argument('--before-months', help='Drop all data before n months ago', type=num_months, dest='before_mo', metavar='MONTHS', default=None) time_before.add_argument('--keep-completed', help='Keep all completed executions (only purge failed executions)', action='store_true', dest='keep') time_before.add_argument('--all', help='Purge ALL data (drops and re-creates keyspace)', action='store_true', dest='all') time_before.add_argument('--recreate-ks', help=argparse.SUPPRESS, action='store_true', dest='recreate') purge_options.add_argument('--keep-failed', help='Keep failed executions.', action='store_true', dest='keep_failed') parser.add_argument('--dry-run', help='Only print the execution ids to be deleted / DB operations to be performed. Do not ' 'actually alter the DB', action='store_true', dest='dry_run') parser.add_argument('-y', '--yes', help='Do not ask for confirmation.', action='store_true', dest='yes') args = parser.parse_args() global dry_run global non_interactive dry_run = args.dry_run non_interactive = args.yes if args.recreate: return args, None, False, False, False, True if args.all: if args.keep_failed: raise ValueError('Mutually exclusive options (purge all & keep) selected') return args, None, False, False, True, False if args.keep and args.keep_failed: raise ValueError('--keep-completed and --keep-failed are set; this will have no effect') if args.keep: before = None elif args.before_dt: before = args.before_dt else: now = datetime.utcnow() delta = relativedelta(months=-args.before_mo) before = now + delta return args, before, args.keep, args.keep_failed, False, False if __name__ == '__main__': try: main(*parse_args()) except Exception as e: log.error('An unexpected error occurred...') log.exception(e) exit(-1)