def process_collection()

in utils/usergrid-util-python/usergrid_tools/migration/usergrid_data_exporter.py [0:0]


    def process_collection(self, app, collection_name):

        status_map = {
            collection_name: {
                'iteration_started': str(datetime.datetime.now()),
                'max_created': -1,
                'max_modified': -1,
                'min_created': 1584946416000,
                'min_modified': 1584946416000,
                'count': 0,
                'bytes': 0
            }
        }

        # added a flag for using graph vs query/index
        if config.get('graph', False):
            source_collection_url = collection_graph_url_template.format(org=config.get('org'),
                                                                         app=app,
                                                                         collection=collection_name,
                                                                         limit=config.get('limit'),
                                                                         **config.get('source_endpoint'))
        else:
            source_collection_url = collection_query_url_template.format(org=config.get('org'),
                                                                         app=app,
                                                                         collection=collection_name,
                                                                         limit=config.get('limit'),
                                                                         ql="select * %s" % config.get(
                                                                                 'ql'),
                                                                         **config.get('source_endpoint'))
        counter = 0

        # use the UsergridQuery from the Python SDK to iterate the collection
        q = UsergridQueryIterator(source_collection_url,
                                  page_delay=config.get('page_sleep_time'),
                                  sleep_time=config.get('error_retry_sleep'))

        directory = os.path.join(config['export_path'], ECID, config['org'], app)

        if not os.path.exists(directory):
            os.makedirs(directory)

        entity_filename = '_'.join([collection_name, 'entity-data'])
        entity_filename_base = os.path.join(directory, entity_filename)
        entity_file_number = 0
        entity_file_counter = 0
        entity_filename = '%s-%s.txt' % (entity_filename_base, entity_file_number)
        entity_file = open(entity_filename, 'w')

        edge_filename = '_'.join([collection_name, 'edge-data'])
        edge_filename_base = os.path.join(directory, edge_filename)
        edge_file_number = 0
        edge_file_counter = 0
        edge_filename = '%s-%s.txt' % (edge_filename_base, edge_file_number)
        edge_file = open(edge_filename, 'w')

        try:

            for entity in q:
                try:
                    entity_file_counter += 1
                    counter += 1

                    if entity_file_counter > config['entities_per_file']:
                        entity_file.close()
                        entity_file_number += 1
                        entity_file_counter = 0
                        entity_filename = '%s-%s.txt' % (entity_filename_base, entity_file_number)
                        entity_file = open(entity_filename, 'w')

                    entity_file.write('%s\n' % json.dumps(entity))

                    edge_names = get_edge_names(entity)

                    for edge_name in edge_names:
                        if not include_edge(collection_name, edge_name):
                            continue

                        connection_query_url = connection_query_url_template.format(
                                org=config.get('org'),
                                app=app,
                                verb=edge_name,
                                collection=collection_name,
                                uuid=entity.get('uuid'),
                                limit=config.get('limit'),
                                **config.get('source_endpoint'))

                        connection_query = UsergridQueryIterator(connection_query_url,
                                                                 sleep_time=config.get('error_retry_sleep'))

                        target_uuids = []

                        try:
                            for target_entity in connection_query:
                                target_uuids.append(target_entity.get('uuid'))
                        except:
                            logger.exception('Error processing edge [%s] of entity [ %s / %s / %s]' % (
                                edge_name, app, collection_name, entity.get('uuid')))

                        if len(target_uuids) > 0:
                            edge_file_counter += 1

                            edges = {
                                'entity': {
                                    'type': entity.get('type'),
                                    'uuid': entity.get('uuid')
                                },
                                'edge_name': edge_name,
                                'target_uuids': target_uuids
                            }

                            if entity_file_counter > config['entities_per_file']:
                                edge_file.close()
                                edge_file_number += 1
                                edge_file_counter = 0
                                edge_filename = '%s-%s.txt' % (edge_filename_base, edge_file_number)
                                edge_file = open(edge_filename, 'w')

                            edge_file.write('%s\n' % json.dumps(edges))

                    if 'created' in entity:

                        try:
                            entity_created = long(entity.get('created'))

                            if entity_created > status_map[collection_name]['max_created']:
                                status_map[collection_name]['max_created'] = entity_created
                                status_map[collection_name]['max_created_str'] = str(
                                        datetime.datetime.fromtimestamp(entity_created / 1000))

                            if entity_created < status_map[collection_name]['min_created']:
                                status_map[collection_name]['min_created'] = entity_created
                                status_map[collection_name]['min_created_str'] = str(
                                        datetime.datetime.fromtimestamp(entity_created / 1000))

                        except ValueError:
                            pass

                    if 'modified' in entity:

                        try:
                            entity_modified = long(entity.get('modified'))

                            if entity_modified > status_map[collection_name]['max_modified']:
                                status_map[collection_name]['max_modified'] = entity_modified
                                status_map[collection_name]['max_modified_str'] = str(
                                        datetime.datetime.fromtimestamp(entity_modified / 1000))

                            if entity_modified < status_map[collection_name]['min_modified']:
                                status_map[collection_name]['min_modified'] = entity_modified
                                status_map[collection_name]['min_modified_str'] = str(
                                        datetime.datetime.fromtimestamp(entity_modified / 1000))

                        except ValueError:
                            pass

                    status_map[collection_name]['bytes'] += count_bytes(entity)
                    status_map[collection_name]['count'] += 1

                    if counter % 1000 == 1:
                        try:
                            collection_worker_logger.warning(
                                    'Sending incremental stats for app/collection [%s / %s]: %s' % (
                                        app, collection_name, status_map))

                            self.response_queue.put((app, collection_name, status_map))

                            if QSIZE_OK:
                                collection_worker_logger.info(
                                        'Counter=%s, collection queue depth=%s' % (
                                            counter, self.work_queue.qsize()))
                        except:
                            pass

                        collection_worker_logger.warn(
                                'Current status of collections processed: %s' % json.dumps(status_map))
                except KeyboardInterrupt:
                    raise

                except:
                    logger.exception(
                            'Error processing entity %s / %s / %s' % (app, collection_name, entity.get('uuid')))

        except KeyboardInterrupt:
            raise

        except:
            logger.exception('Error processing collection %s / %s ' % (app, collection_name))

        finally:
            if edge_file is not None:
                edge_file.close()

            if entity_file is not None:
                entity_file.close()

        return status_map