in maestro-signal/src/main/java/com/netflix/maestro/signal/messageprocessors/SignalTriggerExecutionProcessor.java [58:165]
/**
 * Processes one signal trigger execution message: evaluates its run params and its optional
 * condition, then asks the action handler to start the workflow, passing {@code
 * Constants.WorkflowVersion.ACTIVE} as the version.
 *
 * <p>The method returns without starting a run when a run param or the condition evaluates to
 * {@code null}, or when the condition evaluates to false. When the start call fails with a
 * conflict, not-found, or unprocessable-entity exception, the trigger is deleted through the
 * broker DAO. Any other {@link RuntimeException} is rethrown wrapped in a {@link
 * MaestroRetryableError}.
 *
 * @param messageSupplier supplies the signal trigger execution to process
 * @throws MaestroRetryableError if starting the run fails with an otherwise unclassified runtime
 *     exception
 */
public void process(Supplier<SignalTriggerExecution> messageSupplier) {
  SignalTriggerExecution execution = messageSupplier.get();
  String workflowId = execution.getWorkflowId();
  String triggerUuid = execution.getTriggerUuid();

  Map<String, Parameter> evaluatedParams = evaluateRunParams(execution, workflowId, triggerUuid);
  if (evaluatedParams == null) {
    // A param could not be evaluated; the skip has already been logged and counted.
    return;
  }
  if (!isConditionSatisfied(execution.getCondition(), workflowId, triggerUuid)) {
    return;
  }
  startWorkflowRun(execution, evaluatedParams, workflowId, triggerUuid);
}

/**
 * Evaluates every run param of the execution, keeping the iteration order of the source map.
 *
 * @param execution the signal trigger execution whose params are evaluated
 * @param workflowId workflow id, passed to the evaluator and used in the skip log
 * @param triggerUuid trigger uuid, used in the skip log
 * @return the evaluated params keyed by param name (empty when the execution has no params), or
 *     {@code null} when a param evaluated to {@code null} and the execution must be skipped
 */
private Map<String, Parameter> evaluateRunParams(
    SignalTriggerExecution execution, String workflowId, String triggerUuid) {
  Map<String, Parameter> evaluatedParams = new LinkedHashMap<>();
  if (execution.getParams() == null) {
    return evaluatedParams;
  }
  for (var entry : execution.getParams().entrySet()) {
    // NOTE(review): the trailing `true` presumably makes the evaluator return null instead of
    // throwing on an evaluation error - confirm against the param evaluator.
    Parameter evaluated =
        paramEvaluator.parseAttribute(
            entry.getValue(), Collections.emptyMap(), workflowId, true);
    if (evaluated == null) {
      LOG.info(
          "Failed to evaluate the run param [{}] for workflow trigger [{}][{}] and skip the execution",
          entry.getValue(),
          workflowId,
          triggerUuid);
      countExecutionFailure("param_eval_error");
      return null;
    }
    evaluatedParams.put(entry.getKey(), evaluated);
  }
  return evaluatedParams;
}

/**
 * Evaluates the optional trigger condition as a boolean param.
 *
 * @param conditionExpr the condition expression; {@code null} or empty means "no condition"
 * @param workflowId workflow id, passed to the evaluator and used in the skip logs
 * @param triggerUuid trigger uuid, used in the skip logs
 * @return {@code true} when there is no condition or it evaluates to true; {@code false} when it
 *     evaluates to {@code null} or to false, in which case the skip has been logged and counted
 */
private boolean isConditionSatisfied(
    String conditionExpr, String workflowId, String triggerUuid) {
  if (conditionExpr == null || conditionExpr.isEmpty()) {
    return true;
  }
  Parameter evaluated =
      paramEvaluator.parseAttribute(
          BooleanParamDefinition.builder()
              .name("maestro_signal_condition")
              .expression(conditionExpr)
              .build(),
          Collections.emptyMap(),
          workflowId,
          true);
  if (evaluated == null) {
    LOG.info(
        "Failed to evaluate the condition [{}] for workflow signal trigger [{}][{}] and skip the execution",
        conditionExpr,
        workflowId,
        triggerUuid);
    countExecutionFailure("condition_eval_error");
    return false;
  }
  if (!evaluated.asBoolean()) {
    LOG.info(
        "The condition [{}] for workflow signal trigger [{}][{}] is evaluated as FALSE and skip the execution",
        conditionExpr,
        workflowId,
        triggerUuid);
    // A false condition is reported on the failure counter, distinguished only by its type tag.
    countExecutionFailure("false_condition");
    return false;
  }
  return true;
}

/**
 * Starts the workflow run and classifies any failure of the start call.
 *
 * @param execution the signal trigger execution; its params are cleared before the request is
 *     built
 * @param evaluatedParams the already evaluated run params handed to the run request
 * @param workflowId workflow id of the trigger
 * @param triggerUuid trigger uuid, used for logging and trigger deletion
 * @throws MaestroRetryableError if the failure is not a conflict, not-found, or
 *     unprocessable-entity exception
 */
private void startWorkflowRun(
    SignalTriggerExecution execution,
    Map<String, Parameter> evaluatedParams,
    String workflowId,
    String triggerUuid) {
  try {
    // The raw params are cleared before the request id is derived and the run request is built;
    // the evaluated values travel separately. NOTE(review): presumably this keeps the param
    // definitions out of the derived id and request payload - confirm before reordering.
    execution.setParams(null);
    UUID requestId = deriveRequestId(execution);
    RunResponse response =
        actionHandler.start(
            workflowId,
            Constants.WorkflowVersion.ACTIVE.name(),
            createWorkflowRunRequest(requestId, execution, evaluatedParams));
    LOG.info("signal trigger workflow run response is [{}]", response);
    metrics.counter(MetricConstants.SIGNAL_TRIGGER_EXECUTION_SUCCESS, getClass());
  } catch (MaestroResourceConflictException ce) {
    countExecutionFailure("conflict");
    LOG.info(
        "Trigger uuid [{}] for workflow [{}] has been changed, deleting workflow trigger due to [{}]",
        triggerUuid,
        workflowId,
        ce.getMessage());
    brokerDao.deleteTrigger(workflowId, triggerUuid);
  } catch (MaestroNotFoundException ne) {
    handleMaestroNotFoundException(ne, workflowId, triggerUuid);
    brokerDao.deleteTrigger(workflowId, triggerUuid);
  } catch (MaestroUnprocessableEntityException e) {
    handleMaestroUnProcessableEntityException(e, workflowId, triggerUuid);
    brokerDao.deleteTrigger(workflowId, triggerUuid);
  } catch (RuntimeException re) {
    // Anything not classified above is treated as recoverable and surfaced as a retryable error.
    LOG.warn(
        "recoverable workflow trigger exception for [{}][{}] due to",
        workflowId,
        triggerUuid,
        re);
    countExecutionFailure("recoverable");
    throw new MaestroRetryableError(
        re, "recoverable workflow trigger exception for [%s][%s]", workflowId, triggerUuid);
  }
}

/** Increments the signal trigger execution failure counter tagged with the given failure type. */
private void countExecutionFailure(String failureType) {
  metrics.counter(
      MetricConstants.SIGNAL_TRIGGER_EXECUTION_FAILURE,
      getClass(),
      MetricConstants.TYPE_TAG,
      failureType);
}