public int tryExecuteTrigger()

in maestro-signal/src/main/java/com/netflix/maestro/signal/dao/MaestroSignalBrokerDao.java [235:304]


  public int tryExecuteTrigger(SignalTriggerMatch match) {
    return withMetricLogError(
        () ->
            withRetryableTransaction(
                conn -> {
                  SignalInstance instance = match.getSignalInstance();
                  SignalTriggerDto triggerDto =
                      triggerDao.getTriggerForUpdate(
                          conn, match.getWorkflowId(), match.getTriggerUuid());
                  if (triggerDto == null || triggerDto.definition() == null) {
                    return -1; // not found valid trigger definition
                  }
                  for (int i = 0; i < triggerDto.signals().length; ++i) {
                    if (instance.getName().equals(triggerDto.signals()[i])) {
                      if (triggerDto.checkpoints()[i] >= instance.getSeqId()) {
                        return 0; // instance is no longer valid to match
                      }
                    }
                  }
                  SignalTrigger trigger = fromJson(triggerDto.definition(), SignalTrigger.class);
                  if (!isMatched(trigger, instance)) {
                    return 0;
                  }
                  var baseEntry = trigger.getDefinitions().get(instance.getName());
                  if (baseEntry == null) {
                    return 0; // instance is no longer valid to match
                  }
                  List<SignalParamValue> joinValues = new ArrayList<>();
                  if (baseEntry.getJoinKeys() != null) { // OK to be null, then no join values added
                    for (String k : baseEntry.getJoinKeys()) {
                      SignalParamValue joinValue = instance.getParams().get(k);
                      if (joinValue == null) {
                        return 0; // unmatched
                      }
                      joinValues.add(joinValue);
                    }
                  }

                  Long[] matchedIds = new Long[triggerDto.signals().length];
                  for (int i = 0; i < triggerDto.signals().length; ++i) {
                    if (instance.getName().equals(triggerDto.signals()[i])) {
                      matchedIds[i] = instance.getSeqId();
                    } else {
                      var signalMatch = buildMatch(joinValues, trigger, triggerDto.signals()[i]);
                      if (signalMatch == null) {
                        return 0; // unmatched, no execution
                      }
                      Long seqId =
                          matchSignalForTrigger(conn, signalMatch, triggerDto.checkpoints()[i]);
                      if (seqId == null) {
                        return 0; // unmatched, non execution
                      }
                      matchedIds[i] = seqId;
                    }
                  }
                  boolean updated =
                      triggerDao.updateTriggerCheckpoints(
                          conn, triggerDto.workflowId(), triggerDto.triggerUuid(), matchedIds);
                  if (!updated) {
                    throw new MaestroRetryableError(
                        "Failed to update the checkpoint for [%s], please try it again", match);
                  }
                  queueProducer.push(
                      buildExecution(match, triggerDto.signals(), matchedIds, trigger));
                  return 1;
                }),
        "tryExecuteTrigger",
        "Failed to process a trigger execution: [{}]",
        match);
  }