in utils/usergrid-util-python/usergrid_tools/migration/usergrid_data_migrator.py [0:0]
def do_operation(apps_and_collections, operation):
    """Fan the given operation out over every collection of every app.

    Each (app, collection_name) pair is published to a collection queue.
    Collection workers consume that queue and feed an entity queue, which the
    entity workers consume, applying ``operation``. Workers are only started
    when at least one collection was published.

    Args:
        apps_and_collections: dict whose ``'apps'`` entry maps an app name to
            a dict carrying a ``'collections'`` iterable of collection names.
            Apps without collections are skipped.
        operation: handed unchanged to every ``EntityWorker``.

    A ``KeyboardInterrupt`` closes the queues and then interrupts and
    terminates whichever workers are still running.
    """
    # NOTE(review): status_map is filled in below but never read inside this
    # function - confirm whether it is still needed.
    status_map = {}

    logger.info('Creating queues...')

    # Mac, for example, does not support the max_size for a queue in Python
    if _platform in ("linux", "linux2"):
        queue_kwargs = {'maxsize': config.get('queue_size_max')}
    else:
        queue_kwargs = {}

    entity_queue = Queue(**queue_kwargs)
    collection_queue = Queue(**queue_kwargs)
    collection_response_queue = Queue(**queue_kwargs)

    logger.info('Starting entity_workers...')

    collection_count = 0

    # create the entity workers, but only start them (later) if there is work to do
    entity_workers = [EntityWorker(entity_queue, operation)
                      for _ in xrange(config.get('entity_workers'))]

    # create the collection workers, but only start them (later) if there is work to do
    collection_workers = [CollectionWorker(collection_queue, entity_queue, collection_response_queue)
                          for _ in xrange(config.get('collection_workers'))]

    status_listener = StatusListener(collection_response_queue, entity_queue)

    try:
        # for each app, publish the (app_name, collection_name) to the queue.
        # this is received by a collection worker who iterates the collection and publishes
        # entities into a queue.  These are received by an individual entity worker which
        # executes the specified operation on the entity
        for app, app_data in apps_and_collections.get('apps', {}).iteritems():
            logger.info('Processing app=[%s]' % app)

            status_map[app] = {
                'iteration_started': str(datetime.datetime.now()),
                'max_created': -1,
                'max_modified': -1,
                'min_created': 1584946416000,
                'min_modified': 1584946416000,
                'count': 0,
                'bytes': 0,
                'collections': {}
            }

            # iterate the collections which are returned; an app with no
            # 'collections' entry simply has nothing to publish.
            # NOTE(review): nothing consumes collection_queue until the workers
            # are started below, so with a bounded queue this put() presumably
            # blocks once more than queue_size_max collections are published -
            # confirm against expected collection counts.
            for collection_name in app_data.get('collections') or []:
                logger.info('Publishing app / collection: %s / %s' % (app, collection_name))

                collection_count += 1
                collection_queue.put((app, collection_name))

            logger.info('Finished publishing [%s] collections for app [%s] !' % (collection_count, app))

        # only start the threads if there is work to do
        if collection_count > 0:
            status_listener.start()

            # start the worker processes which will iterate the collections
            for worker in collection_workers:
                worker.start()

            # start the worker processes which will do the work of migrating
            for worker in entity_workers:
                worker.start()

            # allow collection workers to finish
            wait_for(collection_workers, label='collection_workers', sleep_time=60)

            # allow entity workers to finish
            wait_for(entity_workers, label='entity_workers', sleep_time=60)

            status_listener.terminate()

    except KeyboardInterrupt:
        logger.warning('Keyboard Interrupt, aborting...')

        entity_queue.close()
        collection_queue.close()
        collection_response_queue.close()

        # Same order as before: entity workers, collection workers, listener.
        candidates = [(EntityWorker, p) for p in entity_workers]
        candidates.extend((CollectionWorker, p) for p in collection_workers)
        candidates.append((StatusListener, status_listener))

        # Only touch processes that are actually running. A process that was
        # never started has no pid, and one that already exited cannot be
        # signalled; either case used to raise here and abort the cleanup,
        # leaving the remaining processes alive.
        # (assumes the workers are multiprocessing.Process subclasses, as their
        # start()/terminate()/pid usage suggests)
        running = [(cls, p) for cls, p in candidates if p.is_alive()]

        # first ask everything to stop with SIGINT ...
        for cls, process in running:
            try:
                os.kill(super(cls, process).pid, signal.SIGINT)
            except OSError:
                # the process exited between the liveness check and the signal
                pass

        # ... then terminate whatever was running
        for _, process in running:
            process.terminate()

    logger.info('entity_workers DONE!')