public void process()

in maestro-aws/src/main/java/com/netflix/maestro/engine/processors/SqsProcessorFinalizer.java [61:132]


  /**
   * Runs {@code processor} against a message payload and then settles the message through the
   * supplied callbacks.
   *
   * <p>The processor is handed a supplier that deserializes {@code payload} into {@code clazz}
   * with the object mapper; a JSON parsing {@link IOException} is rethrown as a {@code
   * MaestroInternalError}.
   *
   * <p>On success the processing latency is recorded, {@code acknowledgement} is run and the
   * success counter is incremented. Any exception raised inside that sequence (including one
   * thrown by {@code acknowledgement} itself) is handled as follows:
   *
   * <ul>
   *   <li>if the exception deletion policy accepts it, the failure counter is incremented, a
   *       warning is logged and {@code acknowledgement} is run;
   *   <li>otherwise a warning is logged and {@code setVisibility} is called with {@code
   *       visibilityTimeoutInSecs}; when {@code receiveCount} has reached the monitoring
   *       threshold, an extra warning and a high-retry counter are emitted first.
   * </ul>
   *
   * @param payload raw message body, expected to be JSON for {@code clazz}
   * @param acknowledgement callback run once the message is considered done (presumably it
   *     removes the message from the queue; confirm against the caller)
   * @param setVisibility callback receiving the visibility timeout when the message is left to
   *     be retried
   * @param visibilityTimeoutInSecs value passed to {@code setVisibility} on the retry path
   * @param receiveCount number of times this message has been received so far
   * @param processor the event processor to invoke
   * @param clazz target type used for deserialization and as the job type metric tag
   * @param <T> the concrete job event type
   */
  public <T extends MaestroJobEvent> void process(
      String payload,
      Runnable acknowledgement,
      Consumer<Integer> setVisibility,
      int visibilityTimeoutInSecs,
      int receiveCount,
      MaestroEventProcessor<T> processor,
      Class<T> clazz) {
    try {
      final long startNanos = metrics.clock().monotonicTime();
      processor.process(
          () -> {
            final T event;
            try {
              event = objectMapper.readValue(payload, clazz);
            } catch (IOException parseError) {
              throw new MaestroInternalError(parseError, "exception during json parsing");
            }
            LOG.debug(
                "Received a [{}] from SQS and processing it: [{}]", clazz.getSimpleName(), event);
            return event;
          });

      final long elapsedMillis =
          TimeUnit.NANOSECONDS.toMillis(metrics.clock().monotonicTime() - startNanos);
      final String jobType = clazz.getSimpleName();
      metrics.timer(
          AwsMetricConstants.SQS_PROCESSOR_LATENCY_METRIC,
          elapsedMillis,
          getClass(),
          AwsMetricConstants.JOB_TYPE_TAG,
          jobType);

      // Acknowledge before counting success, so a failing acknowledgement is handled below.
      acknowledgement.run();
      metrics.counter(
          AwsMetricConstants.SQS_JOB_EVENT_LISTENER_SUCCESS_METRIC,
          getClass(),
          AwsMetricConstants.JOB_TYPE_TAG,
          jobType);
    } catch (Exception failure) {
      final boolean dropMessage = exceptionEventDeletionPolicy.getPolicy().apply(failure);

      if (!dropMessage) {
        // Retry path: leave the message in place and only adjust its visibility timeout.
        if (receiveCount >= RECEIVE_COUNT_THRESHOLD_FOR_ADDITIONAL_MONITORING) {
          LOG.warn(
              "SQS payload: [{}] has been retrying [{}] times, check if there are any problems.",
              payload,
              receiveCount);
          metrics.counter(
              AwsMetricConstants.SQS_EVENT_HIGH_NUMBER_OF_RETRIES,
              getClass(),
              AwsMetricConstants.JOB_TYPE_TAG,
              clazz.getSimpleName());
        }
        LOG.warn(
            "Exception [{}] when processing class type [{}], to be retried later",
            failure.getClass().getSimpleName(),
            clazz.getSimpleName(),
            failure);
        setVisibility.accept(visibilityTimeoutInSecs);
        return;
      }

      // Drop path: the policy accepted this exception, so count it and acknowledge the message.
      final String jobType = clazz.getSimpleName();
      final String errorType = failure.getClass().getSimpleName();
      metrics.counter(
          AwsMetricConstants.SQS_JOB_EVENT_LISTENER_FAILURE_METRIC,
          getClass(),
          AwsMetricConstants.JOB_TYPE_TAG,
          jobType,
          MetricConstants.TYPE_TAG,
          errorType);
      LOG.warn(
          "Deleting exception from queue. Exception [{}] when processing payload [{}]",
          errorType,
          payload,
          failure);
      acknowledgement.run();
    }
  }