in utils/usergrid-util-python/usergrid_tools/migration/usergrid_data_migrator.py [0:0]
def run(self):
    """Drain (app, collection) work items and stream each collection's entities.

    For every ``(app, collection_name)`` tuple taken from ``self.work_queue``
    this worker builds a source URL (the graph form when ``config['graph']``
    is truthy, otherwise the query form), iterates it with
    ``UsergridQueryIterator`` and puts ``(app, collection_name, entity)`` onto
    ``self.entity_queue`` for each entity.

    Per-collection statistics (entity count, byte count, min/max ``created``
    and ``modified`` timestamps) are accumulated in ``status_map``. They are
    sent to ``self.response_queue`` every 1000 entities (counted across all
    collections this worker has handled) and once more when the collection
    is finished.

    The loop stops after two consecutive 30-second waits on an empty work
    queue. An error while processing a collection is logged and the worker
    moves on to the next work item. In all cases the most recent
    ``status_map`` is sent to ``self.response_queue`` before returning.
    """
    collection_worker_logger.info('starting run()...')

    # Larger than any real epoch-millisecond timestamp (3000-01-01T00:00:00Z),
    # so the first entity always lowers the min_* statistics. The previous
    # literal (1584946416000 == 2020-03-23) is now in the past, which left
    # min_* wrong for collections whose entities all post-date it.
    far_future_ms = 32503680000000

    keep_going = True
    counter = 0  # entities seen across every collection handled by this worker
    empty_count = 0  # consecutive work_queue.get() timeouts

    # Values reported by the ``finally`` clause if no work item is ever taken.
    app = 'ERROR'
    collection_name = 'NOT SET'
    status_map = {}

    try:
        while keep_going:
            try:
                app, collection_name = self.work_queue.get(timeout=30)

                status_map = {
                    collection_name: {
                        'iteration_started': str(datetime.datetime.now()),
                        'max_created': -1,
                        'max_modified': -1,
                        'min_created': far_future_ms,
                        'min_modified': far_future_ms,
                        'count': 0,
                        'bytes': 0
                    }
                }
                stats = status_map[collection_name]
                empty_count = 0

                # added a flag for using graph vs query/index
                if config.get('graph', False):
                    source_collection_url = collection_graph_url_template.format(
                        org=config.get('org'),
                        app=app,
                        collection=collection_name,
                        limit=config.get('limit'),
                        **config.get('source_endpoint'))
                else:
                    source_collection_url = collection_query_url_template.format(
                        org=config.get('org'),
                        app=app,
                        collection=collection_name,
                        limit=config.get('limit'),
                        ql="select * %s" % config.get('ql'),
                        **config.get('source_endpoint'))

                logger.info('Iterating URL: %s' % source_collection_url)

                # use the UsergridQuery from the Python SDK to iterate the collection
                q = UsergridQueryIterator(source_collection_url,
                                          page_delay=config.get('page_sleep_time'),
                                          sleep_time=config.get('error_retry_sleep'))

                for entity in q:
                    # begin entity loop
                    self.entity_queue.put((app, collection_name, entity))
                    counter += 1

                    # Track min/max for both timestamp fields with one code path.
                    for field in ('created', 'modified'):
                        if field not in entity:
                            continue
                        try:
                            value = int(entity.get(field))
                            if value > stats['max_' + field]:
                                stats['max_' + field] = value
                                stats['max_%s_str' % field] = str(
                                    datetime.datetime.fromtimestamp(value // 1000))
                            if value < stats['min_' + field]:
                                stats['min_' + field] = value
                                stats['min_%s_str' % field] = str(
                                    datetime.datetime.fromtimestamp(value // 1000))
                        except (TypeError, ValueError, OverflowError, OSError):
                            # A missing/malformed/out-of-range timestamp only
                            # skips this statistic; previously a None value
                            # (TypeError) aborted the whole collection.
                            pass

                    stats['bytes'] += count_bytes(entity)
                    stats['count'] += 1

                    if counter % 1000 == 1:
                        try:
                            collection_worker_logger.warning(
                                'Sending stats for app/collection [%s / %s]: %s' % (
                                    app, collection_name, status_map))
                            # NOTE(review): status_map keeps being mutated after
                            # this put; if response_queue pickles lazily (e.g. a
                            # multiprocessing.Queue) consider sending a copy.
                            self.response_queue.put((app, collection_name, status_map))

                            if QSIZE_OK:
                                collection_worker_logger.info(
                                    'Counter=%s, collection queue depth=%s' % (
                                        counter, self.work_queue.qsize()))
                        except Exception:
                            # Progress reporting is best-effort, but failures
                            # should be visible rather than silently dropped.
                            collection_worker_logger.exception(
                                'Unable to send interim stats for app/collection [%s / %s]' % (
                                    app, collection_name))

                        collection_worker_logger.warning(
                            'Current status of collections processed: %s' % json.dumps(status_map))

                    # Guard against an unset value (None) instead of comparing it to 0.
                    entity_sleep_time = config.get('entity_sleep_time')
                    if entity_sleep_time and entity_sleep_time > 0:
                        collection_worker_logger.debug(
                            'sleeping for [%s]s per entity...' % (entity_sleep_time))
                        time.sleep(entity_sleep_time)
                        collection_worker_logger.debug(
                            'STOPPED sleeping for [%s]s per entity...' % (entity_sleep_time))
                    # end entity loop

                stats['iteration_finished'] = str(datetime.datetime.now())

                collection_worker_logger.warning(
                    'Collection [%s / %s / %s] loop complete! Max Created entity %s' % (
                        config.get('org'), app, collection_name, stats['max_created']))

                collection_worker_logger.warning(
                    'Sending FINAL stats for app/collection [%s / %s]: %s' % (app, collection_name, status_map))

                self.response_queue.put((app, collection_name, status_map))

                collection_worker_logger.info('Done! Finished app/collection: %s / %s' % (app, collection_name))

            except KeyboardInterrupt:
                # Bare raise keeps the original traceback (``raise e`` lost it in Py2).
                raise

            except Empty:
                collection_worker_logger.warning('EMPTY! Count=%s' % empty_count)
                empty_count += 1
                # Give up after two consecutive timed-out waits for work.
                if empty_count >= 2:
                    keep_going = False

            except Exception:
                # Log and continue with the next work item.
                logger.exception('Error in CollectionWorker processing collection [%s]' % collection_name)
                print(traceback.format_exc())

    finally:
        # Always report the latest stats, even on interrupt or unexpected exit.
        self.response_queue.put((app, collection_name, status_map))
        collection_worker_logger.info('FINISHED!')