def migrate()

in aliyun/log/es_migration/migration_manager.py [0:0]


    def migrate(self):
        """Run every discovered migration task in a process pool, with retries.

        Tasks come from ``self._discover_tasks()``. After ``self._prepare()``
        the method runs up to ``retries_failed`` rounds (read from the
        config). In each round every task is submitted to the pool as a
        ``_migration_worker`` call, and the value each worker returns is
        counted when it matches one of the tracked state keys.

        The rounds stop early when the shutdown-flag path exists, or when the
        number of finished plus dropped tasks reaches the task count.
        Otherwise the method sleeps two minutes before the next round.

        Returns:
            dict: Counters of the last round, keyed by ``'total'``,
            ``Checkpoint.finished``, ``Checkpoint.dropped`` and
            ``Checkpoint.failed``. All result counters are zero when no
            round was executed.
        """
        self._logger.info('Migration starts')
        tasks = self._discover_tasks()
        task_cnt = len(tasks)
        # ProcessPoolExecutor rejects max_workers <= 0, so keep at least one
        # worker even when no tasks were discovered.
        pool_size = max(1, min(self._config.get('pool_size'), task_cnt))
        print('#pool_size: {}'.format(pool_size))
        print('#tasks: {}'.format(task_cnt))

        self._prepare()
        retries_failed = self._config.get('retries_failed')

        # Template for the per-round counters. `state` is bound up front so
        # the summary below still works when the retry loop never runs.
        initial_state = {
            'total': task_cnt,
            Checkpoint.finished: 0,
            Checkpoint.dropped: 0,
            Checkpoint.failed: 0,
        }
        state = dict(initial_state)

        with ProcessPoolExecutor(max_workers=pool_size) as pool:
            for i in range(retries_failed):
                msg = 'Start migration tasks'
                print('\n>>>> ', msg)
                self._logger.info(msg, extra={'retries': i})

                # Counters are reset each round because all tasks are
                # submitted again.
                # NOTE(review): presumably the worker skips tasks that have
                # already finished - confirm in _migration_worker.
                state = dict(initial_state)
                futures = [
                    pool.submit(
                        _migration_worker,
                        self._config,
                        task,
                        self._shutdown_flag,
                        self._es_version,
                    )
                    for task in tasks
                ]
                try:
                    for future in as_completed(futures):
                        res = future.result()
                        if res in state:
                            state[res] += 1
                        self._logger.info('State', extra=state)
                        print('>> state:', json.dumps(state))
                except BaseException:
                    # BaseException is caught so that interrupts are also
                    # logged and the still-pending work gets cancelled.
                    self._logger.error(
                        'Exception',
                        extra={'traceback': traceback.format_exc()},
                    )
                    for future in futures:
                        if not future.done():
                            future.cancel()
                    # Give running workers up to 10 seconds to finish; past
                    # that as_completed raises a timeout error, which
                    # propagates to the caller.
                    list(as_completed(futures, timeout=10))

                if op.exists(self._shutdown_flag):
                    # Already interrupted
                    break

                if state[Checkpoint.finished] + state[Checkpoint.dropped] >= task_cnt:
                    print('All migration tasks finished')
                    break

                if i < retries_failed - 1:
                    msg = 'Waiting for retrying failed tasks...'
                    print(msg)
                    self._logger.info(msg)
                    time.sleep(60 * 2)

        self._logger.info('Migration exits')
        print('\nexit:', json.dumps(state))
        return state