def run()

in utils/usergrid-util-python/usergrid_tools/migration/usergrid_data_migrator.py [0:0]


    def run(self):
        """Pull (app, collection) pairs off the work queue and fan out their entities.

        For every work item the source collection is iterated with
        ``UsergridQueryIterator``; each entity is placed on ``self.entity_queue``
        as ``(app, collection_name, entity)`` while per-collection statistics
        (count, bytes, min/max of the ``created`` and ``modified`` values) are
        accumulated in ``status_map``.  The statistics are put on
        ``self.response_queue`` every 1000 entities (when the running counter
        is 1, 1001, 2001, ...), once more when the collection is finished, and
        a last time from the ``finally`` clause when the worker stops.

        The loop ends after two consecutive ``Empty`` timeouts (30 seconds
        each) on ``self.work_queue``.  Any other exception raised while
        processing a collection is logged and the worker moves on to the next
        work item; ``KeyboardInterrupt`` is re-raised.
        """

        collection_worker_logger.info('starting run()...')
        keep_going = True

        counter = 0
        empty_count = 0

        # Placeholders reported by the ``finally`` clause if no work item was
        # ever received.
        app = 'ERROR'
        collection_name = 'NOT SET'
        status_map = {}

        try:

            while keep_going:

                try:
                    app, collection_name = self.work_queue.get(timeout=30)

                    status_map = {
                        collection_name: {
                            'iteration_started': str(datetime.datetime.now()),
                            'max_created': -1,
                            'max_modified': -1,
                            # Fixed placeholder (epoch millis, roughly
                            # 2020-03-23 UTC).  It is kept so that an empty
                            # collection reports the same value as before, but
                            # it is NOT used as a comparison bound any more:
                            # see ``min_recorded`` below.
                            'min_created': 1584946416000,
                            'min_modified': 1584946416000,
                            'count': 0,
                            'bytes': 0
                        }
                    }

                    # Alias for the statistics dict of the current collection.
                    stats = status_map[collection_name]

                    # Fields ('created' / 'modified') for which a real minimum
                    # has been stored.  Until a field is in this set the
                    # placeholder above is overwritten unconditionally;
                    # otherwise entities newer than the placeholder timestamp
                    # could never become the minimum.
                    min_recorded = set()

                    empty_count = 0

                    # added a flag for using graph vs query/index
                    if config.get('graph', False):
                        source_collection_url = collection_graph_url_template.format(
                                org=config.get('org'),
                                app=app,
                                collection=collection_name,
                                limit=config.get('limit'),
                                **config.get('source_endpoint'))
                    else:
                        source_collection_url = collection_query_url_template.format(
                                org=config.get('org'),
                                app=app,
                                collection=collection_name,
                                limit=config.get('limit'),
                                ql="select * %s" % config.get('ql'),
                                **config.get('source_endpoint'))

                    logger.info('Iterating URL: %s' % source_collection_url)

                    # use the UsergridQuery from the Python SDK to iterate the collection
                    q = UsergridQueryIterator(source_collection_url,
                                              page_delay=config.get('page_sleep_time'),
                                              sleep_time=config.get('error_retry_sleep'))

                    for entity in q:

                        # begin entity loop

                        self.entity_queue.put((app, collection_name, entity))
                        counter += 1

                        # Track min/max for both timestamp fields with the
                        # same logic.
                        for field in ('created', 'modified'):

                            if field not in entity:
                                continue

                            max_key = 'max_' + field
                            min_key = 'min_' + field

                            try:
                                stamp = long(entity.get(field))

                                if stamp > stats[max_key]:
                                    stats[max_key] = stamp
                                    stats[max_key + '_str'] = str(
                                            datetime.datetime.fromtimestamp(stamp / 1000))

                                if field not in min_recorded or stamp < stats[min_key]:
                                    stats[min_key] = stamp
                                    min_recorded.add(field)
                                    stats[min_key + '_str'] = str(
                                            datetime.datetime.fromtimestamp(stamp / 1000))

                            except (TypeError, ValueError):
                                # A missing (None) or unparsable value must not
                                # abort iteration of the whole collection; the
                                # entity is simply left out of the min/max
                                # statistics.
                                pass

                        stats['bytes'] += count_bytes(entity)
                        stats['count'] += 1

                        if counter % 1000 == 1:
                            try:
                                collection_worker_logger.warning(
                                        'Sending stats for app/collection [%s / %s]: %s' % (
                                            app, collection_name, status_map))

                                self.response_queue.put((app, collection_name, status_map))

                                if QSIZE_OK:
                                    collection_worker_logger.info(
                                            'Counter=%s, collection queue depth=%s' % (
                                                counter, self.work_queue.qsize()))
                            except Exception:
                                # Interim stats are best-effort: record the
                                # failure and keep migrating entities.
                                collection_worker_logger.exception(
                                        'Failed sending interim stats for app/collection [%s / %s]' % (
                                            app, collection_name))

                            collection_worker_logger.warning(
                                    'Current status of collections processed: %s' % json.dumps(status_map))

                        # Read once per entity so the logged value and the
                        # value slept on always agree.
                        entity_sleep_time = config.get('entity_sleep_time')

                        if entity_sleep_time is not None and entity_sleep_time > 0:
                            collection_worker_logger.debug(
                                    'sleeping for [%s]s per entity...' % entity_sleep_time)
                            time.sleep(entity_sleep_time)
                            collection_worker_logger.debug(
                                    'STOPPED sleeping for [%s]s per entity...' % entity_sleep_time)

                    # end entity loop

                    stats['iteration_finished'] = str(datetime.datetime.now())

                    collection_worker_logger.warning(
                            'Collection [%s / %s / %s] loop complete!  Max Created entity %s' % (
                                config.get('org'), app, collection_name, stats['max_created']))

                    collection_worker_logger.warning(
                            'Sending FINAL stats for app/collection [%s / %s]: %s' % (app, collection_name, status_map))

                    self.response_queue.put((app, collection_name, status_map))

                    collection_worker_logger.info('Done! Finished app/collection: %s / %s' % (app, collection_name))

                except KeyboardInterrupt:
                    # Bare raise keeps the original traceback intact.
                    raise

                except Empty:
                    collection_worker_logger.warning('EMPTY! Count=%s' % empty_count)

                    empty_count += 1

                    # Two timeouts in a row (the counter is reset whenever a
                    # work item arrives) means there is nothing left to do.
                    if empty_count >= 2:
                        keep_going = False

                except Exception:
                    logger.exception('Error in CollectionWorker processing collection [%s]' % collection_name)
                    # Also echo the traceback on stdout, as before.
                    print(traceback.format_exc())

        finally:
            # Always publish the last known stats, even on an abnormal exit.
            self.response_queue.put((app, collection_name, status_map))
            collection_worker_logger.info('FINISHED!')