def create_es_publisher_sample_job()

in amazon-neptune-and-aws-cdk-for-amundsen/lib/databuilders/postgres/postgres_extract_neptune_publish.py [0:0]


def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
                                   elasticsearch_doc_type_key='table',
                                   model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
                                   entity_type='table',
                                   elasticsearch_mapping=None):
    """
    Build a job that extracts search data from Neptune, stages it as JSON on the local filesystem and
    publishes it to a freshly named Elasticsearch index.

    :param elasticsearch_index_alias:  alias for Elasticsearch used in
                                       amundsensearchlibrary/search_service/config.py as an index
    :param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table` resulting in
                                       `table_{uuid}`
    :param model_name:                 the Databuilder model class used in transporting between Extractor and Loader
    :param entity_type:                Entity type handed to the `NeptuneSearchDataExtractor` class (via its
                                       `ENTITY_TYPE_CONFIG_KEY`). Defaults to `table`.
    :param elasticsearch_mapping:      Elasticsearch field mapping "DDL" handed to the `ElasticsearchPublisher` class,
                                       if None is given (default) it uses the `Table` query baked into the Publisher
    :return:                           a `DefaultJob` wired with the extract/load task and the Elasticsearch publisher;
                                       the job is only constructed here, not launched
    :raises RuntimeError:              if boto3 cannot resolve any AWS credentials for the Neptune client
    """
    # loader saves data to this location and publisher reads it from here
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'
    loader = FSElasticsearchJSONLoader()
    extractor = NeptuneSearchDataExtractor()

    task = DefaultTask(
        loader=loader,
        extractor=extractor,
        transformer=NoopTransformer()
    )

    # elastic search client instance (`es` is expected to be defined at module level)
    elasticsearch_client = es
    # unique name of new index in Elasticsearch
    elasticsearch_new_index_key = '{}_{}'.format(elasticsearch_doc_type_key, uuid.uuid4())
    # a single publisher instance provides the config scope AND is handed to the job below
    publisher = ElasticsearchPublisher()

    aws_creds = boto3.Session().get_credentials()
    if aws_creds is None:
        # get_credentials() returns None when no credential source is configured; fail with a clear message
        # instead of an AttributeError on `None.access_key`
        raise RuntimeError('Unable to locate AWS credentials required to connect to Neptune')
    # take one consistent snapshot: reading access_key/secret_key/token separately from refreshable
    # credentials could straddle a refresh and yield a mismatched key/secret/token set
    frozen_creds = aws_creds.get_frozen_credentials()

    job_config = ConfigFactory.from_dict({
        extractor.get_scope(): {
            NeptuneSearchDataExtractor.ENTITY_TYPE_CONFIG_KEY: entity_type,
            NeptuneSearchDataExtractor.MODEL_CLASS_CONFIG_KEY: model_name,
            'neptune.client': {
                NeptuneSessionClient.NEPTUNE_HOST_NAME: NEPTUNE_ENDPOINT,
                NeptuneSessionClient.AWS_REGION: AWS_REGION,
                NeptuneSessionClient.AWS_ACCESS_KEY: frozen_creds.access_key,
                NeptuneSessionClient.AWS_SECRET_ACCESS_KEY: frozen_creds.secret_key,
                NeptuneSessionClient.AWS_SESSION_TOKEN: frozen_creds.token
            }
        },
        'loader.filesystem.elasticsearch.file_path': extracted_search_data_path,
        'loader.filesystem.elasticsearch.mode': 'w',
        publisher.get_scope(): {
            'file_path': extracted_search_data_path,
            'mode': 'r',
            'client': elasticsearch_client,
            'new_index': elasticsearch_new_index_key,
            'doc_type': elasticsearch_doc_type_key,
            'alias': elasticsearch_index_alias
        }
    })

    # only optionally add these keys, so need to dynamically `put` them;
    # the key is derived from the publisher scope so it stays in sync with the block above
    if elasticsearch_mapping:
        job_config.put('{}.{}'.format(publisher.get_scope(),
                                      ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY),
                       elasticsearch_mapping)

    return DefaultJob(
        conf=job_config,
        task=task,
        publisher=publisher
    )