in aliyun/log/es_migration/migration_manager.py [0:0]
def _migration_worker(config, task, shutdown_flag, es_version):
    """Migrate a single task and return the outcome of its run.

    Args:
        config: Migration configuration. Read through ``config.get(...)``
            for ``endpoint``, ``project_name``, ``topic``, ``source``,
            ``time_reference``, ``batch_size``, ``query`` and ``scroll``,
            and through the attributes ``access_key_id``, ``access_key``,
            ``hosts`` and ``ckpt_path``.
        task: Mapping describing the work item; the keys ``id``,
            ``es_index``, ``es_shard`` and ``logstore`` are read.
        shutdown_flag: Filesystem path. If it already exists the worker
            returns immediately; otherwise it is handed to
            ``MigrationTask.run``.
        es_version: Passed through unchanged to ``MigrationTask``.

    Returns:
        ``Checkpoint.interrupted`` when ``shutdown_flag`` exists on entry,
        otherwise whatever ``MigrationTask.run(shutdown_flag)`` returns.

    Raises:
        Any exception raised while building or running the task is logged
        and then re-raised unchanged.
    """
    # The flag file already being present means a shutdown was requested
    # before this worker started, so no client or logger is set up.
    if op.exists(shutdown_flag):
        return Checkpoint.interrupted

    # Identifying fields of the task, printed once and attached to the logger.
    log_context = {
        'task_id': task['id'],
        'es_index': task['es_index'],
        'es_shard': task['es_shard'],
        'logstore': task['logstore'],
    }
    print('migrate:' + json.dumps(log_context))

    logger = PrefixLoggerAdapter('', log_context, _logger, {})
    logger.info('Migration worker starts')
    try:
        target_logstore = MigrationLogstore(
            endpoint=config.get('endpoint'),
            access_id=config.access_key_id,
            access_key=config.access_key,
            project_name=config.get('project_name'),
            logstore_name=task['logstore'],
            topic=config.get('topic'),
            source=config.get('source'),
        )
        es_client = Elasticsearch(
            hosts=config.hosts,
            timeout=60,
            max_retries=30,
            retry_on_timeout=True,
            verify_certs=False,
        )
        migration = MigrationTask(
            _id=task['id'],
            es_client=es_client,
            es_index=task['es_index'],
            es_shard=task['es_shard'],
            logstore=target_logstore,
            ckpt_path=config.ckpt_path,
            time_reference=config.get('time_reference'),
            batch_size=config.get('batch_size'),
            logger=logger,
            es_version=es_version,
            es_query=config.get('query'),
            es_scroll=config.get('scroll'),
        )
        return migration.run(shutdown_flag)
    except BaseException:
        # BaseException (not just Exception) so that interrupts are logged
        # too; the exception is always re-raised afterwards.
        # NOTE(review): ``extra`` receives the traceback as a plain string,
        # while stdlib logging expects a mapping -- confirm that
        # PrefixLoggerAdapter accepts a string here.
        trace_text = traceback.format_exc()
        logger.error('Exception in migration worker', extra=trace_text)
        raise
    finally:
        logger.info('Migration worker exits')