utils/usergrid-util-python/usergrid_tools/migration/usergrid_data_exporter.py (653 lines of code) (raw):

# */ # * Licensed to the Apache Software Foundation (ASF) under one # * or more contributor license agreements. See the NOTICE file # * distributed with this work for additional information # * regarding copyright ownership. The ASF licenses this file # * to you under the Apache License, Version 2.0 (the # * "License"); you may not use this file except in compliance # * with the License. You may obtain a copy of the License at # * # * http://www.apache.org/licenses/LICENSE-2.0 # * # * Unless required by applicable law or agreed to in writing, # * software distributed under the License is distributed on an # * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # * KIND, either express or implied. See the License for the # * specific language governing permissions and limitations # * under the License. # */ from __future__ import print_function import os import uuid from Queue import Empty import argparse import json import logging import sys from multiprocessing import Queue, Process import time_uuid import datetime from cloghandler import ConcurrentRotatingFileHandler import requests import traceback import time from sys import platform as _platform import signal from usergrid import UsergridQueryIterator import urllib3 __author__ = 'Jeff.West@yahoo.com' ECID = str(uuid.uuid1()) key_version = 'v4' logger = logging.getLogger('GraphMigrator') worker_logger = logging.getLogger('Worker') collection_worker_logger = logging.getLogger('CollectionWorker') error_logger = logging.getLogger('ErrorLogger') audit_logger = logging.getLogger('AuditLogger') status_logger = logging.getLogger('StatusLogger') urllib3.disable_warnings() DEFAULT_CREATE_APPS = False DEFAULT_RETRY_SLEEP = 10 DEFAULT_PROCESSING_SLEEP = 1 queue = Queue() QSIZE_OK = False try: queue.qsize() QSIZE_OK = True except: pass session_source = requests.Session() session_target = requests.Session() def total_seconds(td): return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / 10 ** 6 def init_logging(stdout_enabled=True): root_logger = logging.getLogger() root_logger.setLevel(logging.getLevelName(config.get('log_level', 'INFO'))) # root_logger.setLevel(logging.WARN) logging.getLogger('requests.packages.urllib3.connectionpool').setLevel(logging.ERROR) logging.getLogger('boto').setLevel(logging.ERROR) logging.getLogger('urllib3.connectionpool').setLevel(logging.WARN) log_formatter = logging.Formatter( fmt='%(asctime)s | ' + ECID + ' | %(name)s | %(processName)s | %(levelname)s | %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') stdout_logger = logging.StreamHandler(sys.stdout) stdout_logger.setFormatter(log_formatter) root_logger.addHandler(stdout_logger) if stdout_enabled: stdout_logger.setLevel(logging.getLevelName(config.get('log_level', 'INFO'))) # base log file log_file_name = '%s/migrator.log' % config.get('log_dir') # ConcurrentRotatingFileHandler rotating_file = ConcurrentRotatingFileHandler(filename=log_file_name, mode='a', maxBytes=404857600, backupCount=0) rotating_file.setFormatter(log_formatter) rotating_file.setLevel(logging.INFO) root_logger.addHandler(rotating_file) error_log_file_name = '%s/migrator_errors.log' % config.get('log_dir') error_rotating_file = ConcurrentRotatingFileHandler(filename=error_log_file_name, mode='a', maxBytes=404857600, backupCount=0) error_rotating_file.setFormatter(log_formatter) error_rotating_file.setLevel(logging.ERROR) root_logger.addHandler(error_rotating_file) entity_name_map = { 'users': 'username' } config = {} # URL Templates for Usergrid org_management_app_url_template = "{api_url}/management/organizations/{org}/applications?client_id={client_id}&client_secret={client_secret}" org_management_url_template = "{api_url}/management/organizations/{org}/applications?client_id={client_id}&client_secret={client_secret}" org_url_template = "{api_url}/{org}?client_id={client_id}&client_secret={client_secret}" app_url_template = "{api_url}/{org}/{app}?client_id={client_id}&client_secret={client_secret}" collection_url_template = "{api_url}/{org}/{app}/{collection}?client_id={client_id}&client_secret={client_secret}" collection_query_url_template = "{api_url}/{org}/{app}/{collection}?ql={ql}&client_id={client_id}&client_secret={client_secret}&limit={limit}" collection_graph_url_template = "{api_url}/{org}/{app}/{collection}?client_id={client_id}&client_secret={client_secret}&limit={limit}" connection_query_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}?client_id={client_id}&client_secret={client_secret}" connecting_query_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/connecting/{verb}?client_id={client_id}&client_secret={client_secret}" connection_create_by_uuid_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}/{target_uuid}?client_id={client_id}&client_secret={client_secret}" connection_create_by_name_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}/{verb}/{target_type}/{target_name}?client_id={client_id}&client_secret={client_secret}" get_entity_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}?client_id={client_id}&client_secret={client_secret}&connections=none" get_entity_url_with_connections_template = "{api_url}/{org}/{app}/{collection}/{uuid}?client_id={client_id}&client_secret={client_secret}" put_entity_url_template = "{api_url}/{org}/{app}/{collection}/{uuid}?client_id={client_id}&client_secret={client_secret}" user_credentials_url_template = "{api_url}/{org}/{app}/users/{uuid}/credentials" ignore_collections = ['activities', 'queues', 'events', 'notifications'] class StatusListener(Process): def __init__(self, status_queue, worker_queue): super(StatusListener, self).__init__() self.status_queue = status_queue self.worker_queue = worker_queue def run(self): keep_going = True org_results = { 'name': config.get('org'), 'apps': {}, } empty_count = 0 while keep_going: try: app, collection, status_map = self.status_queue.get(timeout=60) status_logger.info('Received status update for app/collection: [%s / %s]' % (app, collection)) empty_count = 0 org_results['summary'] = { 'max_created': -1, 'max_modified': -1, 'min_created': 1584946416000, 'min_modified': 1584946416000, 'count': 0, 'bytes': 0 } if app not in org_results['apps']: org_results['apps'][app] = { 'collections': {} } org_results['apps'][app]['collections'].update(status_map) try: for app, app_data in org_results['apps'].items(): app_data['summary'] = { 'max_created': -1, 'max_modified': -1, 'min_created': 1584946416000, 'min_modified': 1584946416000, 'count': 0, 'bytes': 0 } if 'collections' in app_data: for collection, collection_data in app_data['collections'].items(): app_data['summary']['count'] += collection_data['count'] app_data['summary']['bytes'] += collection_data['bytes'] org_results['summary']['count'] += collection_data['count'] org_results['summary']['bytes'] += collection_data['bytes'] # APP if collection_data.get('max_modified') > app_data['summary']['max_modified']: app_data['summary']['max_modified'] = collection_data.get('max_modified') if collection_data.get('min_modified') < app_data['summary']['min_modified']: app_data['summary']['min_modified'] = collection_data.get('min_modified') if collection_data.get('max_created') > app_data['summary']['max_created']: app_data['summary']['max_created'] = collection_data.get('max_created') if collection_data.get('min_created') < app_data['summary']['min_created']: app_data['summary']['min_created'] = collection_data.get('min_created') # ORG if collection_data.get('max_modified') > org_results['summary']['max_modified']: org_results['summary']['max_modified'] = collection_data.get('max_modified') if collection_data.get('min_modified') < org_results['summary']['min_modified']: org_results['summary']['min_modified'] = collection_data.get('min_modified') if collection_data.get('max_created') > org_results['summary']['max_created']: org_results['summary']['max_created'] = collection_data.get('max_created') if collection_data.get('min_created') < org_results['summary']['min_created']: org_results['summary']['min_created'] = collection_data.get('min_created') if QSIZE_OK: status_logger.warn('CURRENT Queue Depth: %s' % self.worker_queue.qsize()) status_logger.warn('UPDATED status of org processed: %s' % json.dumps(org_results)) except KeyboardInterrupt as e: raise e except: print(traceback.format_exc()) except KeyboardInterrupt as e: status_logger.warn('FINAL status of org processed: %s' % json.dumps(org_results)) raise e except Empty: if QSIZE_OK: status_logger.warn('CURRENT Queue Depth: %s' % self.worker_queue.qsize()) status_logger.warn('CURRENT status of org processed: %s' % json.dumps(org_results)) status_logger.warning('EMPTY! Count=%s' % empty_count) empty_count += 1 if empty_count >= 120: keep_going = False except: print(traceback.format_exc()) logger.warn('FINAL status of org processed: %s' % json.dumps(org_results)) class EntityExportWorker(Process): def __init__(self, work_queue, response_queue): super(EntityExportWorker, self).__init__() collection_worker_logger.debug('Creating worker!') self.work_queue = work_queue self.response_queue = response_queue def run(self): collection_worker_logger.info('starting run()...') keep_going = True empty_count = 0 app = 'NOT SET' collection_name = 'NOT SET' status_map = {} entity_file = None try: while keep_going: try: app, collection_name = self.work_queue.get(timeout=30) empty_count = 0 status_map = self.process_collection(app, collection_name) status_map[collection_name]['iteration_finished'] = str(datetime.datetime.now()) collection_worker_logger.warning( 'Collection [%s / %s / %s] loop complete! Max Created entity %s' % ( config.get('org'), app, collection_name, status_map[collection_name]['max_created'])) collection_worker_logger.warning( 'Sending FINAL stats for app/collection [%s / %s]: %s' % (app, collection_name, status_map)) self.response_queue.put((app, collection_name, status_map)) collection_worker_logger.info('Done! Finished app/collection: %s / %s' % (app, collection_name)) except KeyboardInterrupt as e: raise e except Empty: collection_worker_logger.warning('EMPTY! Count=%s' % empty_count) empty_count += 1 if empty_count >= 2: keep_going = False except Exception as e: logger.exception('Error in CollectionWorker processing collection [%s]' % collection_name) print(traceback.format_exc()) finally: if entity_file is not None: entity_file.close() self.response_queue.put((app, collection_name, status_map)) collection_worker_logger.info('FINISHED!') def process_collection(self, app, collection_name): status_map = { collection_name: { 'iteration_started': str(datetime.datetime.now()), 'max_created': -1, 'max_modified': -1, 'min_created': 1584946416000, 'min_modified': 1584946416000, 'count': 0, 'bytes': 0 } } # added a flag for using graph vs query/index if config.get('graph', False): source_collection_url = collection_graph_url_template.format(org=config.get('org'), app=app, collection=collection_name, limit=config.get('limit'), **config.get('source_endpoint')) else: source_collection_url = collection_query_url_template.format(org=config.get('org'), app=app, collection=collection_name, limit=config.get('limit'), ql="select * %s" % config.get( 'ql'), **config.get('source_endpoint')) counter = 0 # use the UsergridQuery from the Python SDK to iterate the collection q = UsergridQueryIterator(source_collection_url, page_delay=config.get('page_sleep_time'), sleep_time=config.get('error_retry_sleep')) directory = os.path.join(config['export_path'], ECID, config['org'], app) if not os.path.exists(directory): os.makedirs(directory) entity_filename = '_'.join([collection_name, 'entity-data']) entity_filename_base = os.path.join(directory, entity_filename) entity_file_number = 0 entity_file_counter = 0 entity_filename = '%s-%s.txt' % (entity_filename_base, entity_file_number) entity_file = open(entity_filename, 'w') edge_filename = '_'.join([collection_name, 'edge-data']) edge_filename_base = os.path.join(directory, edge_filename) edge_file_number = 0 edge_file_counter = 0 edge_filename = '%s-%s.txt' % (edge_filename_base, edge_file_number) edge_file = open(edge_filename, 'w') try: for entity in q: try: entity_file_counter += 1 counter += 1 if entity_file_counter > config['entities_per_file']: entity_file.close() entity_file_number += 1 entity_file_counter = 0 entity_filename = '%s-%s.txt' % (entity_filename_base, entity_file_number) entity_file = open(entity_filename, 'w') entity_file.write('%s\n' % json.dumps(entity)) edge_names = get_edge_names(entity) for edge_name in edge_names: if not include_edge(collection_name, edge_name): continue connection_query_url = connection_query_url_template.format( org=config.get('org'), app=app, verb=edge_name, collection=collection_name, uuid=entity.get('uuid'), limit=config.get('limit'), **config.get('source_endpoint')) connection_query = UsergridQueryIterator(connection_query_url, sleep_time=config.get('error_retry_sleep')) target_uuids = [] try: for target_entity in connection_query: target_uuids.append(target_entity.get('uuid')) except: logger.exception('Error processing edge [%s] of entity [ %s / %s / %s]' % ( edge_name, app, collection_name, entity.get('uuid'))) if len(target_uuids) > 0: edge_file_counter += 1 edges = { 'entity': { 'type': entity.get('type'), 'uuid': entity.get('uuid') }, 'edge_name': edge_name, 'target_uuids': target_uuids } if entity_file_counter > config['entities_per_file']: edge_file.close() edge_file_number += 1 edge_file_counter = 0 edge_filename = '%s-%s.txt' % (edge_filename_base, edge_file_number) edge_file = open(edge_filename, 'w') edge_file.write('%s\n' % json.dumps(edges)) if 'created' in entity: try: entity_created = long(entity.get('created')) if entity_created > status_map[collection_name]['max_created']: status_map[collection_name]['max_created'] = entity_created status_map[collection_name]['max_created_str'] = str( datetime.datetime.fromtimestamp(entity_created / 1000)) if entity_created < status_map[collection_name]['min_created']: status_map[collection_name]['min_created'] = entity_created status_map[collection_name]['min_created_str'] = str( datetime.datetime.fromtimestamp(entity_created / 1000)) except ValueError: pass if 'modified' in entity: try: entity_modified = long(entity.get('modified')) if entity_modified > status_map[collection_name]['max_modified']: status_map[collection_name]['max_modified'] = entity_modified status_map[collection_name]['max_modified_str'] = str( datetime.datetime.fromtimestamp(entity_modified / 1000)) if entity_modified < status_map[collection_name]['min_modified']: status_map[collection_name]['min_modified'] = entity_modified status_map[collection_name]['min_modified_str'] = str( datetime.datetime.fromtimestamp(entity_modified / 1000)) except ValueError: pass status_map[collection_name]['bytes'] += count_bytes(entity) status_map[collection_name]['count'] += 1 if counter % 1000 == 1: try: collection_worker_logger.warning( 'Sending incremental stats for app/collection [%s / %s]: %s' % ( app, collection_name, status_map)) self.response_queue.put((app, collection_name, status_map)) if QSIZE_OK: collection_worker_logger.info( 'Counter=%s, collection queue depth=%s' % ( counter, self.work_queue.qsize())) except: pass collection_worker_logger.warn( 'Current status of collections processed: %s' % json.dumps(status_map)) except KeyboardInterrupt: raise except: logger.exception( 'Error processing entity %s / %s / %s' % (app, collection_name, entity.get('uuid'))) except KeyboardInterrupt: raise except: logger.exception('Error processing collection %s / %s ' % (app, collection_name)) finally: if edge_file is not None: edge_file.close() if entity_file is not None: entity_file.close() return status_map def use_name_for_collection(collection_name): return collection_name in config.get('use_name_for_collection', []) def include_edge(collection_name, edge_name): include_edges = config.get('include_edge', []) if include_edges is None: include_edges = [] exclude_edges = config.get('exclude_edge', []) if exclude_edges is None: exclude_edges = [] if len(include_edges) > 0 and edge_name not in include_edges: logger.debug( 'Skipping edge [%s] since it is not in INCLUDED list: %s' % (edge_name, include_edges)) return False if edge_name in exclude_edges: logger.debug( 'Skipping edge [%s] since it is in EXCLUDED list: %s' % (edge_name, exclude_edges)) return False if (collection_name in ['users', 'user'] and edge_name in ['roles', 'followers', 'groups', 'feed', 'activities']) \ or (collection_name in ['device', 'devices'] and edge_name in ['users']) \ or (collection_name in ['receipts', 'receipt'] and edge_name in ['device', 'devices']): # feed and activities are not retrievable... # roles and groups will be more efficiently handled from the role/group -> user # followers will be handled by 'following' # do only this from user -> device return False return True def get_source_identifier(source_entity): entity_type = source_entity.get('type') source_identifier = source_entity.get('uuid') if use_name_for_collection(entity_type): if entity_type in ['user']: source_identifier = source_entity.get('username') else: source_identifier = source_entity.get('name') if source_identifier is None: source_identifier = source_entity.get('uuid') logger.warn('Using UUID for entity [%s / %s]' % (entity_type, source_identifier)) return source_identifier def include_collection(collection_name): exclude = config.get('exclude_collection', []) if exclude is not None and collection_name in exclude: return False return True def get_edge_names(entity): out_edge_names = [edge_name for edge_name in entity.get('metadata', {}).get('collections', [])] out_edge_names += [edge_name for edge_name in entity.get('metadata', {}).get('connections', [])] return out_edge_names def get_uuid_time(the_uuid_string): return time_uuid.TimeUUID(the_uuid_string).get_datetime() def parse_args(): parser = argparse.ArgumentParser(description='Usergrid Org/App Migrator') parser.add_argument('--log_dir', help='path to the place where logs will be written', default='./', type=str, required=False) parser.add_argument('--log_level', help='log level - DEBUG, INFO, WARN, ERROR, CRITICAL', default='INFO', type=str, required=False) parser.add_argument('-o', '--org', help='Name of the org to migrate', type=str, required=True) parser.add_argument('-a', '--app', help='Name of one or more apps to include, specify none to include all apps', required=False, action='append') parser.add_argument('-e', '--include_edge', help='Name of one or more edges/connection types to INCLUDE, specify none to include all edges', required=False, action='append') parser.add_argument('--exclude_edge', help='Name of one or more edges/connection types to EXCLUDE, specify none to include all edges', required=False, action='append') parser.add_argument('--exclude_collection', help='Name of one or more collections to EXCLUDE, specify none to include all collections', required=False, action='append') parser.add_argument('-c', '--collection', help='Name of one or more collections to include, specify none to include all collections', default=[], action='append') parser.add_argument('-s', '--source_config', help='The path to the source endpoint/org configuration file', type=str, default='source.json') parser.add_argument('--export_path', help='The path to save the export files', type=str, default='.') parser.add_argument('--limit', help='The number of entities to return per query request', type=int, default=100) parser.add_argument('--entities_per_file', help='The number of entities to put in one JSON file', type=int, default=10000) parser.add_argument('--error_retry_sleep', help='The number of seconds to wait between retrieving after an error', type=float, default=30) parser.add_argument('--page_sleep_time', help='The number of seconds to wait between retrieving pages from the UsergridQueryIterator', type=float, default=.5) parser.add_argument('--entity_sleep_time', help='The number of seconds to wait between retrieving pages from the UsergridQueryIterator', type=float, default=.1) parser.add_argument('--workers', dest='collection_workers', help='The number of worker processes to do the migration', type=int, default=4) parser.add_argument('--queue_size_max', help='The max size of entities to allow in the queue', type=int, default=100000) parser.add_argument('--ql', help='The QL to use in the filter for reading data from collections', type=str, default='select * order by created asc') # default='select * order by created asc') parser.add_argument('--nohup', help='specifies not to use stdout for logging', action='store_true') parser.add_argument('--graph', help='Use GRAPH instead of Query', dest='graph', action='store_true') my_args = parser.parse_args(sys.argv[1:]) return vars(my_args) def init(): global config config['collection_mapping'] = {} config['app_mapping'] = {} config['org_mapping'] = {} with open(config.get('source_config'), 'r') as f: config['source_config'] = json.load(f) if config['exclude_collection'] is None: config['exclude_collection'] = [] config['source_endpoint'] = config['source_config'].get('endpoint').copy() config['source_endpoint'].update(config['source_config']['credentials'][config['org']]) def wait_for(threads, label, sleep_time=60): wait = True logger.info('Starting to wait for [%s] threads with sleep time=[%s]' % (len(threads), sleep_time)) while wait: wait = False alive_count = 0 for t in threads: if t.is_alive(): alive_count += 1 logger.info('Thread [%s] is still alive' % t.name) if alive_count > 0: wait = True logger.info('Continuing to wait for [%s] threads with sleep time=[%s]' % (alive_count, sleep_time)) time.sleep(sleep_time) logger.warn('All workers [%s] done!' % label) def count_bytes(entity): entity_copy = entity.copy() if 'metadata' in entity_copy: del entity_copy['metadata'] entity_str = json.dumps(entity_copy) return len(entity_str) def check_response_status(r, url, exit_on_error=True): if r.status_code != 200: logger.critical('HTTP [%s] on URL=[%s]' % (r.status_code, url)) logger.critical('Response: %s' % r.text) if exit_on_error: exit() def main(): global config config = parse_args() init() init_logging() status_map = {} org_apps = { } if len(org_apps) == 0: source_org_mgmt_url = org_management_url_template.format(org=config.get('org'), limit=config.get('limit'), **config.get('source_endpoint')) print('Retrieving apps from [%s]' % source_org_mgmt_url) logger.info('Retrieving apps from [%s]' % source_org_mgmt_url) try: # list the apps for the SOURCE org logger.info('GET %s' % source_org_mgmt_url) r = session_source.get(source_org_mgmt_url) if r.status_code != 200: logger.critical( 'Abort processing: Unable to retrieve apps from [%s]: %s' % (source_org_mgmt_url, r.text)) exit() logger.info(json.dumps(r.text)) org_apps = r.json().get('data') except Exception as e: logger.exception('ERROR Retrieving apps from [%s]' % source_org_mgmt_url) print(traceback.format_exc()) logger.critical('Unable to retrieve apps from [%s] and will exit' % source_org_mgmt_url) exit() if _platform == "linux" or _platform == "linux2": collection_queue = Queue(maxsize=config.get('queue_size_max')) collection_response_queue = Queue(maxsize=config.get('queue_size_max')) else: collection_queue = Queue() collection_response_queue = Queue() logger.info('Starting entity_workers...') status_listener = StatusListener(collection_response_queue, collection_queue) status_listener.start() # start the worker processes which will iterate the collections collection_workers = [EntityExportWorker(collection_queue, collection_response_queue) for x in xrange(config.get('collection_workers'))] [w.start() for w in collection_workers] try: apps_to_process = config.get('app') collections_to_process = config.get('collection') # iterate the apps retrieved from the org for org_app in sorted(org_apps.keys()): logger.info('Found SOURCE App: %s' % org_app) time.sleep(3) for org_app in sorted(org_apps.keys()): parts = org_app.split('/') app = parts[1] # if apps are specified and the current app is not in the list, skip it if apps_to_process and len(apps_to_process) > 0 and app not in apps_to_process: logger.warning('Skipping app [%s] not included in process list [%s]' % (app, apps_to_process)) continue logger.info('Processing app=[%s]' % app) status_map[app] = { 'iteration_started': str(datetime.datetime.now()), 'max_created': -1, 'max_modified': -1, 'min_created': 1584946416000, 'min_modified': 1584946416000, 'count': 0, 'bytes': 0, 'collections': {} } # get the list of collections from the source org/app source_app_url = app_url_template.format(org=config.get('org'), app=app, **config.get('source_endpoint')) logger.info('GET %s' % source_app_url) r_collections = session_source.get(source_app_url) collection_attempts = 0 # sometimes this call was not working so I put it in a loop to force it... while r_collections.status_code != 200 and collection_attempts < 5: collection_attempts += 1 logger.warning('FAILED: GET (%s) [%s] URL: %s' % (r_collections.elapsed, r_collections.status_code, source_app_url)) time.sleep(DEFAULT_RETRY_SLEEP) r_collections = session_source.get(source_app_url) if collection_attempts >= 5: logger.critical('Unable to get collections at URL %s, skipping app' % source_app_url) continue app_response = r_collections.json() logger.info('App Response: ' + json.dumps(app_response)) app_entities = app_response.get('entities', []) if len(app_entities) > 0: app_entity = app_entities[0] collections = app_entity.get('metadata', {}).get('collections', {}) logger.info('Collection List: %s' % collections) # iterate the collections which are returned. for collection_name, collection_data in collections.iteritems(): exclude_collections = config.get('exclude_collection', []) if exclude_collections is None: exclude_collections = [] # filter out collections as configured... if collection_name in ignore_collections \ or (len(collections_to_process) > 0 and collection_name not in collections_to_process) \ or (len(exclude_collections) > 0 and collection_name in exclude_collections) \ or (config.get('migrate') == 'credentials' and collection_name != 'users'): logger.warning('Skipping collection=[%s]' % collection_name) continue logger.info('Publishing app / collection: %s / %s' % (app, collection_name)) collection_queue.put((app, collection_name)) status_map[app]['iteration_finished'] = str(datetime.datetime.now()) logger.info('Finished publishing collections for app [%s] !' % app) # allow collection workers to finish wait_for(collection_workers, label='collection_workers', sleep_time=30) status_listener.terminate() except KeyboardInterrupt: logger.warning('Keyboard Interrupt, aborting...') collection_queue.close() collection_response_queue.close() [os.kill(super(EntityExportWorker, p).pid, signal.SIGINT) for p in collection_workers] os.kill(super(StatusListener, status_listener).pid, signal.SIGINT) [w.terminate() for w in collection_workers] status_listener.terminate() logger.info('entity_workers DONE!') if __name__ == "__main__": main()