in utils/usergrid-util-python/usergrid_tools/migration/usergrid_data_migrator.py [0:0]
def migrate_data(app, collection_name, source_entity, attempts=0, force=False):
    """PUT one source entity into the target collection, retrying on failure.

    Flow, in order:

    1. If the ``skip_data`` config flag is set and ``force`` is false, do
       nothing and report success.
    2. Unless ``skip_cache_read`` is set or ``force`` is true, look up the
       entity's uuid in the cache and possibly skip the entity (see the
       review note on the comparison below).  Cache errors are logged and
       otherwise ignored.
    3. Entities in an excluded collection are skipped.
    4. For user collections the entity is first passed through
       ``confirm_user_entity``.
    5. A shallow copy of the entity, minus its ``metadata`` key, is PUT to
       the mapped target org/app/collection.
    6. On HTTP 200 the cache is updated (unless ``skip_cache_write``),
       permissions are migrated for role/group collections and credentials
       for user collections.
    7. On any other outcome the call is retried by recursion, except where a
       branch below returns early (attempt limit, 400 handling, 403).

    :param app: name of the source application.
    :param collection_name: name of the source collection.
    :param source_entity: dict holding the entity; ``uuid``, ``created`` and
        ``modified`` are read from it.
    :param attempts: number of attempts already made; callers normally leave
        this at 0 and the retry recursion increments it.
    :param force: when true, bypass the ``skip_data`` flag and the cache read.
    :return: True when the entity was migrated or deliberately skipped, False
        when migration was abandoned; for HTTP 400 on role/user collections,
        whatever ``repair_user_role`` / ``handle_user_migration_conflict``
        returns.
    """
    # Give up once this many attempts have already been made.
    max_attempts = 5

    abort_format = \
        'ABORT migrate_data | success=[%s] | attempts=[%s] | created=[%s] | modified=[%s] %s / %s / %s'

    if config.get('skip_data') and not force:
        return True

    # check the cache to see if this entity has changed
    if not config.get('skip_cache_read', False) and not force:
        try:
            str_modified = cache.get(source_entity.get('uuid'))

            if str_modified not in [None, 'None']:
                # int() promotes to long automatically on Python 2 and also
                # exists on Python 3, unlike long().
                modified = int(str_modified)

                logger.debug('FOUND CACHE: %s = %s ' % (source_entity.get('uuid'), modified))

                # NOTE(review): this skips the entity when the cached
                # timestamp is older than or equal to the source's
                # 'modified', which looks inverted for a "has it changed?"
                # check.  Left as-is; confirm the intended direction.
                if modified <= source_entity.get('modified'):
                    # The value is divided by 1000 before conversion, i.e. it
                    # is treated as epoch milliseconds.
                    modified_date = datetime.datetime.utcfromtimestamp(modified / 1000)
                    e_uuid = source_entity.get('uuid')

                    uuid_datetime = time_uuid.TimeUUID(e_uuid).get_datetime()

                    logger.debug('Skipping ENTITY: %s / %s / %s / %s (%s) / %s (%s)' % (
                        config.get('org'), app, collection_name, e_uuid, uuid_datetime, modified, modified_date))
                    return True
                else:
                    logger.debug('DELETING CACHE: %s ' % (source_entity.get('uuid')))
                    cache.delete(source_entity.get('uuid'))

        except Exception:
            # A broken cache must not stop the migration: log and carry on.
            # Exception (not a bare except) so Ctrl-C / SystemExit propagate.
            logger.error('Error on checking cache for uuid=[%s]' % source_entity.get('uuid'))
            logger.error(traceback.format_exc())

    if exclude_collection(collection_name):
        logger.warn('Excluding entity in filtered collection [%s]' % collection_name)
        return True

    # handle duplicate user case
    if collection_name in ['users', 'user']:
        source_entity = confirm_user_entity(app, source_entity)

    source_identifier = get_source_identifier(source_entity)

    logger.info('Visiting ENTITY data [%s / %s (%s) ] at %s' % (
        collection_name, source_identifier, get_uuid_time(source_entity.get('uuid')), str(datetime.datetime.utcnow())))

    # Send a shallow copy without 'metadata'; the original entity is kept
    # intact for logging, caching and the follow-up migrations.
    entity_copy = source_entity.copy()

    if 'metadata' in entity_copy:
        entity_copy.pop('metadata')

    target_app, target_collection, target_org = get_target_mapping(app, collection_name)

    try:
        target_entity_url_by_name = put_entity_url_template.format(org=target_org,
                                                                   app=target_app,
                                                                   collection=target_collection,
                                                                   uuid=source_identifier,
                                                                   **config.get('target_endpoint'))

        r = session_target.put(url=target_entity_url_by_name, data=json.dumps(entity_copy))

        if attempts > 1:
            logger.warn('Attempt [%s] to migrate entity [%s / %s] at URL [%s]' % (
                attempts, collection_name, source_identifier, target_entity_url_by_name))
        else:
            logger.debug('Attempt [%s] to migrate entity [%s / %s] at URL [%s]' % (
                attempts, collection_name, source_identifier, target_entity_url_by_name))

        if r.status_code == 200:
            # Worked => WE ARE DONE
            logger.info(
                'migrate_data | success=[%s] | attempts=[%s] | entity=[%s / %s / %s] | created=[%s] | modified=[%s]' % (
                    True, attempts, config.get('org'), app, source_identifier, source_entity.get('created'),
                    source_entity.get('modified'),))

            if not config.get('skip_cache_write', False):
                logger.debug('SETTING CACHE | uuid=[%s] | modified=[%s]' % (
                    source_entity.get('uuid'), str(source_entity.get('modified'))))

                cache.set(source_entity.get('uuid'), str(source_entity.get('modified')))

            if collection_name in ['role', 'group', 'roles', 'groups']:
                migrate_permissions(app, collection_name, source_entity, attempts=0)

            if collection_name in ['users', 'user']:
                migrate_user_credentials(app, collection_name, source_entity, attempts=0)

            return True

        else:
            logger.error('Failure [%s] on attempt [%s] to PUT url=[%s], entity=[%s] response=[%s]' % (
                r.status_code, attempts, target_entity_url_by_name, json.dumps(source_entity), r.text))

            if attempts >= max_attempts:
                # This is a failure path, so report success=False.
                logger.critical(abort_format % (
                    False, attempts, source_entity.get('created'), source_entity.get('modified'), app,
                    collection_name, source_identifier))

                return False

            if r.status_code == 400:

                if target_collection in ['roles', 'role']:
                    return repair_user_role(app, collection_name, source_entity)

                elif target_collection in ['users', 'user']:
                    return handle_user_migration_conflict(app, collection_name, source_entity)

                elif 'duplicate_unique_property_exists' in r.text:
                    logger.error(
                        'WILL NOT RETRY (duplicate) [%s] attempts to PUT url=[%s], entity=[%s] response=[%s]' % (
                            attempts, target_entity_url_by_name, json.dumps(source_entity), r.text))

                    return False

                # Any other 400 falls through to the retry below.

            elif r.status_code == 403:
                logger.critical(abort_format % (
                    False, attempts, source_entity.get('created'), source_entity.get('modified'), app,
                    collection_name, source_identifier))
                return False

    except Exception:
        logger.error(traceback.format_exc())
        logger.error('error in migrate_data on entity: %s' % json.dumps(source_entity))

        # Apply the same attempt limit as the HTTP-failure path; otherwise a
        # PUT that keeps raising (e.g. target unreachable) recurses without
        # bound.
        if attempts >= max_attempts:
            logger.critical(abort_format % (
                False, attempts, source_entity.get('created'), source_entity.get('modified'), app,
                collection_name, source_identifier))
            return False

    logger.warn(
        'UNSUCCESSFUL migrate_data | success=[%s] | attempts=[%s] | entity=[%s / %s / %s] | created=[%s] | modified=[%s]' % (
            False, attempts, config.get('org'), app, source_identifier, source_entity.get('created'),
            source_entity.get('modified'),))

    # Propagate 'force' so a forced migration is not silently skipped (by
    # 'skip_data') or short-circuited by the cache on its retry.
    return migrate_data(app, collection_name, source_entity, attempts=attempts + 1, force=force)