in stack/scripts/migrate_entity_data.py [0:0]
def run(self):
    """Run the migration workflow from start to finish.

    Steps, in order:

    1. Call ``exit_on_error`` if ``is_endpoint_available()`` is false.
    2. Run ``run_database_setup()``.
    3. Start the migration-system update and the index-mapping migration
       when their ``is_*_updated()`` checks are false, waiting for each
       check to turn true.
    4. If ``is_data_migrated()`` is false, run the AppInfo migration
       (resetting it first when it is already reported as migrated);
       otherwise skip it.
    5. Start a re-index job and wait until ``is_reindex_running(job)``
       is false.
    6. If ``self.delta_migration`` is truthy, reset (when needed) and run
       the full data migration, waiting for ``is_data_migrated()``.

    Start/end timestamps from ``get_current_time()`` are stored in
    ``self.metrics``.  ``log_metrics()`` is called at the end of a normal
    run and also when the run is interrupted with Ctrl-C.
    """

    def wait_until(is_done):
        # Sleep first, then check: every wait below follows a start/reset
        # call, so an immediate check would only observe the old state.
        while True:
            time.sleep(STATUS_INTERVAL_SECONDS)
            if is_done():
                return

    self.logger.info('Initializing...')

    if not self.is_endpoint_available():
        exit_on_error('Endpoint is not available, aborting')

    if self.start_date is not None:
        self.logger.info("Date Provided. Re-index will run from date=[%s]", self.start_date)

    try:
        self.run_database_setup()

        # Bring the migration system up to date before anything else.
        if not self.is_migration_system_updated():
            self.logger.info('Migration system needs to be updated. Updating migration system..')
            self.start_migration_system_update()
            wait_until(self.is_migration_system_updated)

        # Same for the index mapping.
        if not self.is_index_mapping_updated():
            self.logger.info('Index Mapping needs to be updated. Updating index mapping..')
            self.start_index_mapping_migration()
            wait_until(self.is_index_mapping_updated)

        # Run AppInfo migration only when the full data migration
        # (appinfos and collection entity data) has not completed.
        if not self.is_data_migrated():
            if self.is_appinfo_migrated():
                self.logger.info('AppInfo already migrated. Resetting version for re-migration.')
                self.reset_appinfo_migration()
                # NOTE(review): the source's indentation was lost; this
                # pause is assumed to belong to the reset branch - confirm.
                time.sleep(STATUS_INTERVAL_SECONDS)

            self.start_appinfo_migration()
            self.logger.info('AppInfo Migration Started.')
            self.metrics['appinfo_migration_start'] = get_current_time()

            wait_until(self.is_appinfo_migrated)
            # Stamp the end time as soon as completion is observed, so the
            # metric is not inflated by an extra polling interval.
            self.metrics['appinfo_migration_end'] = get_current_time()
            self.logger.info('AppInfo Migration Ended.')
        else:
            self.logger.info('Full Data Migration previously ran... skipping AppInfo migration.')

        # Perform system re-index (it will grab date from input if provided).
        job = self.start_reindex()
        self.metrics['reindex_start'] = get_current_time()
        self.logger.info('Started Re-index. Job=[%s]', job)
        wait_until(lambda: not self.is_reindex_running(job))
        self.logger.info("Finished Re-index. Job=[%s]", job)
        self.metrics['reindex_end'] = get_current_time()

        # Only a delta migration runs the full data migration
        # (includes appinfo and entity data).
        if self.delta_migration:
            self.logger.info('Delta option provided. Performing full data migration...')
            if self.is_data_migrated():
                self.reset_data_migration()
                # NOTE(review): the source's indentation was lost; the pause
                # and re-check are assumed to belong to the reset branch -
                # confirm.  The re-check's result is intentionally unused;
                # the call is kept for whatever it reports on its own.
                time.sleep(STATUS_INTERVAL_SECONDS)
                self.is_data_migrated()

            self.start_fulldata_migration()
            self.metrics['full_data_migration_start'] = get_current_time()
            self.logger.info("Full Data Migration Started")
            wait_until(self.is_data_migrated)
            self.metrics['full_data_migration_end'] = get_current_time()
            self.logger.info("Full Data Migration completed")

        self.log_metrics()
        self.logger.info("Finished...")

    except KeyboardInterrupt:
        # Still report whatever metrics were collected before the interrupt.
        self.log_metrics()
        self.logger.error('Keyboard interrupted migration. Please run again to ensure the migration finished.')