in utils/usergrid-util-python/usergrid_tools/migration/usergrid_data_exporter.py [0:0]
def main():
    """Publish every selected app/collection of the source org to the export workers.

    Parses the command line into the module-level ``config``, retrieves the
    list of apps for the source org from the management URL, starts the
    ``StatusListener`` and the ``EntityExportWorker`` processes, and then puts
    one ``(app, collection_name)`` tuple on the collection queue for each
    collection that passes the configured filters. Afterwards it waits for
    the workers via ``wait_for`` and terminates the status listener.

    The process exits via ``exit()`` when the app list cannot be retrieved.
    A ``KeyboardInterrupt`` raised while publishing or waiting closes both
    queues, sends SIGINT to the child processes and terminates them.
    """
    global config

    config = parse_args()
    init()
    init_logging()

    status_map = {}

    # Starts out empty, so the app list is always fetched from the source org below.
    org_apps = {}

    if not org_apps:
        source_org_mgmt_url = org_management_url_template.format(org=config.get('org'),
                                                                 limit=config.get('limit'),
                                                                 **config.get('source_endpoint'))

        print('Retrieving apps from [%s]' % source_org_mgmt_url)
        logger.info('Retrieving apps from [%s]' % source_org_mgmt_url)

        try:
            # list the apps for the SOURCE org
            logger.info('GET %s' % source_org_mgmt_url)
            r = session_source.get(source_org_mgmt_url)

            if r.status_code != 200:
                logger.critical(
                    'Abort processing: Unable to retrieve apps from [%s]: %s' % (source_org_mgmt_url, r.text))
                # exit() raises SystemExit, which the handler below does not catch
                exit()

            logger.info(json.dumps(r.text))

            # A response without a 'data' member is treated as "no apps"
            # instead of failing later on None.keys().
            org_apps = r.json().get('data') or {}

        except Exception:
            logger.exception('ERROR Retrieving apps from [%s]' % source_org_mgmt_url)
            print(traceback.format_exc())
            logger.critical('Unable to retrieve apps from [%s] and will exit' % source_org_mgmt_url)
            exit()

    # Only the linux platforms get bounded queues; elsewhere the queues are unbounded.
    if _platform == "linux" or _platform == "linux2":
        collection_queue = Queue(maxsize=config.get('queue_size_max'))
        collection_response_queue = Queue(maxsize=config.get('queue_size_max'))
    else:
        collection_queue = Queue()
        collection_response_queue = Queue()

    logger.info('Starting entity_workers...')

    status_listener = StatusListener(collection_response_queue, collection_queue)
    status_listener.start()

    # start the worker processes which will iterate the collections
    collection_workers = [EntityExportWorker(collection_queue, collection_response_queue)
                          for _ in xrange(config.get('collection_workers'))]

    for worker in collection_workers:
        worker.start()

    try:
        # A missing (None) option is treated like an empty list, i.e. "no filter".
        apps_to_process = config.get('app') or []
        collections_to_process = config.get('collection') or []
        exclude_collections = config.get('exclude_collection') or []

        # number of retries after the initial request for an app's collection list
        max_collection_attempts = 5

        app_keys = sorted(org_apps)

        # iterate the apps retrieved from the org
        for org_app in app_keys:
            logger.info('Found SOURCE App: %s' % org_app)

        # NOTE(review): pause between listing and publishing; purpose is not
        # visible here - presumably to let the operator read the list.
        time.sleep(3)

        for org_app in app_keys:
            # the app name is taken from the second '/'-separated part of the key
            parts = org_app.split('/')
            app = parts[1]

            # if apps are specified and the current app is not in the list, skip it
            if apps_to_process and app not in apps_to_process:
                logger.warning('Skipping app [%s] not included in process list [%s]' % (app, apps_to_process))
                continue

            logger.info('Processing app=[%s]' % app)

            # NOTE(review): the min_* values look like epoch-millisecond
            # sentinels - confirm against the code that updates them.
            status_map[app] = {
                'iteration_started': str(datetime.datetime.now()),
                'max_created': -1,
                'max_modified': -1,
                'min_created': 1584946416000,
                'min_modified': 1584946416000,
                'count': 0,
                'bytes': 0,
                'collections': {}
            }

            # get the list of collections from the source org/app
            source_app_url = app_url_template.format(org=config.get('org'),
                                                     app=app,
                                                     **config.get('source_endpoint'))
            logger.info('GET %s' % source_app_url)

            r_collections = session_source.get(source_app_url)

            collection_attempts = 0

            # sometimes this call was not working so I put it in a loop to force it...
            while r_collections.status_code != 200 and collection_attempts < max_collection_attempts:
                collection_attempts += 1
                logger.warning('FAILED: GET (%s) [%s] URL: %s' % (r_collections.elapsed, r_collections.status_code,
                                                                  source_app_url))
                time.sleep(DEFAULT_RETRY_SLEEP)
                r_collections = session_source.get(source_app_url)

            # Judge by the final response, not by the retry counter, so that a
            # request which succeeds on the last retry is not thrown away.
            if r_collections.status_code != 200:
                logger.critical('Unable to get collections at URL %s, skipping app' % source_app_url)
                continue

            app_response = r_collections.json()

            logger.info('App Response: ' + json.dumps(app_response))

            app_entities = app_response.get('entities', [])

            if len(app_entities) > 0:
                # the collection list is read from the first returned entity's metadata
                app_entity = app_entities[0]
                collections = app_entity.get('metadata', {}).get('collections', {})
                logger.info('Collection List: %s' % collections)

                # iterate the collections which are returned.
                for collection_name in collections:
                    # filter out collections as configured...
                    if collection_name in ignore_collections \
                            or (collections_to_process and collection_name not in collections_to_process) \
                            or collection_name in exclude_collections \
                            or (config.get('migrate') == 'credentials' and collection_name != 'users'):
                        logger.warning('Skipping collection=[%s]' % collection_name)
                        continue

                    logger.info('Publishing app / collection: %s / %s' % (app, collection_name))

                    collection_queue.put((app, collection_name))

            status_map[app]['iteration_finished'] = str(datetime.datetime.now())

            logger.info('Finished publishing collections for app [%s] !' % app)

        # allow collection workers to finish
        wait_for(collection_workers, label='collection_workers', sleep_time=30)

        status_listener.terminate()

    except KeyboardInterrupt:
        logger.warning('Keyboard Interrupt, aborting...')
        collection_queue.close()
        collection_response_queue.close()

        # forward the interrupt to the child processes before terminating them
        for p in collection_workers:
            os.kill(super(EntityExportWorker, p).pid, signal.SIGINT)

        os.kill(super(StatusListener, status_listener).pid, signal.SIGINT)

        for w in collection_workers:
            w.terminate()

        status_listener.terminate()

    logger.info('entity_workers DONE!')