tools/deletebyquery/deletebyquery.py (199 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import json
import logging
import uuid
from random import sample

from cassandra.auth import PlainTextAuthProvider
import cassandra.concurrent
from cassandra.cluster import Cluster
from cassandra.policies import RoundRobinPolicy, TokenAwarePolicy
from solrcloudpy import SolrConnection, SearchOptions
from six.moves import input

# Module-level handles, populated by init() and read by the functions below.
solr_connection = None
solr_collection = None
SOLR_UNIQUE_KEY = None

cassandra_cluster = None
cassandra_session = None
cassandra_table = None

logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().handlers[0].setFormatter(
    logging.Formatter(fmt="%(asctime)s %(levelname)s:%(name)s: %(message)s", datefmt="%Y-%m-%dT%H:%M:%S"))
# Only surface critical messages from the Cassandra driver's own loggers.
logging.getLogger('cassandra').setLevel(logging.CRITICAL)


def init(args):
    """Open the Solr and Cassandra connections described by ``args``.

    Populates the module-level globals (``solr_connection``,
    ``solr_collection``, ``SOLR_UNIQUE_KEY``, ``cassandra_cluster``,
    ``cassandra_session``, ``cassandra_table``) that every other function in
    this module reads.

    :param args: parsed command-line namespace as returned by ``parse_args()``.
    """
    global solr_connection, solr_collection, SOLR_UNIQUE_KEY
    global cassandra_cluster, cassandra_session, cassandra_table

    solr_connection = SolrConnection(args.solr)
    solr_collection = solr_connection[args.collection]
    SOLR_UNIQUE_KEY = args.solrIdField

    dc_policy = RoundRobinPolicy()
    token_policy = TokenAwarePolicy(dc_policy)

    # Authentication is only configured when both credentials are supplied.
    if args.cassandraUsername and args.cassandraPassword:
        auth_provider = PlainTextAuthProvider(username=args.cassandraUsername,
                                              password=args.cassandraPassword)
    else:
        auth_provider = None

    # argparse delivers the port as a string by default ('9042'); coerce it to
    # an int the same way the protocol version is coerced.
    cassandra_cluster = Cluster(contact_points=args.cassandra,
                                port=int(args.cassandraPort),
                                protocol_version=int(args.cassandraProtocolVersion),
                                load_balancing_policy=token_policy,
                                auth_provider=auth_provider)
    cassandra_session = cassandra_cluster.connect(keyspace=args.cassandraKeyspace)
    cassandra_table = args.cassandraTable


def delete_by_query(args):
    """Build the Solr query from ``args`` and interactively delete its matches.

    The flow is: build the query, let the user review the match count
    (``check_query``), collect the matching tile IDs (``do_solr_query``), ask
    for a final confirmation (``confirm_delete``) and then delete from
    Cassandra and Solr (``do_delete``).

    :param args: parsed command-line namespace; reads ``query``,
        ``filterquery``, ``jsonparams`` and ``rows``.
    :raises RuntimeError: if neither ``args.query`` nor ``args.jsonparams`` is
        set to a non-empty value.
    """
    if args.query:
        query = SearchOptions()
        query.commonparams.q(args.query).fl(SOLR_UNIQUE_KEY).fl('id')
        for fq in args.filterquery or []:
            query.commonparams.fq(fq)
    elif args.jsonparams:
        query = SearchOptions(**json.loads(args.jsonparams))
        query.commonparams.fl(SOLR_UNIQUE_KEY).fl('id')
    else:
        raise RuntimeError("either query or jsonparams is required")

    query.commonparams.rows(args.rows)

    if not check_query(query):
        logging.info("Exiting")
        return

    logging.info("Collecting tiles ....")
    solr_docs = do_solr_query(query)

    if not solr_docs:
        # check_query reported matches but no IDs could be collected. Going on
        # would run the Solr delete-by-query without removing anything from
        # Cassandra, so stop here instead.
        logging.warning("No tile IDs could be collected for the query; nothing was deleted")
        return

    if not confirm_delete(len(solr_docs)):
        logging.info("Exiting")
        return

    deleted_ids = do_delete(solr_docs, query)
    logging.info("Deleted tile IDs %s", json.dumps([str(doc_id) for doc_id in deleted_ids], indent=2))


def _prompt(message, choices):
    """Ask ``message`` on the console until the reply is one of ``choices``.

    :param message: prompt text passed to ``input``.
    :param choices: container of accepted replies.
    :return: the accepted reply.
    """
    answer = input(message)
    while answer not in choices:
        answer = input(message)
    return answer


def confirm_delete(num_found):
    """Ask the user for the final go-ahead before deleting.

    :param num_found: number of records about to be deleted (shown in the
        prompt).
    :return: ``True`` if the user answered ``y``, ``False`` if ``n``.
    """
    answer = _prompt(
        "This action will delete %s record(s) from SOLR and Cassandra. Are you sure you want to Continue? y/n: "
        % num_found,
        ('y', 'n'))
    return answer == 'y'


def _log_sample_document(docs):
    """Log one randomly chosen document from ``docs``, re-fetched by unique key.

    :param docs: documents from a Solr response page; one is picked at random
        and looked up again by ``SOLR_UNIQUE_KEY``. That lookup does not carry
        over the field list from the original query.
    """
    if not docs:
        # random.sample raises ValueError on an empty population.
        logging.info("No documents available to sample")
        return

    lookup = SearchOptions()
    lookup.commonparams.q('%s:%s' % (SOLR_UNIQUE_KEY, sample(docs, 1)[0][SOLR_UNIQUE_KEY]))
    sampled_docs = solr_collection.search(lookup).result.response.docs
    if not sampled_docs:
        logging.info("Sample document could not be retrieved")
        return
    logging.info(json.dumps(sampled_docs[0], indent=2))


def check_query(query):
    """Run ``query`` and let the user decide whether to proceed.

    The user may answer ``y`` (or just press enter) to continue, ``n`` to
    stop, or ``s`` to print a sample document and be asked again. The query is
    re-run before every prompt.

    :param query: the ``SearchOptions`` describing the documents to delete.
    :return: ``True`` to continue, ``False`` if nothing matched or the user
        declined.
    """
    # A loop rather than recursion, so repeated sampling cannot grow the stack.
    while True:
        solr_response = solr_collection.search(query)
        num_found = solr_response.result.response.numFound

        if num_found == 0:
            logging.info("Query returned 0 results")
            return False

        answer = _prompt("Query found %s matching documents. Continue? [y]/n/(s)ample: " % num_found,
                         ('y', 'n', 's', ''))
        if answer in ('y', ''):
            return True
        if answer == 'n':
            return False

        _log_sample_document(solr_response.result.response.docs)


def do_solr_query(query):
    """Page through every match of ``query`` and collect the documents' IDs.

    Uses Solr cursor paging: the query is sorted on ``SOLR_UNIQUE_KEY`` and
    re-issued with the ``cursorMark`` returned by the previous page until the
    returned mark stops changing. ``query`` is modified in place (``sort`` and
    ``cursorMark`` parameters).

    :param query: the ``SearchOptions`` to execute.
    :return: list of ``uuid.UUID`` built from each document's ``id`` field;
        an empty list if a response carries no ``nextCursorMark``.
    """
    doc_ids = []

    next_cursor_mark = "*"
    query.commonparams.sort('%s asc' % SOLR_UNIQUE_KEY)
    while True:
        query.commonparams.remove_param('cursorMark')
        query.commonparams.add_params(cursorMark=next_cursor_mark)

        solr_response = solr_collection.search(query)

        try:
            result_next_cursor_mark = solr_response.result.nextCursorMark
        except AttributeError:
            # No Results
            return []

        # An unchanged cursor mark signals that the previous page was the last.
        if result_next_cursor_mark == next_cursor_mark:
            break
        next_cursor_mark = result_next_cursor_mark

        doc_ids.extend([uuid.UUID(doc['id']) for doc in solr_response.result.response.docs])

    return doc_ids


def do_delete(doc_ids, query):
    """Delete the tiles from Cassandra first, then the documents from Solr.

    :param doc_ids: tile IDs to remove from Cassandra.
    :param query: the ``SearchOptions`` passed to the Solr delete.
    :return: ``doc_ids`` unchanged.
    """
    logging.info("Executing Cassandra delete...")
    delete_from_cassandra(doc_ids)
    logging.info("Executing Solr delete...")
    delete_from_solr(query)
    return doc_ids


def delete_from_cassandra(doc_ids):
    """Delete one row per ID from the configured Cassandra table.

    :param doc_ids: iterable of tile IDs bound to the ``tile_id`` column.
    """
    # The table name cannot be a bind parameter, so it is formatted into the
    # statement; it comes from the --cassandraTable command-line option.
    statement = cassandra_session.prepare("DELETE FROM %s WHERE tile_id=?" % cassandra_table)

    # NOTE(review): the driver documents raise_on_first_error=True as the
    # default, in which case a failed delete raises here (aborting before the
    # Solr delete) and the warning branch below is never reached -- confirm
    # which behaviour is intended before changing it.
    results = cassandra.concurrent.execute_concurrent_with_args(cassandra_session, statement,
                                                                [(doc_id,) for doc_id in doc_ids])

    for (success, result) in results:
        if not success:
            logging.warning("Could not delete tile %s", result)


def delete_from_solr(query):
    """Delete the documents matching ``query`` from Solr and commit.

    :param query: the ``SearchOptions`` describing the documents to delete.
    """
    # NOTE(review): confirm that solr_collection.delete() honours 'fq'
    # parameters; if it only uses 'q', --filterquery restrictions would not
    # apply to the Solr delete.
    solr_collection.delete(query, commit=False)
    solr_collection.commit()


def parse_args():
    """Parse the command-line options.

    Exactly one of ``--query`` / ``--jsonparams`` is required, as are
    ``--solr`` and ``--cassandra``.

    :return: the populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(description='Delete data from NEXUS using a Solr Query',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    parser.add_argument('--solr',
                        help='The url of the SOLR server.',
                        required=True,
                        metavar='127.0.0.1:8983')

    parser.add_argument('--collection',
                        help='The name of the SOLR collection.',
                        required=False,
                        default='nexustiles',
                        metavar='nexustiles')

    parser.add_argument('--solrIdField',
                        help='The name of the unique ID field for this collection.',
                        required=False,
                        default='id',
                        metavar='id')

    parser.add_argument('--cassandra',
                        help='The hostname(s) or IP(s) of the Cassandra server(s).',
                        required=True,
                        nargs='+',
                        metavar=('127.0.0.100', '127.0.0.101'))

    parser.add_argument('-k', '--cassandraKeyspace',
                        help='The Cassandra keyspace.',
                        default='nexustiles',
                        required=False,
                        metavar='nexustiles')

    group = parser.add_mutually_exclusive_group(required=True)
    # The original help used ''q'' (implicit string concatenation), which
    # silently dropped the quotes from the rendered help text.
    group.add_argument('-q', '--query',
                       help="The 'q' parameter passed to SOLR Search",
                       metavar='*:*')

    group.add_argument('--jsonparams',
                       help='Full query prameters formatted as JSON')

    parser.add_argument('-fq', '--filterquery',
                        help="The 'fq' parameter passed to SOLR Search. Only used if --jsonparams is not provided",
                        required=False,
                        nargs='+')

    parser.add_argument('-t', '--cassandraTable',
                        help='The name of the cassandra table.',
                        required=False,
                        default='sea_surface_temp')

    parser.add_argument('-p', '--cassandraPort',
                        help='The port used to connect to Cassandra.',
                        required=False,
                        default='9042')

    parser.add_argument('--cassandraUsername',
                        help='The username used to connect to Cassandra.',
                        required=False)

    parser.add_argument('--cassandraPassword',
                        help='The password used to connect to Cassandra.',
                        required=False)

    parser.add_argument('-pv', '--cassandraProtocolVersion',
                        help='The version of the Cassandra protocol the driver should use.',
                        required=False,
                        choices=['1', '2', '3', '4', '5'],
                        default='4')

    parser.add_argument('--solr-rows',
                        help='Number of rows to fetch with each Solr query to build the list of tiles to delete',
                        required=False,
                        dest='rows',
                        default=1000,
                        type=int)

    return parser.parse_args()


if __name__ == "__main__":
    the_args = parse_args()
    init(the_args)
    delete_by_query(the_args)