def do_operation()

in utils/usergrid-util-python/usergrid_tools/migration/usergrid_data_migrator.py [0:0]


def _started_pid(proc, proc_class):
    """Return the OS pid of ``proc``, or None when it has not been started.

    The pid is read through ``super(proc_class, proc)`` exactly as the original
    abort path did.
    """
    return super(proc_class, proc).pid


def _interrupt_process(proc, proc_class):
    """Send SIGINT to ``proc`` if it was started; do nothing otherwise.

    ``os.kill`` raises OSError when the process has already exited and been
    reaped; that is logged and ignored because the caller is aborting anyway.
    """
    pid = _started_pid(proc, proc_class)
    if pid is None:
        return
    try:
        os.kill(pid, signal.SIGINT)
    except OSError as e:
        logger.debug('Unable to send SIGINT to pid [%s]: %s', pid, e)


def _terminate_process(proc, proc_class):
    """Call ``proc.terminate()`` only if ``proc`` was started.

    Assumes the workers are ``multiprocessing.Process`` subclasses (they expose
    ``start``/``terminate``/``pid``): terminating a never-started Process raises,
    which used to happen when the interrupt arrived before the workers were
    launched or when there was no work at all.
    """
    if _started_pid(proc, proc_class) is not None:
        proc.terminate()


def do_operation(apps_and_collections, operation):
    """Run ``operation`` over every collection listed in ``apps_and_collections``.

    Each (app, collection_name) pair is published to a collection queue consumed
    by CollectionWorker processes, which iterate the collection and publish
    entities to an entity queue consumed by EntityWorker processes; those apply
    ``operation`` to each entity. A StatusListener reads the collection response
    queue.

    :param apps_and_collections: dict whose ``'apps'`` entry maps an app name to
        a dict holding a ``'collections'`` iterable of collection names. A
        missing ``'collections'`` entry is treated as no collections.
    :param operation: passed through unchanged to every EntityWorker.
    """
    # NOTE(review): status_map is populated but never read or returned here.
    status_map = {}

    logger.info('Creating queues...')

    # Mac, for example, does not support the max_size for a queue in Python
    if _platform in ('linux', 'linux2'):
        queue_size_max = config.get('queue_size_max')
        entity_queue = Queue(maxsize=queue_size_max)
        collection_queue = Queue(maxsize=queue_size_max)
        collection_response_queue = Queue(maxsize=queue_size_max)
    else:
        entity_queue = Queue()
        collection_queue = Queue()
        collection_response_queue = Queue()

    logger.info('Starting entity_workers...')

    # create the workers, but only start them (later) if there is work to do
    entity_workers = [EntityWorker(entity_queue, operation)
                      for _ in xrange(config.get('entity_workers'))]

    collection_workers = [CollectionWorker(collection_queue, entity_queue, collection_response_queue)
                          for _ in xrange(config.get('collection_workers'))]

    status_listener = StatusListener(collection_response_queue, entity_queue)

    apps = apps_and_collections.get('apps', {})

    # Decide up front whether there is any work so the consumers can be started
    # BEFORE publishing: with a bounded queue (linux), putting more collections
    # than queue_size_max while no consumer is running would block forever.
    has_work = any(app_data.get('collections') for app_data in apps.itervalues())

    try:
        if has_work:
            status_listener.start()

            # start the worker processes which will iterate the collections
            for worker in collection_workers:
                worker.start()

            # start the worker processes which will do the work of migrating
            for worker in entity_workers:
                worker.start()

        # for each app, publish the (app_name, collection_name) to the queue.
        # this is received by a collection worker who iterates the collection and publishes
        # entities into a queue.  These are received by an individual entity worker which
        # executes the specified operation on the entity
        for app, app_data in apps.iteritems():
            logger.info('Processing app=[%s]', app)

            status_map[app] = {
                'iteration_started': str(datetime.datetime.now()),
                'max_created': -1,
                'max_modified': -1,
                # presumably an epoch-ms upper bound used as the initial minimum — TODO confirm
                'min_created': 1584946416000,
                'min_modified': 1584946416000,
                'count': 0,
                'bytes': 0,
                'collections': {}
            }

            app_collection_count = 0

            for collection_name in app_data.get('collections') or []:
                logger.info('Publishing app / collection: %s / %s', app, collection_name)

                app_collection_count += 1
                collection_queue.put((app, collection_name))

            logger.info('Finished publishing [%s] collections for app [%s] !', app_collection_count, app)

        if has_work:
            # allow collection workers to finish
            wait_for(collection_workers, label='collection_workers', sleep_time=60)

            # allow entity workers to finish
            wait_for(entity_workers, label='entity_workers', sleep_time=60)

            status_listener.terminate()

    except KeyboardInterrupt:
        logger.warning('Keyboard Interrupt, aborting...')
        entity_queue.close()
        collection_queue.close()
        collection_response_queue.close()

        # ask every started process to stop (SIGINT) first ...
        for worker in entity_workers:
            _interrupt_process(worker, EntityWorker)
        for worker in collection_workers:
            _interrupt_process(worker, CollectionWorker)
        _interrupt_process(status_listener, StatusListener)

        # ... then force termination; never-started processes are skipped
        for worker in entity_workers:
            _terminate_process(worker, EntityWorker)
        for worker in collection_workers:
            _terminate_process(worker, CollectionWorker)
        _terminate_process(status_listener, StatusListener)

    logger.info('entity_workers DONE!')