def trigger_neptune_export()

in export-neptune-to-elasticsearch/lambda/export_neptune_to_kinesis.py [0:0]


def trigger_neptune_export():
    """Build the neptune-export shell command and submit it as a job.

    Configuration is read from environment variables. Required:
    ``NEPTUNE_EXPORT_JAR_URI``, ``NEPTUNE_ENDPOINT``, ``NEPTUNE_PORT``,
    ``NEPTUNE_ENGINE``, ``STREAM_NAME``, ``JOB_SUFFIX``, ``AWS_REGION``,
    ``CONCURRENCY`` and ``EXPORT_SCOPE``. Optional: ``ADDITIONAL_PARAMS``
    (extra CLI arguments appended verbatim) and ``CLONE_CLUSTER`` (adds
    ``--clone-cluster`` when set to ``true``, case-insensitive).

    Engine-specific behaviour:
      * ``gremlin`` uses ``export-pg`` and passes ``--concurrency`` and
        ``--scope``; any other engine uses ``export-rdf`` without them.
      * ``--use-iam-auth`` is added for every engine except ``sparql``.

    Returns:
        The response returned by ``client.submit_job`` (``client`` is a
        module-level object; presumably a boto3 Batch client -- confirm).

    Raises:
        KeyError: if a required environment variable is not set.
    """
    # Function-scope import so this block does not depend on how the rest of
    # the module imports its time/date helpers.
    import time

    neptune_export_jar_uri = os.environ['NEPTUNE_EXPORT_JAR_URI']
    neptune_endpoint = os.environ['NEPTUNE_ENDPOINT']
    neptune_port = os.environ['NEPTUNE_PORT']
    neptune_engine = os.environ['NEPTUNE_ENGINE']
    stream_name = os.environ['STREAM_NAME']
    job_suffix = os.environ['JOB_SUFFIX']
    region = os.environ['AWS_REGION']
    concurrency = os.environ['CONCURRENCY']
    scope = os.environ['EXPORT_SCOPE']
    # Optional: an unset variable is treated exactly like an empty one.
    additional_params = os.environ.get('ADDITIONAL_PARAMS', '')
    clone_cluster = os.environ.get('CLONE_CLUSTER')

    # Every optional fragment carries its own leading space so the fragments
    # can be concatenated directly after '--use-ssl' in the command below.
    if additional_params and not additional_params.startswith(' '):
        additional_params = ' {}'.format(additional_params)

    is_gremlin = neptune_engine == 'gremlin'

    use_iam_auth = '' if neptune_engine == 'sparql' else ' --use-iam-auth'
    export_command = 'export-pg' if is_gremlin else 'export-rdf'
    concurrency_param = ' --concurrency {}'.format(concurrency) if is_gremlin else ''
    scope_param = ' --scope {}'.format(scope) if is_gremlin else ''
    clone_cluster_param = (
        ' --clone-cluster'
        if clone_cluster and clone_cluster.lower() == 'true'
        else ''
    )

    # NOTE(review): the values are interpolated unquoted into a string that is
    # executed via `sh -c`, so they must come from trusted configuration.
    command = 'df -h && rm -rf neptune-export.jar && wget {} -nv && export SERVICE_REGION="{}" && java -Xms16g -Xmx16g -jar neptune-export.jar {} -e {} -p {} -d /neptune/results --output stream --stream-name {} --region {} --format neptuneStreamsJson --use-ssl{}{}{}{}{}'.format(
        neptune_export_jar_uri,
        region,
        export_command,
        neptune_endpoint,
        neptune_port,
        stream_name,
        region,
        use_iam_auth,
        concurrency_param,
        scope_param,
        clone_cluster_param,
        additional_params)

    logger.info('Command: {}'.format(command))

    # Millisecond epoch timestamp used as a per-invocation job-name suffix.
    # time.time() is always seconds since the epoch, whereas calling
    # .timestamp() on a naive utcnow() value is interpreted as local time and
    # is only correct when the host timezone is UTC.
    timestamp_ms = round(time.time() * 1000)

    submit_job_response = client.submit_job(
        jobName='export-neptune-to-kinesis-{}-{}'.format(job_suffix, timestamp_ms),
        jobQueue='export-neptune-to-kinesis-queue-{}'.format(job_suffix),
        jobDefinition='export-neptune-to-kinesis-job-{}'.format(job_suffix),
        containerOverrides={
            'command': [
                'sh',
                '-c',
                command
            ]
        }
    )

    return submit_job_response