# tools/deletebyquery/deletebyquery.py (199 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json
import logging
import uuid
from random import sample
from cassandra.auth import PlainTextAuthProvider
import cassandra.concurrent
from cassandra.cluster import Cluster
from cassandra.policies import RoundRobinPolicy, TokenAwarePolicy
from solrcloudpy import SolrConnection, SearchOptions
from six.moves import input
# Shared connection handles and settings. They start out unset and are
# populated by init() before any query or delete function runs.
solr_connection = solr_collection = SOLR_UNIQUE_KEY = None
cassandra_cluster = cassandra_session = cassandra_table = None

# Root logger: INFO level, ISO-like timestamps on the first handler.
# basicConfig() must run first so that handlers[0] is guaranteed to exist.
logging.basicConfig()
_root_logger = logging.getLogger()
_root_logger.setLevel(logging.INFO)
_log_formatter = logging.Formatter(fmt="%(asctime)s %(levelname)s:%(name)s: %(message)s",
                                   datefmt="%Y-%m-%dT%H:%M:%S")
_root_logger.handlers[0].setFormatter(_log_formatter)

# Only let CRITICAL records from the 'cassandra' logger hierarchy through.
logging.getLogger('cassandra').setLevel(logging.CRITICAL)
def init(args):
    """Open the Solr and Cassandra connections and publish them as module globals.

    Sets ``solr_connection``, ``solr_collection``, ``SOLR_UNIQUE_KEY``,
    ``cassandra_cluster``, ``cassandra_session`` and ``cassandra_table``.

    :param args: parsed command-line namespace. Reads ``solr``, ``collection``,
        ``solrIdField``, ``cassandra``, ``cassandraPort``,
        ``cassandraProtocolVersion``, ``cassandraKeyspace``, ``cassandraTable``,
        ``cassandraUsername`` and ``cassandraPassword``.
    """
    global solr_connection, solr_collection, SOLR_UNIQUE_KEY
    global cassandra_cluster, cassandra_session, cassandra_table

    solr_connection = SolrConnection(args.solr)
    solr_collection = solr_connection[args.collection]
    SOLR_UNIQUE_KEY = args.solrIdField

    # Token-aware routing layered on top of a plain round-robin policy.
    token_policy = TokenAwarePolicy(RoundRobinPolicy())

    # Authentication is only configured when BOTH username and password are given.
    if args.cassandraUsername and args.cassandraPassword:
        auth_provider = PlainTextAuthProvider(username=args.cassandraUsername,
                                              password=args.cassandraPassword)
    else:
        auth_provider = None

    # The port may arrive from the command line as a string; normalise it to an
    # int, the same way the protocol version is handled.
    cassandra_cluster = Cluster(contact_points=args.cassandra,
                                port=int(args.cassandraPort),
                                protocol_version=int(args.cassandraProtocolVersion),
                                load_balancing_policy=token_policy,
                                auth_provider=auth_provider)
    cassandra_session = cassandra_cluster.connect(keyspace=args.cassandraKeyspace)
    cassandra_table = args.cassandraTable
def delete_by_query(args):
    """Run the interactive delete workflow for the query described by *args*.

    Builds the Solr search options from either ``args.query`` (plus optional
    ``args.filterquery`` entries) or ``args.jsonparams``, asks the user to
    confirm the match count, collects the matching IDs, asks for a final
    confirmation and then performs the delete.

    :param args: parsed command-line namespace.
    :raises RuntimeError: when neither ``query`` nor ``jsonparams`` is set.
    """
    if args.query:
        search = SearchOptions()
        # Always request both the configured unique key and the 'id' field.
        params = search.commonparams.q(args.query)
        params = params.fl(SOLR_UNIQUE_KEY)
        params.fl('id')
        filter_queries = [] if args.filterquery is None else args.filterquery
        for filter_query in filter_queries:
            search.commonparams.fq(filter_query)
    elif args.jsonparams:
        search = SearchOptions(**json.loads(args.jsonparams))
        params = search.commonparams.fl(SOLR_UNIQUE_KEY)
        params.fl('id')
    else:
        raise RuntimeError("either query or jsonparams is required")

    search.commonparams.rows(args.rows)

    # First gate: the user reviews the number of matches (and may sample them).
    if not check_query(search):
        logging.info("Exiting")
        return

    logging.info("Collecting tiles ....")
    tile_ids = do_solr_query(search)

    # Second gate: explicit y/n confirmation of the number of records collected.
    if not confirm_delete(len(tile_ids)):
        logging.info("Exiting")
        return

    removed_ids = do_delete(tile_ids, search)
    logging.info("Deleted tile IDs %s" % json.dumps([str(tile_id) for tile_id in removed_ids], indent=2))
