in amazon-neptune-and-aws-cdk-for-amundsen/lib/databuilders/postgres/postgres_extract_neptune_publish.py [0:0]
def create_es_publisher_sample_job(elasticsearch_index_alias='table_search_index',
                                   elasticsearch_doc_type_key='table',
                                   model_name='databuilder.models.table_elasticsearch_document.TableESDocument',
                                   entity_type='table',
                                   elasticsearch_mapping=None):
    """
    Build (but do not launch) a job that extracts search data from Neptune, stages it as JSON on the
    local filesystem and publishes it into a freshly named Elasticsearch index.

    AWS credentials are taken from the default boto3 session. The Neptune endpoint, the AWS region and
    the Elasticsearch client come from the module-level `NEPTUNE_ENDPOINT`, `AWS_REGION` and `es` names.

    :param elasticsearch_index_alias:  alias for Elasticsearch used in
                                       amundsensearchlibrary/search_service/config.py as an index
    :param elasticsearch_doc_type_key: name the ElasticSearch index is prepended with. Defaults to `table`
                                       resulting in `table_{uuid}`
    :param model_name:                 the Databuilder model class used in transporting between Extractor
                                       and Loader
    :param entity_type:                Entity type handed to the `NeptuneSearchDataExtractor` class;
                                       presumably selects which entity's search data is queried from
                                       Neptune (confirm against the extractor). Defaults to `table`.
    :param elasticsearch_mapping:      Elasticsearch field mapping "DDL" handed to the
                                       `ElasticsearchPublisher` class. If falsy (default) the key is not
                                       set at all, leaving the publisher on its built-in mapping.
    :return: a `DefaultJob` wired with the extract/load task, the publisher and their configuration
    :raises RuntimeError: if the default boto3 session cannot resolve any AWS credentials
    """
    # loader saves data to this location and publisher reads it from here
    extracted_search_data_path = '/var/tmp/amundsen/search_data.json'

    loader = FSElasticsearchJSONLoader()
    extractor = NeptuneSearchDataExtractor()
    task = DefaultTask(
        loader=loader,
        extractor=extractor,
        transformer=NoopTransformer()
    )

    # single publisher instance: used both for its config scope and as the job's publisher
    publisher = ElasticsearchPublisher()

    # unique name of new index in Elasticsearch
    elasticsearch_new_index_key = '{}_{}'.format(elasticsearch_doc_type_key, uuid.uuid4())

    aws_creds = boto3.Session().get_credentials()
    if aws_creds is None:
        # get_credentials() yields None when nothing is configured; fail with a readable message
        # instead of an AttributeError on the attribute reads below
        raise RuntimeError('No AWS credentials could be resolved from the default boto3 session')
    # take one consistent snapshot so key, secret and token cannot come from different refreshes
    frozen_creds = aws_creds.get_frozen_credentials()

    job_config = ConfigFactory.from_dict({
        extractor.get_scope(): {
            NeptuneSearchDataExtractor.ENTITY_TYPE_CONFIG_KEY: entity_type,
            NeptuneSearchDataExtractor.MODEL_CLASS_CONFIG_KEY: model_name,
            'neptune.client': {
                NeptuneSessionClient.NEPTUNE_HOST_NAME: NEPTUNE_ENDPOINT,
                NeptuneSessionClient.AWS_REGION: AWS_REGION,
                NeptuneSessionClient.AWS_ACCESS_KEY: frozen_creds.access_key,
                NeptuneSessionClient.AWS_SECRET_ACCESS_KEY: frozen_creds.secret_key,
                NeptuneSessionClient.AWS_SESSION_TOKEN: frozen_creds.token
            }
        },
        'loader.filesystem.elasticsearch.file_path': extracted_search_data_path,
        'loader.filesystem.elasticsearch.mode': 'w',
        publisher.get_scope(): {
            'file_path': extracted_search_data_path,
            'mode': 'r',
            # elastic search client instance (module-level `es`)
            'client': es,
            'new_index': elasticsearch_new_index_key,
            'doc_type': elasticsearch_doc_type_key,
            'alias': elasticsearch_index_alias
        }
    })

    # only optionally add these keys, so need to dynamically `put` them
    if elasticsearch_mapping:
        # same scope as the publisher section above, so the mapping lands next to the other keys
        job_config.put('{}.{}'.format(publisher.get_scope(),
                                      ElasticsearchPublisher.ELASTICSEARCH_MAPPING_CONFIG_KEY),
                       elasticsearch_mapping)

    return DefaultJob(
        conf=job_config,
        task=task,
        publisher=publisher
    )