in maestro-aws/src/main/java/com/netflix/maestro/engine/processors/SqsProcessorFinalizer.java [61:132]
public <T extends MaestroJobEvent> void process(
String payload,
Runnable acknowledgement,
Consumer<Integer> setVisibility,
int visibilityTimeoutInSecs,
int receiveCount,
MaestroEventProcessor<T> processor,
Class<T> clazz) {
try {
long start = metrics.clock().monotonicTime();
processor.process(
() -> {
try {
T message = objectMapper.readValue(payload, clazz);
LOG.debug(
"Received a [{}] from SQS and processing it: [{}]",
clazz.getSimpleName(),
message);
return message;
} catch (IOException ex) {
throw new MaestroInternalError(ex, "exception during json parsing");
}
});
long latencyInMillis = TimeUnit.NANOSECONDS.toMillis(metrics.clock().monotonicTime() - start);
metrics.timer(
AwsMetricConstants.SQS_PROCESSOR_LATENCY_METRIC,
latencyInMillis,
getClass(),
AwsMetricConstants.JOB_TYPE_TAG,
clazz.getSimpleName());
acknowledgement.run();
metrics.counter(
AwsMetricConstants.SQS_JOB_EVENT_LISTENER_SUCCESS_METRIC,
getClass(),
AwsMetricConstants.JOB_TYPE_TAG,
clazz.getSimpleName());
} catch (Exception ex) {
if (exceptionEventDeletionPolicy.getPolicy().apply(ex)) {
metrics.counter(
AwsMetricConstants.SQS_JOB_EVENT_LISTENER_FAILURE_METRIC,
getClass(),
AwsMetricConstants.JOB_TYPE_TAG,
clazz.getSimpleName(),
MetricConstants.TYPE_TAG,
ex.getClass().getSimpleName());
LOG.warn(
"Deleting exception from queue. Exception [{}] when processing payload [{}]",
ex.getClass().getSimpleName(),
payload,
ex);
acknowledgement.run();
} else {
if (receiveCount >= RECEIVE_COUNT_THRESHOLD_FOR_ADDITIONAL_MONITORING) {
LOG.warn(
"SQS payload: [{}] has been retrying [{}] times, check if there are any problems.",
payload,
receiveCount);
metrics.counter(
AwsMetricConstants.SQS_EVENT_HIGH_NUMBER_OF_RETRIES,
getClass(),
AwsMetricConstants.JOB_TYPE_TAG,
clazz.getSimpleName());
}
LOG.warn(
"Exception [{}] when processing class type [{}], to be retried later",
ex.getClass().getSimpleName(),
clazz.getSimpleName(),
ex);
setVisibility.accept(visibilityTimeoutInSecs);
}
}
}