def confirm_delete(num_found):
    """Ask the user to confirm the delete; keep asking until the answer is 'y' or 'n'.

    :param num_found: number of records that is about to be deleted (shown in the prompt).
    :return: True when the user answered 'y', False when the user answered 'n'.
    """
    prompt = "This action will delete %s record(s) from SOLR and Cassandra. Are you sure you want to Continue? y/n: " % num_found
    while True:
        answer = input(prompt)
        if answer in ('y', 'n'):
            return answer == 'y'
def check_query(query):
    """Show how many documents match *query* and ask whether to continue.

    The user may answer 'y' (or just press enter) to continue, 'n' to stop, or
    's' to log one randomly chosen matching document, after which the query is
    executed again and the question is repeated.

    :param query: the search options to run against the Solr collection.
    :return: True to continue; False when there are no matches or the user declined.
    """
    while True:
        response = solr_collection.search(query)
        match_count = response.result.response.numFound
        if match_count == 0:
            logging.info("Query returned 0 results")
            return False

        prompt = "Query found %s matching documents. Continue? [y]/n/(s)ample: " % match_count
        answer = input(prompt)
        while answer not in ('y', 'n', 's', ''):
            answer = input(prompt)

        if answer in ('y', ''):
            return True
        if answer == 'n':
            return False

        # 's': look up one random document from this page by its unique key,
        # log it in full, then loop to re-run the query and ask again.
        lookup = SearchOptions()
        picked = sample(response.result.response.docs, 1)[0]
        lookup.commonparams.q('%s:%s' % (SOLR_UNIQUE_KEY, picked[SOLR_UNIQUE_KEY]))
        logging.info(json.dumps(solr_collection.search(lookup).result.response.docs[0], indent=2))
def do_solr_query(query):
    """Page through all results of *query* with a Solr cursor and collect the document IDs.

    The query is sorted ascending on the configured unique key and re-issued
    with an updated ``cursorMark`` until Solr returns the same mark that was sent.

    :param query: the search options to execute; its sort and cursorMark params are modified.
    :return: list of ``uuid.UUID`` built from each document's 'id' field, or an
        empty list if a response carries no ``nextCursorMark``.
    """
    collected = []
    cursor = "*"
    query.commonparams.sort('%s asc' % SOLR_UNIQUE_KEY)

    while True:
        # Replace (not append) the cursor parameter for this page.
        query.commonparams.remove_param('cursorMark')
        query.commonparams.add_params(cursorMark=cursor)
        page = solr_collection.search(query)

        try:
            upcoming_cursor = page.result.nextCursorMark
        except AttributeError:
            # No Results
            return []

        # An unchanged cursor mark means there is nothing further to fetch.
        if upcoming_cursor == cursor:
            return collected

        cursor = upcoming_cursor
        page_ids = [uuid.UUID(doc['id']) for doc in page.result.response.docs]
        collected.extend(page_ids)
def do_delete(doc_ids, query):
    """Delete the given IDs from Cassandra, then delete the query's matches from Solr.

    :param doc_ids: IDs to remove from Cassandra.
    :param query: the Solr query handed to the Solr delete.
    :return: *doc_ids*, unchanged.
    """
    steps = (
        ("Executing Cassandra delete...", delete_from_cassandra, doc_ids),
        ("Executing Solr delete...", delete_from_solr, query),
    )
    # Cassandra first, Solr second; each step is announced before it runs.
    for message, action, argument in steps:
        logging.info(message)
        action(argument)
    return doc_ids
