public void process()

in maestro-timetrigger/src/main/java/com/netflix/maestro/timetrigger/messageprocessors/TimeTriggerExecutionProcessor.java [53:149]


  /**
   * Processes a single time trigger execution message.
   *
   * <p>The flow is:
   *
   * <ol>
   *   <li>Ask the execution planner for the planned executions as of the current time.
   *   <li>Take at most {@code props.getMaxTriggersPerMessage()} of them, split that slice with
   *       {@code partitionList} using {@code props.getTriggerBatchSize()}, and trigger each
   *       batch in turn.
   *   <li>Build the next execution from the executions that were actually triggered and either
   *       push it to the producer with a delay, or stop when the planner reports no further
   *       execution date.
   * </ol>
   *
   * <p>A {@code MaestroResourceConflictException} raised while triggering a batch always ends
   * processing without pushing a follow-up message. For {@code MaestroNotFoundException} and
   * {@code MaestroUnprocessableEntityException} the dedicated handler methods decide whether to
   * terminate.
   *
   * @param messageSupplier supplies the time trigger execution to process; its list of time
   *     triggers with watermarks must not be empty, otherwise the {@code Checks.checkTrue}
   *     precondition fails
   */
  public void process(Supplier<TimeTriggerExecution> messageSupplier) {
    long startedAt = System.currentTimeMillis();
    metrics.counter(MetricConstants.EXECUTION_PROCESS_METRIC, getClass());

    TimeTriggerExecution execution = messageSupplier.get();
    String workflowId = execution.getWorkflowId();
    Checks.checkTrue(
        !execution.getTimeTriggersWithWatermarks().isEmpty(),
        "Time triggers must not be empty: " + execution);

    // Plan everything that is due as of now.
    List<PlannedTimeTriggerExecution> planned =
        executionPlanner.calculatePlannedExecutions(
            execution.getTimeTriggersWithWatermarks(), new Date(), workflowId);
    if (!planned.isEmpty()) {
      LOG.info("Found target executions to be triggered=[{}]", planned);
    }

    // Cap the amount of work done for this message, then split it into batches.
    List<PlannedTimeTriggerExecution> triggered = new ArrayList<>();
    int limit = Math.min(props.getMaxTriggersPerMessage(), planned.size());
    List<PlannedTimeTriggerExecution> cappedExecutions = planned.subList(0, limit);
    List<List<PlannedTimeTriggerExecution>> batches =
        partitionList(cappedExecutions, props.getTriggerBatchSize());

    for (List<PlannedTimeTriggerExecution> currentBatch : batches) {
      boolean shouldTerminate = false;
      try {
        LOG.info("Triggering batch of [{}] executions", currentBatch.size());
        List<PlannedTimeTriggerExecution> batchResult =
            triggerPlannedExecutions(execution, currentBatch);
        triggered.addAll(batchResult);
        metrics.counter(
            MetricConstants.EXECUTION_TRIGGER_WORKFLOW_METRIC,
            getClass(),
            MetricConstants.TYPE_TAG,
            currentBatch.size() <= 1
                ? MetricConstants.TAG_VALUE_SINGLE
                : MetricConstants.TAG_VALUE_BATCH);
      } catch (MaestroNotFoundException e) {
        // The handler decides whether this is terminal.
        shouldTerminate = handleMaestroNotFoundException(e, execution);
      } catch (MaestroResourceConflictException e) {
        // A conflict is always terminal for this message.
        metrics.counter(
            MetricConstants.EXECUTION_ERROR_METRIC,
            getClass(),
            MetricConstants.TYPE_TAG,
            MetricConstants.TAG_VALUE_CONFLICT);
        LOG.info(
            "Trigger UUID Expired for workflow [{}][{}] triggerUUID=[{}]",
            execution.getWorkflowId(),
            execution.getWorkflowVersion(),
            execution.getWorkflowTriggerUuid(),
            e);
        shouldTerminate = true;
      } catch (MaestroUnprocessableEntityException e) {
        // The handler decides whether this is terminal.
        shouldTerminate = handleMaestroUnprocessableEntityException(e, execution);
      }

      if (shouldTerminate) {
        // Stop here: no further batches and no follow-up message.
        metrics.counter(MetricConstants.TERMINATE_SUBSCRIPTION_METRIC, getClass());
        LOG.info(
            "TimeTrigger Subscription terminating for [{}][{}][{}]",
            execution.getWorkflowId(),
            execution.getWorkflowVersion(),
            execution.getWorkflowTriggerUuid());
        return;
      }
    }

    // Derive the follow-up execution from what was actually triggered above.
    TimeTriggerExecution followUp = executionPlanner.constructNextExecution(execution, triggered);

    // The earliest upcoming execution date drives the delay of the follow-up message.
    Optional<Date> firstExecutionDate =
        executionPlanner.calculateEarliestExecutionDate(
            followUp.getTimeTriggersWithWatermarks(), workflowId);
    if (!firstExecutionDate.isPresent()) {
      // Nothing left to schedule, so no follow-up message is pushed.
      metrics.counter(MetricConstants.TERMINATE_SUBSCRIPTION_METRIC, getClass());
      LOG.info(
          "timeTriggerExecution terminating for workflow [{}][{}] triggerUUID=[{}] due to empty next execution",
          execution.getWorkflowId(),
          execution.getWorkflowVersion(),
          execution.getWorkflowTriggerUuid());
    } else {
      long invisibilityDuration = calculateMessageDelay(firstExecutionDate.get());
      LOG.info(
          "Pushing next execution with invisibility duration of [{}] seconds for [{}] with next execution date=[{}]",
          invisibilityDuration,
          followUp,
          firstExecutionDate);
      timeTriggerProducer.push(followUp, (int) invisibilityDuration);
    }

    long elapsedMillis = System.currentTimeMillis() - startedAt;
    LOG.info(
        "Finished processing timeTriggerExecution [{}] and spent [{}] ms",
        execution,
        elapsedMillis);
  }