in stack/scripts/multitenant_migrate.py [0:0]
def run(self):
    """Drive the migration workflow selected by this instance's flags.

    Steps, in order:

    1. Check the endpoint; if it is unavailable, report that through
       ``exit_on_error``.
    2. If ``self.full`` is set, start the full data migration, wait for it
       to finish, log metrics and return without doing anything else.
    3. If ``self.init`` is set, run the database setup and bring the
       migration system and the index mapping up to date when needed.
    4. Migrate app info (resetting it first if it is already migrated).
    5. De-dup and re-index the management app, then every app returned by
       ``self.get_app_ids()``.

    Start/end timestamps are stored in ``self.metrics`` along the way and
    reported through ``self.log_metrics()``.  A ``KeyboardInterrupt`` raised
    during steps 2-5 is caught: metrics gathered so far are logged together
    with an error message, and the method returns normally.
    """

    def wait_until(check, *args):
        # Sleep one status interval, then poll; stop once check(*args) is truthy.
        while True:
            time.sleep(STATUS_INTERVAL_SECONDS)
            if check(*args):
                return

    def wait_while(check, *args):
        # Sleep one status interval, then poll; stop once check(*args) is falsy.
        while True:
            time.sleep(STATUS_INTERVAL_SECONDS)
            if not check(*args):
                return

    self.logger.info('Initializing...')

    if not self.is_endpoint_available():
        exit_on_error('Endpoint is not available, aborting')

    if self.start_date is not None:
        self.logger.info("Date Provided. Re-index will run from date=[%s]", self.start_date)

    try:
        if self.full:
            # Do full data migration and exit
            self.start_fulldata_migration()
            self.metrics['full_data_migration_start'] = get_current_time()
            self.logger.info("Full Data Migration Started")

            wait_until(self.is_data_migrated)

            self.metrics['full_data_migration_end'] = get_current_time()
            self.logger.info("Full Data Migration completed")
            self.log_metrics()
            self.logger.info("Finished...")
            return

        if self.init:
            # Init the migration system as this is the first migration done on the cluster
            self.run_database_setup()

            if not self.is_migration_system_updated():
                self.logger.info('Migration system needs to be updated. Updating migration system..')
                self.start_migration_system_update()
                wait_until(self.is_migration_system_updated)

            if not self.is_index_mapping_updated():
                self.logger.info('Index Mapping needs to be updated. Updating index mapping..')
                self.start_index_mapping_migration()
                wait_until(self.is_index_mapping_updated)

        # Migrate app info
        if self.is_appinfo_migrated():
            self.logger.info('AppInfo already migrated. Resetting version for re-migration.')
            self.reset_appinfo_migration()
            time.sleep(STATUS_INTERVAL_SECONDS)

        self.start_appinfo_migration()
        self.logger.info('AppInfo Migration Started.')
        self.metrics['appinfo_migration_start'] = get_current_time()

        # Same sleep-then-poll order as every other wait in this method, so
        # the end timestamp is taken as soon as completion is observed
        # instead of one status interval later.
        wait_until(self.is_appinfo_migrated)
        self.metrics['appinfo_migration_end'] = get_current_time()
        self.logger.info('AppInfo Migration Ended.')

        # De-dup management app
        job = self.start_dedup(MANAGEMENT_APP_ID)
        self.logger.info('Started management dedup. App=[%s], Job=[%s]', MANAGEMENT_APP_ID, job)
        wait_while(self.is_dedup_running, job)
        self.logger.info("Finished dedup. App=[%s], Job=[%s]", MANAGEMENT_APP_ID, job)
        self.metrics['dedup_end_' + MANAGEMENT_APP_ID] = get_current_time()

        # Reindex management app
        job = self.start_app_reindex(MANAGEMENT_APP_ID)
        self.metrics['reindex_start'] = get_current_time()
        self.logger.info('Started management Re-index. Job=[%s]', job)
        wait_while(self.is_reindex_running, job)
        self.logger.info("Finished management Re-index. Job=[%s]", job)
        self.metrics['reindex_end'] = get_current_time()

        # Dedup and re-index all of organization's apps
        for app_id in self.get_app_ids():
            # De-dup app
            job = self.start_dedup(app_id)
            self.logger.info('Started dedup. App=[%s], Job=[%s]', app_id, job)
            wait_while(self.is_dedup_running, job)
            self.logger.info("Finished dedup. App=[%s], Job=[%s]", app_id, job)
            self.metrics['dedup_end_' + app_id] = get_current_time()

            # Re-index app
            job = self.start_app_reindex(app_id)
            self.metrics['reindex_start_' + app_id] = get_current_time()
            self.logger.info('Started Re-index. App=[%s], Job=[%s]', app_id, job)
            wait_while(self.is_reindex_running, job)
            self.logger.info("Finished Re-index. App=[%s], Job=[%s]", app_id, job)
            self.metrics['reindex_end_' + app_id] = get_current_time()

        self.log_metrics()
        self.logger.info("Finished...")

    except KeyboardInterrupt:
        # Still report whatever was measured before the interruption.
        self.log_metrics()
        self.logger.error('Keyboard interrupted migration. Please run again to ensure the migration finished.')