in infrastructure/emr_trigger/lambda_source/trigger.py [0:0]
def reset_batch(table, latest, pipeline_arn, execution_arn, cluster_name, *, now=None):
    """
    Close out the current batch window and start collecting the next one.

    When a batch run is triggered, reset the LATEST item (file size, file count
    and window start time) so it starts collecting files for the next batch run.
    Also add a ``BatchMetadata`` item to DynamoDB describing the batch run that
    was just triggered.

    A single timestamp is used both as the new LATEST ``BatchWindowStartTime``
    and as the triggered batch's ``BatchWindowEndTime``. This keeps consecutive
    batch windows contiguous, with no gap between them.

    :param table: DynamoDB table object exposing ``update_item`` and
        ``put_item`` (presumably a boto3 ``Table`` resource; confirm with caller).
    :param latest: the LATEST item as read before triggering. Must contain
        ``BatchWindowStartTime``, ``FileSizeKb`` and ``NumFiles``.
    :param pipeline_arn: ARN stored as ``PipelineArn`` on the metadata item.
    :param execution_arn: ARN stored as ``ExecutionArn`` on the metadata item.
    :param cluster_name: stored as ``ClusterName`` on the metadata item.
    :param now: optional epoch-seconds timestamp to use as the window boundary.
        Defaults to the current time.
    :return: None
    :raises KeyError: if ``latest`` lacks a required key. This is raised before
        any write, so LATEST is left untouched.
    """
    # One boundary for both items, so the closed window ends exactly where the
    # new one begins.
    boundary = int(datetime.datetime.now().timestamp()) if now is None else int(now)

    # Build the metadata item first. A malformed `latest` then fails before
    # LATEST is reset, instead of discarding the counters and then raising.
    metadata_item = {
        "BatchId": "BatchMetadata",
        "Name": str(latest["BatchWindowStartTime"]),
        "FileSizeKb": latest["FileSizeKb"],
        "NumFiles": latest["NumFiles"],
        "BatchWindowStartTime": latest["BatchWindowStartTime"],
        "BatchWindowEndTime": boundary,
        "PipelineArn": pipeline_arn,
        "ExecutionArn": execution_arn,
        "ClusterName": cluster_name,
    }

    # NOTE(review): this reset is unconditional. If other invocations increment
    # LATEST concurrently, files counted after the caller read `latest` are
    # dropped from both batches. A ConditionExpression on BatchWindowStartTime
    # would detect that. Confirm the desired behaviour.
    table.update_item(
        Key={
            "BatchId": "LATEST",
            "Name": "LATEST",
        },
        UpdateExpression="set FileSizeKb = :f, NumFiles = :n, BatchWindowStartTime = :t",
        ExpressionAttributeValues={
            ":f": 0,
            ":n": 0,
            ":t": boundary,
        },
    )
    table.put_item(Item=metadata_item)