def delete_from_cassandra(doc_ids):
    """Delete one row per ID from the configured Cassandra table.

    A single prepared ``DELETE ... WHERE tile_id=?`` statement is executed for
    every ID through ``cassandra.concurrent.execute_concurrent_with_args``; each
    unsuccessful result is logged as a warning.

    :param doc_ids: iterable of IDs bound to the ``tile_id`` parameter.
    """
    delete_statement = cassandra_session.prepare("DELETE FROM %s WHERE tile_id=?" % cassandra_table)
    # One single-element parameter tuple per ID.
    parameter_sets = [(doc_id,) for doc_id in doc_ids]
    outcomes = cassandra.concurrent.execute_concurrent_with_args(cassandra_session, delete_statement,
                                                                 parameter_sets)
    for succeeded, outcome in outcomes:
        if succeeded:
            continue
        logging.warning("Could not delete tile %s" % outcome)
def delete_from_solr(query):
    """Delete from the Solr collection using *query*, then commit.

    The delete itself is sent with ``commit=False``; the commit is issued as a
    separate, explicit call afterwards.

    :param query: the query passed to the collection's delete call.
    """
    collection = solr_collection
    collection.delete(query, commit=False)
    collection.commit()
def parse_args(argv=None):
    """Define the command-line interface and parse the arguments.

    Exactly one of ``--query`` and ``--jsonparams`` must be supplied (they form
    a required, mutually exclusive group).

    :param argv: optional list of argument strings to parse. When ``None``
        (the default) argparse reads ``sys.argv[1:]``.
    :return: the populated ``argparse.Namespace``.
    """
    parser = argparse.ArgumentParser(description='Delete data from NEXUS using a Solr Query',
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    # --- Solr connection ---
    parser.add_argument('--solr',
                        help='The url of the SOLR server.',
                        required=True,
                        metavar='127.0.0.1:8983')
    parser.add_argument('--collection',
                        help='The name of the SOLR collection.',
                        required=False,
                        default='nexustiles',
                        metavar='nexustiles')
    parser.add_argument('--solrIdField',
                        help='The name of the unique ID field for this collection.',
                        required=False,
                        default='id',
                        metavar='id')

    # --- Cassandra connection ---
    parser.add_argument('--cassandra',
                        help='The hostname(s) or IP(s) of the Cassandra server(s).',
                        required=True,
                        nargs='+',
                        metavar=('127.0.0.100', '127.0.0.101'))
    parser.add_argument('-k', '--cassandraKeyspace',
                        help='The Cassandra keyspace.',
                        default='nexustiles',
                        required=False,
                        metavar='nexustiles')

    # --- What to delete: either a plain 'q' string or a full JSON parameter set ---
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-q', '--query',
                       help="The 'q' parameter passed to SOLR Search",
                       metavar='*:*')
    group.add_argument('--jsonparams',
                       help='Full query parameters formatted as JSON')
    parser.add_argument('-fq', '--filterquery',
                        help="The 'fq' parameter passed to SOLR Search. Only used if --jsonparams is not provided",
                        required=False,
                        nargs='+')

    parser.add_argument('-t', '--cassandraTable',
                        help='The name of the cassandra table.',
                        required=False,
                        default='sea_surface_temp')
    # Parsed as an int so the value can be handed to the driver as a port number.
    parser.add_argument('-p', '--cassandraPort',
                        help='The port used to connect to Cassandra.',
                        required=False,
                        default=9042,
                        type=int)
    parser.add_argument('--cassandraUsername',
                        help='The username used to connect to Cassandra.',
                        required=False)
    parser.add_argument('--cassandraPassword',
                        help='The password used to connect to Cassandra.',
                        required=False)
    parser.add_argument('-pv', '--cassandraProtocolVersion',
                        help='The version of the Cassandra protocol the driver should use.',
                        required=False,
                        choices=['1', '2', '3', '4', '5'],
                        default='4')
    parser.add_argument('--solr-rows',
                        help='Number of rows to fetch with each Solr query to build the list of tiles to delete',
                        required=False,
                        dest='rows',
                        default=1000,
                        type=int)

    return parser.parse_args(argv)
if __name__ == "__main__":
    # Script entry point: parse the CLI, open the connections, run the delete workflow.
    cli_args = parse_args()
    init(cli_args)
    delete_by_query(cli_args)