def lambda_handler()

in tools/autoingest/lambda/driver/app.py [0:0]


# Imports and module-level setup assumed by this excerpt: logger and
# s3client are referenced below but defined outside the function in the
# original file.
import logging
import os
import urllib.parse
from random import randint
from time import sleep

import boto3
import jsonpickle
from botocore.exceptions import ClientError

logger = logging.getLogger()
s3client = boto3.client('s3')

def lambda_handler(event, context):

    logger.debug('## EVENT\r' + jsonpickle.encode(dict(**event)))

    records = event['Records']
    for record in records:
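        # The SQS record body is a JSON-encoded SNS envelope whose 'Message'
        # field is itself a JSON-encoded S3 event notification, hence the
        # double decode.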
        message = jsonpickle.decode(jsonpickle.decode(record['body'])['Message'])

        logger.info('## MESSAGE\r' + jsonpickle.encode(dict(**message)))

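        # Extract bucket, key, and version from the first S3 record; object
        # keys arrive URL-encoded in event notifications.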
        sourceBucket = message['Records'][0]['s3']['bucket']['name']
        sourceKey = urllib.parse.unquote_plus(message['Records'][0]['s3']['object']['key'])
        sourceVersion = message['Records'][0]['s3']['object']['versionId']

        resultCode = '0'
        resultString = 'Successfully copied'

        try:

            if sourceBucket != os.environ['SOURCE_BUCKET_NAME']:
                # ClientError takes (error_response, operation_name); the
                # operation name here is nominal.
                raise ClientError({
                    'Error': {
                        'Code': '400',
                        'Message': 'Unsupported source bucket: {}'.format(sourceBucket)
                    },
                    'ResponseMetadata': {}
                }, 'HeadObject')

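            # Pre-flight HEAD to confirm the object exists and to inspect its
            # metadata (delete marker, storage class, size) before copying.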
            pre_flight_response = s3client.head_object(
                Bucket=sourceBucket,
                Key=sourceKey
            )

            logger.debug('## PREFLIGHT_RESPONSE\r' + jsonpickle.encode(dict(**pre_flight_response)))

            # A truthy DeleteMarker means the current version is a delete marker.
            if pre_flight_response.get('DeleteMarker'):
                raise Exception('Object ' + sourceKey + ' is deleted')

            destinationBucket = os.environ['DESTINATION_BUCKET_NAME']

            unsupportedStorageClass = False

            # Storage class check: Glacier-tier objects must be restored
            # before they can be copied.
            if 'StorageClass' in pre_flight_response:
                if pre_flight_response['StorageClass'] in ['GLACIER', 'DEEP_ARCHIVE']:
                    # Check the restore status.
                    if 'Restore' in pre_flight_response:
                        restore = pre_flight_response['Restore']
                        logger.debug(restore)
                        if 'ongoing-request="false"' not in restore:
                            logger.info('restore is in progress')
                            raise Exception('Object ' + sourceKey + ' is restoring from ' + pre_flight_response['StorageClass'])
                    else:
                        unsupportedStorageClass = True

                if unsupportedStorageClass:
                    raise Exception('Object ' + sourceKey + ' is in unsupported StorageClass ' + pre_flight_response['StorageClass'])


            size = pre_flight_response['ContentLength']
            # 1 TiB
            if size > 1099511627776:
                logger.warning('the object size is {}. The Lambda function may time out.'.format(size))

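            # copy() is boto3's managed transfer helper; it performs a
            # multipart copy automatically for large objects.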
            s3client.copy(
                CopySource={'Bucket': sourceBucket, 'Key': sourceKey, 'VersionId': sourceVersion},
                Bucket=destinationBucket,
                Key='{}/{}'.format(os.environ['DESTINATION_PREFIX'], sourceKey)
            )

        except ClientError as e:
            # If the request timed out or was throttled, mark the task as a
            # temporary failure and re-raise so it is retried. Any other
            # error is marked as a permanent failure.
            errorCode = e.response['Error']['Code']
            errorMessage = e.response['Error']['Message']

            logger.debug(errorMessage)

            if errorCode == 'TooManyRequestsException':
                resultCode = 'TemporaryFailure'
                resultString = 'Retry request to batch due to throttling.'
            elif errorCode == 'RequestTimeout':
                resultCode = 'TemporaryFailure'
                resultString = 'Retry request to Amazon S3 due to timeout.'
            elif errorCode == '304':
                resultCode = 'Succeeded'
                resultString = 'Not modified'
            elif errorCode == '400':
                resultCode = 'Succeeded'
                resultString = errorMessage
            elif errorCode == 'SlowDown':
                resultCode = 'TemporaryFailure'
                resultString = 'Retry request to S3 due to throttling.'
            else:
                resultCode = 'PermanentFailure'
                resultString = '{}: {}'.format(errorCode, errorMessage)

            if resultCode == 'TemporaryFailure':
                # Cool off for 1-10 seconds; SQS does not support
                # exponential-backoff retries, so add jitter here.
                logger.info('cooloff..')
                sleep(randint(1, 10))
                # Re-raise so the message returns to the queue for retry.
                raise

        except Exception as e:
            # Catch all other exceptions and permanently fail the task,
            # absorbing the error so the message is not redelivered.
            resultCode = 'PermanentFailure'
            resultString = 'Exception: {}'.format(e)

        finally:
            logger.info(resultCode + " # " + resultString + " # " + sourceKey)
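
For local testing, a minimal sketch of the nested event shape this handler expects. The bucket, key, and version values are illustrative assumptions, and SOURCE_BUCKET_NAME, DESTINATION_BUCKET_NAME, and DESTINATION_PREFIX must be set in the environment:

import json

s3_event = {'Records': [{'s3': {
    'bucket': {'name': 'my-source-bucket'},  # hypothetical bucket
    'object': {'key': 'path/to/object.bin',  # hypothetical key
               'versionId': 'example-version-id'}  # hypothetical version
}}]}
sns_envelope = {'Message': json.dumps(s3_event)}  # SNS wraps the S3 event
sqs_event = {'Records': [{'body': json.dumps(sns_envelope)}]}  # SQS wraps SNS

# lambda_handler(sqs_event, None)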