public void process()

in maestro-signal/src/main/java/com/netflix/maestro/signal/messageprocessors/SignalTriggerExecutionProcessor.java [58:165]


  /**
   * Processes a single signal trigger execution: evaluates its run params and optional condition,
   * then asks {@code actionHandler} to start the workflow using the {@code ACTIVE} version.
   *
   * <p>No run is started (and the method returns normally) when a run param or the condition
   * evaluates to {@code null}, or when the condition evaluates to {@code false}. Each of those
   * cases is logged and counted under the failure metric with its own type tag.
   *
   * <p>When starting the run raises a conflict, not-found, or unprocessable-entity exception, the
   * trigger is deleted through {@code brokerDao}. Any other {@link RuntimeException} is logged,
   * counted as {@code recoverable}, and rethrown wrapped in a {@code MaestroRetryableError}.
   *
   * @param messageSupplier supplies the signal trigger execution to process
   */
  public void process(Supplier<SignalTriggerExecution> messageSupplier) {
    final SignalTriggerExecution triggerExecution = messageSupplier.get();
    final String workflowId = triggerExecution.getWorkflowId();
    final String triggerUuid = triggerExecution.getTriggerUuid();

    // Step 1: evaluate every run param; a single null result aborts the whole execution.
    final Map<String, Parameter> runParams = new LinkedHashMap<>();
    final var paramDefinitions = triggerExecution.getParams();
    if (paramDefinitions != null) {
      for (var definition : paramDefinitions.entrySet()) {
        Parameter param =
            paramEvaluator.parseAttribute(
                definition.getValue(), Collections.emptyMap(), workflowId, true);
        if (param == null) {
          LOG.info(
              "Failed to evaluate the run param [{}] for workflow trigger [{}][{}] and skip the execution",
              definition.getValue(),
              workflowId,
              triggerUuid);
          markExecutionFailure("param_eval_error");
          return;
        }
        runParams.put(definition.getKey(), param);
      }
    }

    // Step 2: evaluate the condition, if one is present, as a boolean param.
    final String condition = triggerExecution.getCondition();
    final boolean hasCondition = !(condition == null || condition.isEmpty());
    if (hasCondition) {
      var conditionDefinition =
          BooleanParamDefinition.builder()
              .name("maestro_signal_condition")
              .expression(condition)
              .build();
      Parameter verdict =
          paramEvaluator.parseAttribute(
              conditionDefinition, Collections.emptyMap(), workflowId, true);
      if (verdict == null) {
        LOG.info(
            "Failed to evaluate the condition [{}] for workflow signal trigger [{}][{}] and skip the execution",
            condition,
            workflowId,
            triggerUuid);
        markExecutionFailure("condition_eval_error");
        return;
      }
      if (!verdict.asBoolean()) {
        LOG.info(
            "The condition [{}] for workflow signal trigger [{}][{}] is evaluated as FALSE and skip the execution",
            condition,
            workflowId,
            triggerUuid);
        markExecutionFailure("false_condition");
        return;
      }
    }

    // Step 3: start the workflow run and translate failures into trigger cleanup or a retry.
    try {
      // NOTE(review): the raw params are cleared before the request id is derived and the run
      // request is built; the evaluated values are passed separately - confirm this is intended.
      triggerExecution.setParams(null);
      final UUID requestId = deriveRequestId(triggerExecution);
      final var runRequest = createWorkflowRunRequest(requestId, triggerExecution, runParams);
      RunResponse response =
          actionHandler.start(workflowId, Constants.WorkflowVersion.ACTIVE.name(), runRequest);
      LOG.info("signal trigger workflow run response is [{}]", response);
      metrics.counter(MetricConstants.SIGNAL_TRIGGER_EXECUTION_SUCCESS, getClass());
    } catch (MaestroResourceConflictException conflict) {
      markExecutionFailure("conflict");
      LOG.info(
          "Trigger uuid [{}] for workflow [{}] has been changed, deleting workflow trigger due to [{}]",
          triggerUuid,
          workflowId,
          conflict.getMessage());
      brokerDao.deleteTrigger(workflowId, triggerUuid);
    } catch (MaestroNotFoundException notFound) {
      handleMaestroNotFoundException(notFound, workflowId, triggerUuid);
      brokerDao.deleteTrigger(workflowId, triggerUuid);
    } catch (MaestroUnprocessableEntityException unprocessable) {
      handleMaestroUnProcessableEntityException(unprocessable, workflowId, triggerUuid);
      brokerDao.deleteTrigger(workflowId, triggerUuid);
    } catch (RuntimeException unexpected) {
      // The trailing throwable argument is logged as the exception, not as a placeholder value.
      LOG.warn(
          "recoverable workflow trigger exception for [{}][{}] due to",
          workflowId,
          triggerUuid,
          unexpected);
      markExecutionFailure("recoverable");
      throw new MaestroRetryableError(
          unexpected,
          "recoverable workflow trigger exception for [%s][%s]",
          workflowId,
          triggerUuid);
    }
  }

  /** Increments the signal trigger execution failure counter tagged with the given type. */
  private void markExecutionFailure(String failureType) {
    metrics.counter(
        MetricConstants.SIGNAL_TRIGGER_EXECUTION_FAILURE,
        getClass(),
        MetricConstants.TYPE_TAG,
        failureType);
  }