in aliyun/log/es_migration/migration_manager.py [0:0]
def migrate(self):
    """Run every discovered migration task in a process pool, in rounds.

    Tasks come from ``self._discover_tasks()``. Each task is submitted to
    ``_migration_worker`` together with the config, the shutdown-flag path
    and the ES version. Every round resubmits the full task list and tallies
    the worker results.

    The loop stops early in two cases: the shutdown-flag file exists, or
    every task ended ``finished`` or ``dropped``. Otherwise it sleeps two
    minutes and starts another round. At most ``config['retries_failed']``
    rounds run, and at least one round always runs.

    :return: dict holding ``'total'`` (number of tasks) plus the per-state
        counts (``Checkpoint.finished`` / ``dropped`` / ``failed``) of the
        last round that ran.
    """
    self._logger.info('Migration starts')
    tasks = self._discover_tasks()
    task_cnt = len(tasks)
    # ProcessPoolExecutor raises ValueError for max_workers <= 0, which
    # min() would produce when no tasks were discovered.
    pool_size = max(1, min(self._config.get('pool_size'), task_cnt))
    print('#pool_size: {}'.format(pool_size))
    print('#tasks: {}'.format(task_cnt))
    self._prepare()
    with ProcessPoolExecutor(max_workers=pool_size) as pool:
        # Run at least one round. Otherwise ``state`` is never assigned and
        # the final report below fails with UnboundLocalError.
        retries_failed = max(1, self._config.get('retries_failed'))
        for i in range(retries_failed):
            msg = 'Start migration tasks'
            print('\n>>>> ', msg)
            self._logger.info(msg, extra={'retries': i})
            futures = []
            # Counters are reset every round because each round resubmits
            # the complete task list.
            state = {
                'total': task_cnt,
                Checkpoint.finished: 0,
                Checkpoint.dropped: 0,
                Checkpoint.failed: 0,
            }
            # NOTE(review): all tasks are resubmitted, not only failed ones.
            # Presumably _migration_worker resumes from its checkpoint so
            # already-finished tasks return quickly -- confirm.
            for task in tasks:
                futures.append(
                    pool.submit(
                        _migration_worker,
                        self._config,
                        task,
                        self._shutdown_flag,
                        self._es_version,
                    )
                )
            try:
                for future in as_completed(futures):
                    res = future.result()
                    # Only results matching a key of ``state`` are counted.
                    if res in state:
                        state[res] += 1
                    self._logger.info('State', extra=state)
                    print('>> state:', json.dumps(state))
            except BaseException:
                # Deliberately broad: this also catches KeyboardInterrupt.
                # Log the error and cancel futures that have not started yet.
                # Then wait up to 10s for the others. A TimeoutError from
                # that wait propagates out of this method.
                self._logger.error(
                    'Exception',
                    extra={'traceback': traceback.format_exc()},
                )
                for future in futures:
                    if not future.done():
                        future.cancel()
                list(as_completed(futures, timeout=10))
            if op.exists(self._shutdown_flag):
                # Already interrupted
                break
            if state[Checkpoint.finished] + state[Checkpoint.dropped] >= task_cnt:
                print('All migration tasks finished')
                break
            if i < retries_failed - 1:
                msg = 'Waiting for retrying failed tasks...'
                print(msg)
                self._logger.info(msg)
                time.sleep(60 * 2)
    self._logger.info('Migration exits')
    print('\nexit:', json.dumps(state))
    return state