stack/scripts/multitenant_migrate.py (473 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # # # To migrate multiple tenants within one cluster. # # STEP 1 - SETUP TENANT ONE TOMCAT RUNNING 2.1 NOT IN SERVICE AND INIT MIGRATION # # Login to the Tomcat instance and run this command, specifying both superuser and tenant organization: # # python multitenant_migrate.py --org <org1name> --super <user>:<pass> --init # # This command will setup the database, setup the migration system and update index mappings: # - /system/database/setup # - /system/migrate/run/migration-system # - /system/migrate/run/index_mapping_migration # # Then it will migrate appinfos, re-index the management app and then for each of the specified org's apps # it will de-dup connections and re-index the app. # # Write down the 'Re-index start' timestamp when this is finished. 
# STEP 2 - PUT TENANT ONE TOMCATS IN SERVICE AND DO DELTA MIGRATION
#
# On the same Tomcat instance, run this command with the --date timestamp you noted in the previous step:
#
#     python multitenant_migrate.py --org <org1name> --super <user>:<pass> --date <timestamp>
#
# Then it will migrate appinfos, re-index the management app and then for each of the specified org's apps
# it will de-dup connections and re-index the app with a start-date specified so only data modified since
# STEP 1 will be re-indexed.
#
# STEP 3 - SETUP TENANT TWO TOMCAT RUNNING 2.1 NOT IN SERVICE
#
# Login to the Tomcat instance and run this command, specifying both superuser and tenant organization:
#
#     python multitenant_migrate.py --org <org2name> --super <user>:<pass>
#
# This command will migrate appinfos, re-index the management app and then for each of the specified org's apps
# it will de-dup connections and re-index the app.
#
# Write down the 'Re-index start' timestamp when this is finished.
#
# STEP 4 - PUT TENANT TWO TOMCATS IN SERVICE AND DO DELTA MIGRATION
#
# On the same Tomcat instance, run this command with the --date timestamp you noted in the previous step:
#
#     python multitenant_migrate.py --org <org2name> --super <user>:<pass> --date <timestamp>
#
# Then it will migrate appinfos, re-index the management app and then for each of the specified org's apps
# it will de-dup connections and re-index the app with a start-date specified so only data modified since
# STEP 3 will be re-indexed.
#
# STEP 5 - FULL DATA MIGRATION (migrates entity data to new format)
#
# Login to any Tomcat instance in the cluster and run this command:
#
#     python migrate_entity_data.py --super <user>:<pass> --full
#
# This command will run the full data migration.
import sys
import logging
from logging.handlers import RotatingFileHandler
import argparse
import time
import requests
import json

# Version expected in status response post-migration for entity and app-info data
TARGET_APPINFO_VERSION = 2
TARGET_ENTITY_DATA_VERSION = 2
TARGET_CORE_DATA_VERSION = 2
TARGET_MIGRATION_SYSTEM_VERSION = 1
TARGET_INDEX_MAPPING_VERSION = 2

# Set an interval (in seconds) for checking if re-index and/or migration has finished
STATUS_INTERVAL_SECONDS = 2

# Set plugin names (keys reported by /system/migrate/status)
PLUGIN_MIGRATION_SYSTEM = 'migration-system'
PLUGIN_APPINFO = 'appinfo-migration'
PLUGIN_ENTITYDATA = 'collections-entity-data'
PLUGIN_INDEX_MAPPING = 'index_mapping_migration'
PLUGIN_CORE_DATA = 'core-data'

# Well-known UUID of the Usergrid management application
MANAGEMENT_APP_ID = 'b6768a08-b5d5-11e3-a495-11ddb1de66c8'


def parse_args():
    """Parse command-line arguments for the migration tool.

    Returns the argparse namespace as a dict, with 'superuser' and
    'superpass' added by splitting the --super <user>:<pass> value.
    Exits the process if the credentials are not in user:pass form.
    """
    parser = argparse.ArgumentParser(description='Usergrid Migration Tool')

    parser.add_argument('--endpoint',
                        help='The endpoint to use for making API requests.',
                        type=str,
                        default='http://localhost:8080')

    parser.add_argument('--super',
                        help='Superuser username and creds <user:pass>',
                        type=str,
                        required=True)

    parser.add_argument('--init',
                        help='Init system and start first migration.',
                        action='store_true',
                        default=False)

    parser.add_argument('--org',
                        help='Name of organization on which to run migration.',
                        type=str,
                        required=False)

    parser.add_argument('--date',
                        help='A date from which to start the migration',
                        type=str)

    parser.add_argument('--full',
                        help='Run full data migration (last step in cluster migration).',
                        action='store_true',
                        default=False)

    my_args = parser.parse_args(sys.argv[1:])
    arg_vars = vars(my_args)

    creds = arg_vars['super'].split(':')
    if len(creds) != 2:
        print('Superuser credentials not properly specified. Must be "-u <user:pass>". Exiting...')
        exit_on_error()
    else:
        arg_vars['superuser'] = creds[0]
        arg_vars['superpass'] = creds[1]

    return arg_vars


class Migrate:
    """Drives the multi-tenant Usergrid 2.1 migration workflow.

    Depending on the parsed flags this either runs the full data
    migration (--full), initializes the migration system first (--init),
    and/or migrates appinfos, de-dups connections and re-indexes the
    management app plus every app in the given organization (--org).
    All progress timestamps are collected in self.metrics.
    """

    def __init__(self):
        self.args = parse_args()
        self.start_date = self.args['date']
        self.endpoint = self.args['endpoint']
        # Timestamps (epoch millis as strings) recorded at key workflow points
        self.metrics = {'reindex_start': '',
                        'reindex_end': '',
                        'appinfo_migration_start': '',
                        'appinfo_migration_end': '',
                        'full_data_migration_start': '',
                        'full_data_migration_end': ''}
        self.logger = init_logging(self.__class__.__name__)
        self.super_user = self.args['superuser']
        self.super_pass = self.args['superpass']
        self.org = self.args['org']
        self.init = self.args['init']
        self.full = self.args['full']

    def run(self):
        """Execute the migration workflow selected by the command-line flags."""
        self.logger.info('Initializing...')

        if not self.is_endpoint_available():
            exit_on_error('Endpoint is not available, aborting')

        if self.start_date is not None:
            self.logger.info("Date Provided. Re-index will run from date=[%s]", self.start_date)

        try:
            if self.full:
                # Do full data migration and exit
                self.start_fulldata_migration()
                self.metrics['full_data_migration_start'] = get_current_time()
                self.logger.info("Full Data Migration Started")
                while True:
                    time.sleep(STATUS_INTERVAL_SECONDS)
                    if self.is_data_migrated():
                        break
                self.metrics['full_data_migration_end'] = get_current_time()
                self.logger.info("Full Data Migration completed")
                self.log_metrics()
                self.logger.info("Finished...")
                return

            if self.init:
                # First migration on this cluster: set up the database, then
                # bring the migration system and index mappings up to date.
                self.run_database_setup()

                if not self.is_migration_system_updated():
                    self.logger.info('Migration system needs to be updated. Updating migration system..')
                    self.start_migration_system_update()
                    while True:
                        time.sleep(STATUS_INTERVAL_SECONDS)
                        if self.is_migration_system_updated():
                            break

                if not self.is_index_mapping_updated():
                    self.logger.info('Index Mapping needs to be updated. Updating index mapping..')
                    self.start_index_mapping_migration()
                    while True:
                        time.sleep(STATUS_INTERVAL_SECONDS)
                        if self.is_index_mapping_updated():
                            break

            # Migrate app info; if already migrated, roll the version back one
            # so the plugin re-runs.
            if self.is_appinfo_migrated():
                self.logger.info('AppInfo already migrated. Resetting version for re-migration.')
                self.reset_appinfo_migration()
                time.sleep(STATUS_INTERVAL_SECONDS)

            self.start_appinfo_migration()
            self.logger.info('AppInfo Migration Started.')
            self.metrics['appinfo_migration_start'] = get_current_time()
            while not self.is_appinfo_migrated():
                time.sleep(STATUS_INTERVAL_SECONDS)
            self.metrics['appinfo_migration_end'] = get_current_time()
            self.logger.info('AppInfo Migration Ended.')

            # De-dup management app
            job = self.start_dedup(MANAGEMENT_APP_ID)
            self.logger.info('Started management dedup. App=[%s], Job=[%s]', MANAGEMENT_APP_ID, job)
            while True:
                time.sleep(STATUS_INTERVAL_SECONDS)
                if not self.is_dedup_running(job):
                    break
            self.logger.info("Finished dedup. App=[%s], Job=[%s]", MANAGEMENT_APP_ID, job)
            self.metrics['dedup_end_' + MANAGEMENT_APP_ID] = get_current_time()

            # Reindex management app
            job = self.start_app_reindex(MANAGEMENT_APP_ID)
            self.metrics['reindex_start'] = get_current_time()
            self.logger.info('Started management Re-index. Job=[%s]', job)
            while True:
                time.sleep(STATUS_INTERVAL_SECONDS)
                if not self.is_reindex_running(job):
                    break
            self.logger.info("Finished management Re-index. Job=[%s]", job)
            self.metrics['reindex_end'] = get_current_time()

            # Dedup and re-index all of organization's apps
            app_ids = self.get_app_ids()
            for app_id in app_ids:
                # De-dup app
                job = self.start_dedup(app_id)
                self.logger.info('Started dedup. App=[%s], Job=[%s]', app_id, job)
                while True:
                    time.sleep(STATUS_INTERVAL_SECONDS)
                    if not self.is_dedup_running(job):
                        break
                self.logger.info("Finished dedup. App=[%s], Job=[%s]", app_id, job)
                self.metrics['dedup_end_' + app_id] = get_current_time()

                # Re-index app
                job = self.start_app_reindex(app_id)
                self.metrics['reindex_start_' + app_id] = get_current_time()
                self.logger.info('Started Re-index. App=[%s], Job=[%s]', app_id, job)
                while True:
                    time.sleep(STATUS_INTERVAL_SECONDS)
                    if not self.is_reindex_running(job):
                        break
                self.logger.info("Finished Re-index. App=[%s], Job=[%s]", app_id, job)
                self.metrics['reindex_end_' + app_id] = get_current_time()

            self.log_metrics()
            self.logger.info("Finished...")

        except KeyboardInterrupt:
            self.log_metrics()
            self.logger.error('Keyboard interrupted migration. Please run again to ensure the migration finished.')

    # --- URL builders -----------------------------------------------------

    def get_database_setup_url(self):
        return self.endpoint + '/system/database/setup'

    def get_migration_url(self):
        return self.endpoint + '/system/migrate/run'

    def get_reset_migration_url(self):
        return self.endpoint + '/system/migrate/set'

    def get_migration_status_url(self):
        return self.endpoint + '/system/migrate/status'

    def get_dedup_url(self):
        return self.endpoint + '/system/connection/dedup'

    def get_reindex_url(self):
        return self.endpoint + '/system/index/rebuild'

    def get_management_reindex_url(self):
        return self.get_reindex_url() + "/management"

    # --- Migration triggers ----------------------------------------------

    def start_fulldata_migration(self):
        """Kick off the full data migration (all plugins); returns the JSON response."""
        try:
            r = requests.put(url=self.get_migration_url(), auth=(self.super_user, self.super_pass))
            return r.json()
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to start migration, %s', e)
            exit_on_error(str(e))

    def start_core_data_migration(self):
        # Hits the same endpoint as the full migration; kept as a separate
        # name for backward compatibility (was a copy-paste duplicate).
        return self.start_fulldata_migration()

    def start_migration_system_update(self):
        """Run only the migration-system plugin; returns the JSON response."""
        try:
            # TODO fix this URL
            migrate_url = self.get_migration_url() + '/' + PLUGIN_MIGRATION_SYSTEM
            r = requests.put(url=migrate_url, auth=(self.super_user, self.super_pass))
            return r.json()
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to start migration, %s', e)
            exit_on_error(str(e))

    def run_database_setup(self):
        """Run /system/database/setup; exits the script on any failure."""
        try:
            setup_url = self.get_database_setup_url()
            r = requests.put(url=setup_url, auth=(self.super_user, self.super_pass))
            if r.status_code != 200:
                exit_on_error('Database Setup Failed')
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to run database setup, %s', e)
            exit_on_error(str(e))

    def start_index_mapping_migration(self):
        """Run only the index-mapping plugin; returns the JSON response."""
        try:
            migrate_url = self.get_migration_url() + '/' + PLUGIN_INDEX_MAPPING
            r = requests.put(url=migrate_url, auth=(self.super_user, self.super_pass))
            return r.json()
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to start migration, %s', e)
            exit_on_error(str(e))

    def start_appinfo_migration(self):
        """Run only the appinfo plugin; returns the JSON response."""
        try:
            migrate_url = self.get_migration_url() + '/' + PLUGIN_APPINFO
            r = requests.put(url=migrate_url, auth=(self.super_user, self.super_pass))
            return r.json()
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to start migration, %s', e)
            exit_on_error(str(e))

    def reset_data_migration(self):
        """Roll entity-data and appinfo plugin versions back one so they re-run."""
        version = TARGET_ENTITY_DATA_VERSION - 1
        body = json.dumps({PLUGIN_ENTITYDATA: version, PLUGIN_APPINFO: version})
        try:
            r = requests.put(url=self.get_reset_migration_url(), data=body,
                             auth=(self.super_user, self.super_pass))
            response = r.json()
            self.logger.info('Resetting data migration versions to %s=[%s] '
                             'and %s=[%s]', PLUGIN_ENTITYDATA, version, PLUGIN_APPINFO, version)
            return response
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to reset full data migration versions, %s', e)
            exit_on_error(str(e))

    def reset_appinfo_migration(self):
        """Roll the appinfo plugin version back one so it re-runs."""
        version = TARGET_APPINFO_VERSION - 1
        body = json.dumps({PLUGIN_APPINFO: version})
        try:
            r = requests.put(url=self.get_reset_migration_url(), data=body,
                             auth=(self.super_user, self.super_pass))
            response = r.json()
            self.logger.info('Resetting appinfo migration versions to %s=[%s]', PLUGIN_APPINFO, version)
            return response
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to reset appinfo migration version, %s', e)
            exit_on_error(str(e))

    # --- Status checks ----------------------------------------------------

    def is_data_migrated(self):
        """True when entity, appinfo and core-data plugins all report target versions.

        Returns False (not None, as before) when the status request fails,
        so polling callers get a well-typed answer.
        """
        status = self.check_data_migration_status()
        if status is not None:
            entity_version = status['data'][PLUGIN_ENTITYDATA]
            appinfo_version = status['data'][PLUGIN_APPINFO]
            core_data_version = status['data'][PLUGIN_CORE_DATA]
            if (entity_version == TARGET_ENTITY_DATA_VERSION
                    and appinfo_version == TARGET_APPINFO_VERSION
                    and core_data_version == TARGET_CORE_DATA_VERSION):
                self.logger.info('Full Data Migration status=[COMPLETE], %s=[%s], '
                                 '%s=[%s], %s=%s',
                                 PLUGIN_ENTITYDATA, entity_version,
                                 PLUGIN_APPINFO, appinfo_version,
                                 PLUGIN_CORE_DATA, core_data_version)
                return True
            self.logger.info('Full Data Migration status=[NOTSTARTED/INPROGRESS]')
        return False

    def is_appinfo_migrated(self):
        """True when the appinfo plugin reports its target version."""
        status = self.check_data_migration_status()
        if status is not None:
            appinfo_version = status['data'][PLUGIN_APPINFO]
            if appinfo_version == TARGET_APPINFO_VERSION:
                self.logger.info('AppInfo Migration status=[COMPLETE],'
                                 '%s=[%s]', PLUGIN_APPINFO, appinfo_version)
                return True
            self.logger.info('AppInfo Migration status=[NOTSTARTED/INPROGRESS]')
        return False

    def is_migration_system_updated(self):
        """True when the migration-system plugin reports its target version."""
        status = self.check_data_migration_status()
        if status is None:
            return False
        migration_system_version = status['data'][PLUGIN_MIGRATION_SYSTEM]
        if migration_system_version == TARGET_MIGRATION_SYSTEM_VERSION:
            self.logger.info('Migration System CURRENT, %s=[%s]',
                             PLUGIN_MIGRATION_SYSTEM, migration_system_version)
            return True
        self.logger.info('Migration System OLD, %s=[%s]',
                         PLUGIN_MIGRATION_SYSTEM, migration_system_version)
        return False

    def is_index_mapping_updated(self):
        """True when the index-mapping plugin reports its target version."""
        status = self.check_data_migration_status()
        if status is None:
            return False
        index_mapping_version = status['data'][PLUGIN_INDEX_MAPPING]
        if index_mapping_version == TARGET_INDEX_MAPPING_VERSION:
            self.logger.info('Index Mapping CURRENT, %s=[%s]',
                             PLUGIN_INDEX_MAPPING, index_mapping_version)
            return True
        self.logger.info('Index Mapping OLD, %s=[%s]',
                         PLUGIN_INDEX_MAPPING, index_mapping_version)
        return False

    def check_data_migration_status(self):
        """GET the migration status JSON, or None on any failure.

        Failures are logged but tolerated: the polling loops in run() will
        simply try again on the next interval.
        """
        try:
            r = requests.get(url=self.get_migration_status_url(),
                             auth=(self.super_user, self.super_pass))
            if r.status_code == 200:
                return r.json()
            self.logger.error('Failed to check migration status, %s', r)
            return None
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to check migration status, %s', e)
            return None

    # --- Re-index ---------------------------------------------------------

    def get_reindex_status(self, job):
        """Return the status string for a re-index job, or None on request failure."""
        status_url = self.get_reindex_url() + '/' + job
        try:
            r = requests.get(url=status_url, auth=(self.super_user, self.super_pass))
            return r.json()['status']
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to get reindex status, %s', e)
            return None

    def start_app_reindex(self, appId):
        """Start a re-index of the given app and return its job id.

        If --date was supplied it is sent as 'updated' so only entities
        modified since that timestamp are re-indexed (delta migration).
        Exits the script on failure.
        """
        body = ""
        if self.start_date is not None:
            body = json.dumps({'updated': self.start_date})

        try:
            r = requests.post(url=self.get_reindex_url() + "/" + appId, data=body,
                              auth=(self.super_user, self.super_pass))
            if r.status_code == 200:
                return r.json()['jobId']
            self.logger.error('Failed to start reindex, %s', r)
            exit_on_error(str(r))
        except requests.exceptions.RequestException as e:
            self.logger.error('Unable to make API request for reindex, %s', e)
            exit_on_error(str(e))

    def is_reindex_running(self, job):
        """True while the re-index job has not reported COMPLETE."""
        status = self.get_reindex_status(job)
        self.logger.info('Re-index status=[%s]', status)
        return status != "COMPLETE"

    # --- De-dup -----------------------------------------------------------

    def get_dedup_status(self, job):
        """Return the status string for a dedup job, or None on request failure."""
        status_url = self.get_dedup_url() + '/' + job
        try:
            r = requests.get(url=status_url, auth=(self.super_user, self.super_pass))
            return r.json()['status']['status']
        except requests.exceptions.RequestException as e:
            self.logger.error('Failed to get dedup status, %s', e)
            return None

    def start_dedup(self, app_id):
        """Start connection de-dup for the given app and return its job id.

        Exits the script on failure.
        """
        try:
            r = requests.post(url=self.get_dedup_url() + "/" + app_id, data="",
                              auth=(self.super_user, self.super_pass))
            if r.status_code == 200:
                return r.json()['status']['jobStatusId']
            self.logger.error('Failed to start dedup, %s', r)
            exit_on_error(str(r))
        except requests.exceptions.RequestException as e:
            self.logger.error('Unable to make API request for dedup, %s', e)
            exit_on_error(str(e))

    def is_dedup_running(self, job):
        """True while the dedup job has not reported COMPLETE."""
        status = self.get_dedup_status(job)
        self.logger.info('Dedup status=[%s]', status)
        return status != "COMPLETE"

    # --- Misc -------------------------------------------------------------

    def is_endpoint_available(self):
        """True if GET <endpoint>/status answers 200."""
        try:
            r = requests.get(url=self.endpoint + '/status')
            if r.status_code == 200:
                return True
        except requests.exceptions.RequestException as e:
            self.logger.error('Endpoint is unavailable, %s', str(e))
        return False

    def log_metrics(self):
        """Log the recorded workflow timestamps (epoch millis)."""
        self.logger.info(
            'Re-index start=[%s], '
            'Re-index end =[%s], '
            'Full Data Migration start=[%s], '
            'Full Data Migration end=[%s] '
            'AppInfo Migration start=[%s], '
            'AppInfo Migration end=[%s] ',
            self.metrics['reindex_start'],
            self.metrics['reindex_end'],
            self.metrics['full_data_migration_start'],
            self.metrics['full_data_migration_end'],
            self.metrics['appinfo_migration_start'],
            self.metrics['appinfo_migration_end']
        )

    def get_app_ids(self):
        """Return the application UUIDs belonging to the configured --org.

        Exits the script if the org listing cannot be fetched or the system
        has no organizations at all.
        """
        try:
            url = self.endpoint + "/management/organizations"
            r = requests.get(url=url, auth=(self.super_user, self.super_pass))
            if r.status_code != 200:
                exit_on_error('Cannot get app ids: ' + r.text)
            response_json = r.json()

            orgs = response_json["organizations"]
            if orgs is None:
                e = 'No Orgs in this system'
                self.logger.error(e)
                exit_on_error(e)

            app_ids = []
            for org in orgs:
                if org["name"] == self.org:
                    # 'applications' maps app name -> app UUID
                    for app in org["applications"]:
                        app_ids.append(org["applications"][app])
            if not app_ids:
                # Robustness: make a missing/empty org visible instead of
                # silently de-duping and re-indexing nothing.
                self.logger.warning('No applications found for org=[%s]', self.org)
            return app_ids
        except requests.exceptions.RequestException as e:
            self.logger.error('Unable to get list of application ids, %s', e)
            exit_on_error(str(e))


def get_current_time():
    """Return the current time as epoch milliseconds, stringified."""
    return str(int(time.time() * 1000))


def exit_on_error(e=""):
    """Print the error and terminate the script with exit status 1."""
    print('Exiting migration script due to error: ' + str(e))
    sys.exit(1)


def init_logging(name):
    """Create an INFO logger writing to ./migration.log (rotated) and stdout."""
    logger = logging.getLogger(name)
    log_file_name = './migration.log'
    log_formatter = logging.Formatter(fmt='%(asctime)s [%(name)s] %(levelname)s %(message)s',
                                      datefmt='%Y-%m-%d %H:%M:%S')

    # Use the directly-imported RotatingFileHandler rather than relying on
    # the logging.handlers attribute being populated as an import side effect.
    rotating_file = RotatingFileHandler(filename=log_file_name,
                                        mode='a',
                                        maxBytes=104857600,  # 100 MB per file
                                        backupCount=10)
    rotating_file.setFormatter(log_formatter)
    rotating_file.setLevel(logging.INFO)
    logger.addHandler(rotating_file)
    logger.setLevel(logging.INFO)

    stdout_logger = logging.StreamHandler(sys.stdout)
    stdout_logger.setFormatter(log_formatter)
    stdout_logger.setLevel(logging.INFO)
    logger.addHandler(stdout_logger)

    return logger


if __name__ == '__main__':
    migration = Migrate()
    migration.run()