in infrastructure/emr_trigger/lambda_source/trigger.py [0:0]
def trigger_pipeline(current_batch_id, pipeline_arn, cluster_name):
    """
    Start a Step Functions execution for the pipeline unless one is already
    running for the given batch id.

    :param current_batch_id: batch identifier; used as the execution-name
        suffix and as the ``DynamoDB.BatchId`` override for both PySpark steps
    :param pipeline_arn: ARN of the Step Functions state machine to start
    :param cluster_name: EMR cluster name passed through as a
        ``ClusterConfigurationOverrides`` value
    :return: the ``start_execution`` response if a new execution was started,
        otherwise ``None``
    """
    pipeline_input = json.dumps(
        {
            "ClusterConfigurationOverrides": {
                "ClusterName": cluster_name,
            },
            "StepArgumentOverrides": {
                "Synchronize Topics - PySpark Job": {
                    "DynamoDB.BatchId": current_batch_id
                },
                "Scene Detection - PySpark Job": {"DynamoDB.BatchId": current_batch_id},
            },
            "BatchId": current_batch_id,
        }
    )
    # Guard clause: skip starting a second execution for a batch that is
    # already in flight (dedup decision delegated to the helper).
    if not is_safe_to_run_new_execution(pipeline_arn, current_batch_id):
        # Lazy %-style args so the message is only formatted if emitted.
        logger.info(
            "Batch already started for %s with input %s", pipeline_arn, pipeline_input
        )
        return None

    execution = sfn.start_execution(
        stateMachineArn=pipeline_arn,
        input=pipeline_input,
        name=f"BatchId_{current_batch_id}",
    )
    logger.info(
        "Started StateMachine %s with input %s", pipeline_arn, pipeline_input
    )
    return execution