def lambda_handler()

in tools/mediasync/lambda/driver/app.py [0:0]


def lambda_handler(event, context):
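    """Driver for an S3 Batch Operations copy job.

    For each task in the event, the handler either submits an AWS Batch job
    to copy a large object to the destination bucket or copies a small
    object directly with CopyObject, then reports the per-task result back
    to S3 Batch Operations.
    """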

    logger.debug('## EVENT\r' + jsonpickle.encode(dict(**event)))

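    # Extract job, invocation, and task details from the S3 Batch Operations event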
    jobId = event['job']['id']
    invocationId = event['invocationId']
    invocationSchemaVersion = event['invocationSchemaVersion']

    taskId = event['tasks'][0]['taskId']
    sourceKey = urllib.parse.unquote_plus(event['tasks'][0]['s3Key'])
    s3BucketArn = event['tasks'][0]['s3BucketArn']
    sourceBucket = s3BucketArn.split(':::')[-1]

    results = []
    # Prepare result code and string
    resultCode = None
    resultString = None

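    # Objects larger than this threshold are handed off to AWS Batch; smaller ones are copied directly in this function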
    minsizeforbatch = int(os.environ['MN_SIZE_FOR_BATCH_IN_BYTES'])

    # Copy object to new bucket with new key name
    try:
        logger.debug("preflight check start")

        # Preflight check: confirm the source object exists and is readable
        pre_flight_response = s3client.head_object(
            Bucket=sourceBucket,
            Key=sourceKey
        )

        logger.debug('## PREFLIGHT_RESPONSE\r' + jsonpickle.encode(dict(**pre_flight_response)))

        if 'DeleteMarker' in pre_flight_response:
            if pre_flight_response['DeleteMarker']:
                raise Exception('Object ' + sourceKey + ' is deleted')

        size = pre_flight_response['ContentLength']
        destinationBucket = os.environ['DESTINATION_BUCKET_NAME']

        logger.debug("preflight check end")

        # Large object: hand the copy off to AWS Batch
        if size > minsizeforbatch:

            unsupportedStorageClass = False

            # GLACIER and DEEP_ARCHIVE objects must be restored before they can be copied
            if 'StorageClass' in pre_flight_response:
                if pre_flight_response['StorageClass'] in ['GLACIER', 'DEEP_ARCHIVE']:
                    #check restore status:
                    if 'Restore' in pre_flight_response:
                        restore = pre_flight_response['Restore']
                        logger.debug(restore)
                        if 'ongoing-request="false"' not in restore:
                            logger.info('restore is in progress')
                            raise Exception('Object ' + sourceKey + ' is restoring from '  + pre_flight_response['StorageClass'])
                    else:
                        unsupportedStorageClass = True

            if unsupportedStorageClass:
                raise Exception('Object ' + sourceKey + ' is in unsupported StorageClass ' + pre_flight_response['StorageClass'])

            # Keys must be in Unicode Normalization Form C (NFC)
            if not unicodedata.is_normalized('NFC', sourceKey):
                raise Exception('Object ' + sourceKey + ' is not in Normalized Form C')


            if not is_can_submit_jobs():

                logger.info("too many jobs pending. returning slowdown")
                resultCode = 'TemporaryFailure'
                resultString = 'Retry request to batch due to too many pending jobs.'

            else:

                logger.debug("job submission start")

                # Submit an AWS Batch job to copy the object to the destination bucket
                response = batchclient.submit_job(
                    jobName="MediaSyncJob",
                    jobQueue=os.environ['JOB_QUEUE'],
                    jobDefinition=os.environ['JOB_DEFINITION'],
                    parameters={
                        'SourceS3Uri': 's3://' + sourceBucket + '/' + sourceKey,
                        'DestinationS3Uri': 's3://' + destinationBucket + '/' + sourceKey,
                        'Size': str(size)
                    },
                    tags={
                        'S3BatchJobId': jobId,
                        'SourceBucket': sourceBucket,
                        'DestinationBucket': destinationBucket,
                        'Key': sourceKey,
                        'Size': str(size)
                    }
                )

                logger.debug('## BATCH_RESPONSE\r' + jsonpickle.encode(dict(**response)))
                logger.debug("job submission complete")
                detail = 'https://console.aws.amazon.com/batch/v2/home?region=' + os.environ['AWS_REGION'] + '#jobs/detail/' + response['jobId']
                resultString = detail
                resultCode = 'Succeeded'

        else:
            # Object is at or below the threshold (under the 5 GB CopyObject limit), so copy it directly here
            copy_response= {}

            # Apply the bucket-owner-full-control ACL unless it is explicitly disabled
            if os.environ['IS_BUCKET_OWNER_FULL_CONTROL'] == 'FALSE':
                copy_response = s3client.copy_object(
                    Bucket=destinationBucket,
                    CopySource={'Bucket': sourceBucket,'Key': sourceKey},
                    Key=sourceKey
                )
            else:
                copy_response = s3client.copy_object(
                    Bucket=destinationBucket,
                    CopySource={'Bucket': sourceBucket,'Key': sourceKey},
                    ACL='bucket-owner-full-control',
                    Key=sourceKey
                )

            logger.debug('## COPY_RESPONSE\r' + jsonpickle.encode(dict(**copy_response)))
            resultString = 'Lambda copy complete'
            resultCode = 'Succeeded'


    except ClientError as e:
        # If the request timed out or was throttled, mark the task as a
        # temporary failure so that S3 Batch Operations retries it. Any
        # other client error marks the task as a permanent failure.
        errorCode = e.response['Error']['Code']
        errorMessage = e.response['Error']['Message']

        logger.debug(errorMessage)

        if errorCode == 'TooManyRequestsException':
            resultCode = 'TemporaryFailure'
            resultString = 'Retry request to batch due to throttling.'
        elif errorCode == 'RequestTimeout':
            resultCode = 'TemporaryFailure'
            resultString = 'Retry request to Amazon S3 due to timeout.'
        elif errorCode == '304':
            resultCode = 'Succeeded'
            resultString = 'Not modified'
        elif errorCode == 'SlowDown':
            resultCode = 'TemporaryFailure'
            resultString = 'Retry request to Amazon S3 due to throttling.'
        else:
            resultCode = 'PermanentFailure'
            resultString = '{}: {}'.format(errorCode, errorMessage)

    except Exception as e:
        # Catch all exceptions to permanently fail the task
        resultCode = 'PermanentFailure'
        resultString = 'Exception: {}'.format(e)

    finally:
        results.append({
            'taskId': taskId,
            'resultCode': resultCode,
            'resultString': resultString
        })
        logger.info(resultCode + " # " + resultString)

    return {
        'invocationSchemaVersion': invocationSchemaVersion,
        'treatMissingKeysAs': 'PermanentFailure',
        'invocationId': invocationId,
        'results': results
    }
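
For reference, S3 Batch Operations invokes this handler with a payload like the one sketched below; the values are illustrative placeholders, and only the fields the handler actually reads are shown.

example_event = {
    'invocationSchemaVersion': '1.0',
    'invocationId': 'example-invocation-id',
    'job': {
        'id': 'example-job-id'
    },
    'tasks': [
        {
            'taskId': 'example-task-id',
            # s3Key arrives URL-encoded and is decoded with urllib.parse.unquote_plus
            's3Key': 'media/example%20asset.mxf',
            's3BucketArn': 'arn:aws:s3:::example-source-bucket'
        }
    ]
}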