def trigger_pipeline()

in infrastructure/emr_trigger/lambda_source/trigger.py


def trigger_pipeline(current_batch_id, pipeline_arn, cluster_name):
    """
    Triggers pipeline if there is no running execution for the given batch id
    :param current_batch_id:
    :return:
    """

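    # Execution input: a cluster-level override plus per-PySpark-step overrides,
    # all keyed to the current batch id.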
    pipeline_input = json.dumps(
        {
            "ClusterConfigurationOverrides": {
                "ClusterName": cluster_name,
            },
            "StepArgumentOverrides": {
                "Synchronize Topics - PySpark Job": {
                    "DynamoDB.BatchId": current_batch_id
                },
                "Scene Detection - PySpark Job": {"DynamoDB.BatchId": current_batch_id},
            },
            "BatchId": current_batch_id,
        }
    )

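    # Start a new execution only if no running execution already covers this batch id.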
    if is_safe_to_run_new_execution(pipeline_arn, current_batch_id):
        execution = sfn.start_execution(
            stateMachineArn=pipeline_arn,
            input=pipeline_input,
            name=f"BatchId_{current_batch_id}",
        )
        logger.info(f"Started StateMachine {pipeline_arn} with input {pipeline_input}")
        return execution
    else:
        logger.info(
            f"Batch already started for {pipeline_arn} with input {pipeline_input}"
        )
        return None
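
The excerpt relies on module-level objects (json, sfn, logger) and on the helper is_safe_to_run_new_execution defined elsewhere in trigger.py. Below is a minimal sketch of that setup, assuming the guard simply looks for a RUNNING execution named after the batch id; the actual helper may differ.

import json
import logging

import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Step Functions client used by trigger_pipeline (assumed module-level setup).
sfn = boto3.client("stepfunctions")


def is_safe_to_run_new_execution(pipeline_arn, current_batch_id):
    """
    Sketch of the guard: returns True only if no RUNNING execution of the
    state machine was started for this batch id (executions are named
    "BatchId_<id>" by trigger_pipeline).
    """
    paginator = sfn.get_paginator("list_executions")
    pages = paginator.paginate(stateMachineArn=pipeline_arn, statusFilter="RUNNING")
    for page in pages:
        for execution in page["executions"]:
            if execution["name"] == f"BatchId_{current_batch_id}":
                return False
    return True

Note that Step Functions also enforces execution-name uniqueness for about 90 days on standard workflows, so starting a second execution named BatchId_<id> would fail with ExecutionAlreadyExists even without this guard; the check simply avoids relying on that error path.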