utils/usergrid-util-python/usergrid_tools/migration/usergrid_data_exporter.py
def process_collection(self, app, collection_name):
    status_map = {
        collection_name: {
            'iteration_started': str(datetime.datetime.now()),
            'max_created': -1,
            'max_modified': -1,
            'min_created': 1584946416000,
            'min_modified': 1584946416000,
            'count': 0,
            'bytes': 0
        }
    }
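
    # NOTE: min_created/min_modified are seeded with a hard-coded epoch-millis
    # value (~2020-03-23 UTC), so only entities with timestamps earlier than
    # that date will lower the reported minimums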
    # added a flag for using graph vs query/index
    if config.get('graph', False):
        source_collection_url = collection_graph_url_template.format(
            org=config.get('org'),
            app=app,
            collection=collection_name,
            limit=config.get('limit'),
            **config.get('source_endpoint'))
    else:
        source_collection_url = collection_query_url_template.format(
            org=config.get('org'),
            app=app,
            collection=collection_name,
            limit=config.get('limit'),
            ql="select * %s" % config.get('ql'),
            **config.get('source_endpoint'))
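
    # 'graph' walks the collection via its graph edges; the default path runs a
    # "select *" query (optionally narrowed by the 'ql' setting) against the index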
    counter = 0

    # use the UsergridQueryIterator from the Python SDK to iterate the collection
    q = UsergridQueryIterator(source_collection_url,
                              page_delay=config.get('page_sleep_time'),
                              sleep_time=config.get('error_retry_sleep'))
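
    # page_delay throttles between result pages; sleep_time is the back-off
    # applied before retrying a failed request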
    directory = os.path.join(config['export_path'], ECID, config['org'], app)

    if not os.path.exists(directory):
        os.makedirs(directory)
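
    # output lands under <export_path>/<ECID>/<org>/<app>/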
    entity_filename = '_'.join([collection_name, 'entity-data'])
    entity_filename_base = os.path.join(directory, entity_filename)

    entity_file_number = 0
    entity_file_counter = 0
    entity_filename = '%s-%s.txt' % (entity_filename_base, entity_file_number)
    entity_file = open(entity_filename, 'w')

    edge_filename = '_'.join([collection_name, 'edge-data'])
    edge_filename_base = os.path.join(directory, edge_filename)

    edge_file_number = 0
    edge_file_counter = 0
    edge_filename = '%s-%s.txt' % (edge_filename_base, edge_file_number)
    edge_file = open(edge_filename, 'w')
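
    # both outputs are line-delimited JSON, rotated to a new numbered file
    # every config['entities_per_file'] records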
    try:
        for entity in q:
            try:
                entity_file_counter += 1
                counter += 1

                if entity_file_counter > config['entities_per_file']:
                    entity_file.close()
                    entity_file_number += 1
                    entity_file_counter = 0
                    entity_filename = '%s-%s.txt' % (entity_filename_base, entity_file_number)
                    entity_file = open(entity_filename, 'w')

                entity_file.write('%s\n' % json.dumps(entity))
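
                # export each of the entity's outbound connections (graph edges),
                # recording only the target UUIDs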
                edge_names = get_edge_names(entity)

                for edge_name in edge_names:
                    if not include_edge(collection_name, edge_name):
                        continue

                    connection_query_url = connection_query_url_template.format(
                        org=config.get('org'),
                        app=app,
                        verb=edge_name,
                        collection=collection_name,
                        uuid=entity.get('uuid'),
                        limit=config.get('limit'),
                        **config.get('source_endpoint'))

                    connection_query = UsergridQueryIterator(connection_query_url,
                                                             sleep_time=config.get('error_retry_sleep'))

                    target_uuids = []

                    try:
                        for target_entity in connection_query:
                            target_uuids.append(target_entity.get('uuid'))
                    except:
                        logger.exception('Error processing edge [%s] of entity [%s / %s / %s]' % (
                            edge_name, app, collection_name, entity.get('uuid')))
                    if len(target_uuids) > 0:
                        edge_file_counter += 1
                        edges = {
                            'entity': {
                                'type': entity.get('type'),
                                'uuid': entity.get('uuid')
                            },
                            'edge_name': edge_name,
                            'target_uuids': target_uuids
                        }

                        if edge_file_counter > config['entities_per_file']:
                            edge_file.close()
                            edge_file_number += 1
                            edge_file_counter = 0
                            edge_filename = '%s-%s.txt' % (edge_filename_base, edge_file_number)
                            edge_file = open(edge_filename, 'w')

                        edge_file.write('%s\n' % json.dumps(edges))
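
                # track min/max created/modified timestamps (epoch millis) for
                # the collection's status report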
                if 'created' in entity:
                    try:
                        entity_created = int(entity.get('created'))

                        if entity_created > status_map[collection_name]['max_created']:
                            status_map[collection_name]['max_created'] = entity_created
                            status_map[collection_name]['max_created_str'] = str(
                                datetime.datetime.fromtimestamp(entity_created / 1000))

                        if entity_created < status_map[collection_name]['min_created']:
                            status_map[collection_name]['min_created'] = entity_created
                            status_map[collection_name]['min_created_str'] = str(
                                datetime.datetime.fromtimestamp(entity_created / 1000))

                    except ValueError:
                        pass

                if 'modified' in entity:
                    try:
                        entity_modified = int(entity.get('modified'))

                        if entity_modified > status_map[collection_name]['max_modified']:
                            status_map[collection_name]['max_modified'] = entity_modified
                            status_map[collection_name]['max_modified_str'] = str(
                                datetime.datetime.fromtimestamp(entity_modified / 1000))

                        if entity_modified < status_map[collection_name]['min_modified']:
                            status_map[collection_name]['min_modified'] = entity_modified
                            status_map[collection_name]['min_modified_str'] = str(
                                datetime.datetime.fromtimestamp(entity_modified / 1000))

                    except ValueError:
                        pass
                status_map[collection_name]['bytes'] += count_bytes(entity)
                status_map[collection_name]['count'] += 1
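
                # roughly every 1000 entities, push incremental stats to the
                # parent process via the response queue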
                if counter % 1000 == 1:
                    try:
                        collection_worker_logger.warning(
                            'Sending incremental stats for app/collection [%s / %s]: %s' % (
                                app, collection_name, status_map))

                        self.response_queue.put((app, collection_name, status_map))

                        if QSIZE_OK:
                            collection_worker_logger.info(
                                'Counter=%s, collection queue depth=%s' % (
                                    counter, self.work_queue.qsize()))
                    except:
                        pass

                    collection_worker_logger.warning(
                        'Current status of collections processed: %s' % json.dumps(status_map))
            except KeyboardInterrupt:
                raise
            except:
                logger.exception(
                    'Error processing entity %s / %s / %s' % (app, collection_name, entity.get('uuid')))

    except KeyboardInterrupt:
        raise

    except:
        logger.exception('Error processing collection %s / %s' % (app, collection_name))

    finally:
        if edge_file is not None:
            edge_file.close()

        if entity_file is not None:
            entity_file.close()

    return status_map
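
# A minimal sketch (hypothetical helper, not part of this module) of how the
# line-delimited JSON files written above can be streamed back, e.g. by an importer:
#
#     import json
#
#     def iter_export_file(path):
#         """Yield one entity (or edge record) per line of an export file."""
#         with open(path) as f:
#             for line in f:
#                 line = line.strip()
#                 if line:
#                     yield json.loads(line)
#
#     # e.g. count targets recorded for a collection named 'pets':
#     # total = sum(len(rec['target_uuids']) for rec in iter_export_file('pets_edge-data-0.txt'))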