def _migration_worker()

in aliyun/log/es_migration/migration_manager.py [0:0]


def _migration_worker(config, task, shutdown_flag, es_version):
    """Run the migration for a single task and return its result.

    Args:
        config: Migration configuration. Read through ``config.get(...)`` for
            ``endpoint``, ``project_name``, ``topic``, ``source``,
            ``time_reference``, ``batch_size``, ``query`` and ``scroll``, and
            through the attributes ``access_key_id``, ``access_key``,
            ``hosts`` and ``ckpt_path``.
        task: Mapping providing the keys ``id``, ``es_index``, ``es_shard``
            and ``logstore``.
        shutdown_flag: Value tested with ``op.exists``; if it already exists
            the worker returns without doing any work. It is also handed to
            ``MigrationTask.run``.
        es_version: Forwarded unchanged to ``MigrationTask``.

    Returns:
        ``Checkpoint.interrupted`` when ``shutdown_flag`` already exists,
        otherwise whatever ``MigrationTask.run`` returns.

    Raises:
        BaseException: Any exception raised while building or running the
            task is logged with its traceback and re-raised unchanged.
    """
    if op.exists(shutdown_flag):
        # Already interrupted
        return Checkpoint.interrupted

    extra = {
        'task_id': task['id'],
        'es_index': task['es_index'],
        'es_shard': task['es_shard'],
        'logstore': task['logstore'],
    }
    print('migrate:' + json.dumps(extra))
    logger = PrefixLoggerAdapter('', extra, _logger, {})
    logger.info('Migration worker starts')
    try:
        logstore = MigrationLogstore(
            endpoint=config.get('endpoint'),
            access_id=config.access_key_id,
            access_key=config.access_key,
            project_name=config.get('project_name'),
            logstore_name=task['logstore'],
            topic=config.get('topic'),
            source=config.get('source'),
        )
        # NOTE(review): verify_certs=False is passed to the Elasticsearch
        # client, which presumably turns off TLS certificate verification --
        # confirm this is intentional for the supported deployments.
        es_client = Elasticsearch(
            hosts=config.hosts,
            timeout=60,
            max_retries=30,
            retry_on_timeout=True,
            verify_certs=False,
        )
        # A separate local name keeps the ``task`` argument (a mapping) from
        # being rebound to the MigrationTask instance.
        migration_task = MigrationTask(
            _id=task['id'],
            es_client=es_client,
            es_index=task['es_index'],
            es_shard=task['es_shard'],
            logstore=logstore,
            ckpt_path=config.ckpt_path,
            time_reference=config.get('time_reference'),
            batch_size=config.get('batch_size'),
            logger=logger,
            es_version=es_version,
            es_query=config.get('query'),
            es_scroll=config.get('scroll'),
        )
        return migration_task.run(shutdown_flag)
    except BaseException:
        # logging's ``extra`` must be a mapping, so a formatted traceback
        # string cannot be passed there; ``exc_info=True`` attaches the
        # active exception's traceback to the record instead.
        logger.error('Exception in migration worker', exc_info=True)
        raise
    finally:
        logger.info('Migration worker exits')