in maestro-timetrigger/src/main/java/com/netflix/maestro/timetrigger/messageprocessors/TimeTriggerExecutionProcessor.java [53:149]
public void process(Supplier<TimeTriggerExecution> messageSupplier) {
long start = System.currentTimeMillis();
metrics.counter(MetricConstants.EXECUTION_PROCESS_METRIC, getClass());
TimeTriggerExecution timeTriggerExecution = messageSupplier.get();
String workflowId = timeTriggerExecution.getWorkflowId();
Checks.checkTrue(
!timeTriggerExecution.getTimeTriggersWithWatermarks().isEmpty(),
"Time triggers must not be empty: " + timeTriggerExecution);
// Compute next executions
List<PlannedTimeTriggerExecution> plannedTimeTriggerExecutions =
executionPlanner.calculatePlannedExecutions(
timeTriggerExecution.getTimeTriggersWithWatermarks(), new Date(), workflowId);
if (!plannedTimeTriggerExecutions.isEmpty()) {
LOG.info("Found target executions to be triggered=[{}]", plannedTimeTriggerExecutions);
}
// trigger a batch of executions
List<PlannedTimeTriggerExecution> executedTimeTriggers = new ArrayList<>();
List<PlannedTimeTriggerExecution> executionBatch =
plannedTimeTriggerExecutions.subList(
0, Math.min(props.getMaxTriggersPerMessage(), plannedTimeTriggerExecutions.size()));
List<List<PlannedTimeTriggerExecution>> partitionedBatch =
partitionList(executionBatch, props.getTriggerBatchSize());
for (List<PlannedTimeTriggerExecution> batch : partitionedBatch) {
boolean terminateCondition = false;
try {
LOG.info("Triggering batch of [{}] executions", batch.size());
List<PlannedTimeTriggerExecution> triggeredExecutions =
triggerPlannedExecutions(timeTriggerExecution, batch);
executedTimeTriggers.addAll(triggeredExecutions);
metrics.counter(
MetricConstants.EXECUTION_TRIGGER_WORKFLOW_METRIC,
getClass(),
MetricConstants.TYPE_TAG,
batch.size() > 1 ? MetricConstants.TAG_VALUE_BATCH : MetricConstants.TAG_VALUE_SINGLE);
} catch (MaestroNotFoundException e) {
terminateCondition = handleMaestroNotFoundException(e, timeTriggerExecution);
} catch (MaestroResourceConflictException e) {
metrics.counter(
MetricConstants.EXECUTION_ERROR_METRIC,
getClass(),
MetricConstants.TYPE_TAG,
MetricConstants.TAG_VALUE_CONFLICT);
LOG.info(
"Trigger UUID Expired for workflow [{}][{}] triggerUUID=[{}]",
timeTriggerExecution.getWorkflowId(),
timeTriggerExecution.getWorkflowVersion(),
timeTriggerExecution.getWorkflowTriggerUuid(),
e);
terminateCondition = true;
} catch (MaestroUnprocessableEntityException e) {
terminateCondition = handleMaestroUnprocessableEntityException(e, timeTriggerExecution);
}
if (terminateCondition) {
metrics.counter(MetricConstants.TERMINATE_SUBSCRIPTION_METRIC, getClass());
LOG.info(
"TimeTrigger Subscription terminating for [{}][{}][{}]",
timeTriggerExecution.getWorkflowId(),
timeTriggerExecution.getWorkflowVersion(),
timeTriggerExecution.getWorkflowTriggerUuid());
return;
}
}
// calculate next execution from previously executed batches
TimeTriggerExecution nextExecution =
executionPlanner.constructNextExecution(timeTriggerExecution, executedTimeTriggers);
// Calculate earliest next execution for invisibility duration
Optional<Date> firstExecutionDate =
executionPlanner.calculateEarliestExecutionDate(
nextExecution.getTimeTriggersWithWatermarks(), workflowId);
if (firstExecutionDate.isPresent()) {
long duration = calculateMessageDelay(firstExecutionDate.get());
LOG.info(
"Pushing next execution with invisibility duration of [{}] seconds for [{}] with next execution date=[{}]",
duration,
nextExecution,
firstExecutionDate);
timeTriggerProducer.push(nextExecution, (int) duration);
} else {
// No next executions left, can terminate subscription
metrics.counter(MetricConstants.TERMINATE_SUBSCRIPTION_METRIC, getClass());
LOG.info(
"timeTriggerExecution terminating for workflow [{}][{}] triggerUUID=[{}] due to empty next execution",
timeTriggerExecution.getWorkflowId(),
timeTriggerExecution.getWorkflowVersion(),
timeTriggerExecution.getWorkflowTriggerUuid());
}
LOG.info(
"Finished processing timeTriggerExecution [{}] and spent [{}] ms",
timeTriggerExecution,
System.currentTimeMillis() - start);
}