def lambda_handler()

in source/repositories/compliant-framework-central-pipeline/lambda/expand_s3_sources/index.py [0:0]


def lambda_handler(event, context):

    cp_client = boto3.client('codepipeline')
    s3_client = boto3.client('s3')

    try:
        print(json.dumps(event))

        # Extract the Job ID
        job_id = event['CodePipeline.job']['id']

        # Extract the Job Data
        job_data = event['CodePipeline.job']['data']

        params = json.loads(
            job_data['actionConfiguration']['configuration']['UserParameters'])
        print(json.dumps(params))

        tmp_file = tempfile.NamedTemporaryFile()
        bucket_name = job_data['inputArtifacts'][0]['location']['s3Location']['bucketName']
        object_key = job_data['inputArtifacts'][0]['location']['s3Location']['objectKey']

        print(f'bucket_name {bucket_name}')

        print(f'object_key {object_key}')

        repository_name = params['repositoryName']

        bucket = boto3.resource('s3').Bucket(params['bucketName'])
        bucket.objects.filter(Prefix=f'{repository_name}/').delete()
        branch_name = params['branchName']

        with tempfile.NamedTemporaryFile() as tmp_file:
            s3_client.download_file(bucket_name, object_key, tmp_file.name)

            with zipfile.ZipFile(tmp_file.name) as zip:
                for path in zip.namelist():
                    content = zip.read(path)
                    if(len(content)):
                        # we have to guess the mime content-type of the files and
                        # provide it to S3 since S3 cannot do this on its own.
                        content_type = mimetypes.guess_type(path)[0]
                        if content_type is not None:
                            bucket.put_object(
                                Body=(content),
                                Key=f'{repository_name}/{path}',
                                ContentType=content_type,
                                ServerSideEncryption='aws:kms',
                                SSEKMSKeyId=params['kmsKeyId'])
                        else:
                            bucket.put_object(
                                Body=(content),
                                Key=f'{repository_name}/{path}',
                                ServerSideEncryption='aws:kms',
                                SSEKMSKeyId=params['kmsKeyId'])

        cp_client.put_job_success_result(jobId=job_id)

    except Exception as e:
        # If any other exceptions which we didn't expect are raised
        # then fail the job and log the exception message.
        print('Function failed due to exception.')
        print(e)
        cp_client.put_job_failure_result(
            jobId=job_id, failureDetails={
                'message': e,
                'type': 'JobFailed'
            }
        )