in maestro-signal/src/main/java/com/netflix/maestro/signal/dao/MaestroSignalBrokerDao.java [235:304]
public int tryExecuteTrigger(SignalTriggerMatch match) {
return withMetricLogError(
() ->
withRetryableTransaction(
conn -> {
SignalInstance instance = match.getSignalInstance();
SignalTriggerDto triggerDto =
triggerDao.getTriggerForUpdate(
conn, match.getWorkflowId(), match.getTriggerUuid());
if (triggerDto == null || triggerDto.definition() == null) {
return -1; // not found valid trigger definition
}
for (int i = 0; i < triggerDto.signals().length; ++i) {
if (instance.getName().equals(triggerDto.signals()[i])) {
if (triggerDto.checkpoints()[i] >= instance.getSeqId()) {
return 0; // instance is no longer valid to match
}
}
}
SignalTrigger trigger = fromJson(triggerDto.definition(), SignalTrigger.class);
if (!isMatched(trigger, instance)) {
return 0;
}
var baseEntry = trigger.getDefinitions().get(instance.getName());
if (baseEntry == null) {
return 0; // instance is no longer valid to match
}
List<SignalParamValue> joinValues = new ArrayList<>();
if (baseEntry.getJoinKeys() != null) { // OK to be null, then no join values added
for (String k : baseEntry.getJoinKeys()) {
SignalParamValue joinValue = instance.getParams().get(k);
if (joinValue == null) {
return 0; // unmatched
}
joinValues.add(joinValue);
}
}
Long[] matchedIds = new Long[triggerDto.signals().length];
for (int i = 0; i < triggerDto.signals().length; ++i) {
if (instance.getName().equals(triggerDto.signals()[i])) {
matchedIds[i] = instance.getSeqId();
} else {
var signalMatch = buildMatch(joinValues, trigger, triggerDto.signals()[i]);
if (signalMatch == null) {
return 0; // unmatched, no execution
}
Long seqId =
matchSignalForTrigger(conn, signalMatch, triggerDto.checkpoints()[i]);
if (seqId == null) {
return 0; // unmatched, non execution
}
matchedIds[i] = seqId;
}
}
boolean updated =
triggerDao.updateTriggerCheckpoints(
conn, triggerDto.workflowId(), triggerDto.triggerUuid(), matchedIds);
if (!updated) {
throw new MaestroRetryableError(
"Failed to update the checkpoint for [%s], please try it again", match);
}
queueProducer.push(
buildExecution(match, triggerDto.signals(), matchedIds, trigger));
return 1;
}),
"tryExecuteTrigger",
"Failed to process a trigger execution: [{}]",
match);
}