def handler()

in subscribers/python/tf-auto-export-to-s3/index.py [0:0]


def handler(event, context):
    dataexchange = boto3.client(
        service_name='dataexchange',
        region_name=region
    )
    s3 = boto3.client(
        service_name='s3',
        region_name=region
    )
    #If the request is from Terraform get the RevisionID, for first revision
    if 'InitialInit' in event:
        data_set_id = event['InitialInit']['data_set_id']
        revision_ids = [event['InitialInit']['RevisionIds']]
        print ("Initial revision retrieval")
        print (event)
    else:
        data_set_id = event['resources'][0]
        revision_ids = event['detail']['RevisionIds']
        print ("Triggered revision retrieval")
        print (event)
    # Used to store the Ids of the Jobs exporting the assets to S3.
    job_ids = set()
    
    # Create an ExportRevisionToS3 Job for each Revision ID
    for revision_id in revision_ids:

        export_job = dataexchange.create_job(
            Type='EXPORT_REVISIONS_TO_S3',
            Details={
                'ExportRevisionsToS3': {
                    'DataSetId': data_set_id,
                    'RevisionDestinations': [
                        { 
                            'Bucket': destination_bucket, 
                            'KeyPattern': data_set_id+'/${Revision.Id}/${Asset.Name}',
                            'RevisionId': revision_id
                        }
                    ]
                }
            }    
        )
        # Start the Job and save the JobId.
        dataexchange.start_job(JobId=export_job['Id'])
        job_ids.add(export_job['Id'])
    
    # Iterate until all remaining workflow have reached a terminal state, or an error is found.
    completed_jobs = set()
    while job_ids != completed_jobs:
        for job_id in job_ids:
            if job_id in completed_jobs:
                continue
            get_job_response = dataexchange.get_job(JobId=job_id)
            if get_job_response['State'] == 'COMPLETED':
                print ("Job {} completed".format(job_id))
                completed_jobs.add(job_id)
            if get_job_response['State'] == 'ERROR':
                job_errors = get_job_response['Errors']
                raise Exception('JobId: {} failed with errors:\n{}'.format(job_id, job_errors))
            # Sleep to ensure we don't get throttled by the GetJob API.
            time.sleep(0.2)
    return {
        'statusCode': 200,
        'body': json.dumps('All jobs completed.')
    }