in src/bulk-loader/inventory-created-handler/handler.py [0:0]
def process_message_record(record:dict)->dict:
    '''
    Process an individual notification record.

    Collects the inventory entries that pass ``should_process``, writes them
    as a CSV manifest under ``input/`` and submits an S3 Batch Operations job
    that invokes ``BATCH_FUNCTION_ARN`` for the listed objects.

    :param record: A single notification within the ObjectCreatedNotification message.
    :rtype: The response from the s3control.create_job operation, or ``None``
        when the record is skipped (the manifest key starts with ``reports/``,
        or no inventory entry qualified for processing).
    '''
    parser = create_manifest_parser(record)

    # The batch job below writes its completion report under the 'reports'
    # prefix; presumably those objects trigger this handler again, so they
    # are ignored here -- TODO confirm against the bucket notification setup.
    if parser.manifest_object_key.startswith('reports/'):
        logger.info(
            'Skipping process_message_record for reports manifest - {} '.format(
                parser.manifest_object_key))
        return None

    # Gather every inventory entry that should be imported.
    references:List[S3ObjectRef] = []
    for manifest_file in parser.manifest_files:
        references.extend(
            reference
            for reference in parser.fetch_inventory_entries(manifest_file)
            if should_process(reference))

    # Nothing to import: without this guard the 'content_bucket' tag lookup
    # (references[0]) raises IndexError after an empty manifest was written.
    if not references:
        logger.info(
            'Skipping process_message_record, no objects to process - {}'.format(
                parser.manifest_object_key))
        return None

    # Write the inventory list for processing.
    inventory_report_name = parser.get_inventory_report_name()
    object_key = 'input/{}.csv'.format(inventory_report_name)
    putObjectResponse = parser.write_reference_list(references, object_key=object_key)

    # The job manifest must point at the object written above so that it
    # matches the ETag returned for it (previously a fixed bucket/key was
    # hard-coded here).
    # NOTE(review): assumes write_reference_list stores the object in
    # parser.inventory_bucket_name -- confirm against the parser implementation.
    manifest_object_arn = 'arn:aws:s3:::{}/{}'.format(
        parser.inventory_bucket_name, object_key)

    # Run the batch job.
    createJobResponse = s3control.create_job(
        AccountId=ACCOUNT_ID,
        ConfirmationRequired=False,
        Operation={
            'LambdaInvoke': {
                'FunctionArn': BATCH_FUNCTION_ARN
            }
        },
        Report={
            'Bucket': "arn:aws:s3:::{}".format(parser.inventory_bucket_name),
            'Prefix': 'reports',
            "Format": "Report_CSV_20180820",
            "Enabled": True,
            "ReportScope": "AllTasks"
        },
        # The report name doubles as the client request token passed to create_job.
        ClientRequestToken=inventory_report_name,
        Manifest={
            "Spec": {
                "Format": "S3BatchOperations_CSV_20180820",
                "Fields": [
                    "Bucket",
                    "Key"
                ]
            },
            "Location": {
                'ObjectArn': manifest_object_arn,
                # The put response ETag is wrapped in double quotes; strip them.
                'ETag': putObjectResponse['ETag'].strip('"'),
            }
        },
        Description='RivSimple Import',
        Priority=123,
        RoleArn=BATCH_ROLE_ARN,
        Tags=[
            {'Key': 'riv_stack', 'Value': RIV_STACK_NAME},
            {'Key': 'inventory_bucket', 'Value': parser.inventory_bucket_name},
            # Tagged with the bucket of the first reference only.
            {'Key': 'content_bucket', 'Value': references[0].bucket_name},
        ])
    return createJobResponse