in src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java [117:170]
/**
 * Handles one polled activity task: records latency metrics, tags the logging context, runs the
 * task through {@code handler}, and sends the resulting reply.
 *
 * <p>A {@link CancellationException} thrown while executing the task or sending its reply is
 * converted into a {@code RespondActivityTaskCanceledRequest} whose details are the UTF-8 bytes of
 * the exception message. Any other exception propagates to the caller.
 *
 * <p>The logging MDC keys set here are always removed before this method returns, including when
 * populating the MDC or propagating the context fails part-way through.
 *
 * @param task the activity task returned by the poll
 * @throws Exception whatever the handler, context propagation, or reply sending throws, other
 *     than a {@link CancellationException} raised inside the execute/reply section
 */
public void handle(PollForActivityTaskResponse task) throws Exception {
  Scope metricsScope =
      options
          .getMetricsScope()
          .tagged(
              ImmutableMap.of(
                  MetricsTag.ACTIVITY_TYPE,
                  task.getActivityType().getName(),
                  MetricsTag.WORKFLOW_TYPE,
                  task.getWorkflowType().getName()));

  // NOTE(review): Duration.ofNanos implies the task timestamps are in nanoseconds — confirm.
  metricsScope
      .timer(MetricsType.ACTIVITY_SCHEDULED_TO_START_LATENCY)
      .record(
          Duration.ofNanos(
              task.getStartedTimestamp() - task.getScheduledTimestampOfThisAttempt()));

  try {
    // The following tags are for logging. They are set inside the try so that the finally
    // block clears them even if one of these calls (or propagateContext) throws; otherwise
    // stale tags would stay on this thread and be attached to later log lines.
    MDC.put(LoggerTag.ACTIVITY_ID, task.getActivityId());
    MDC.put(LoggerTag.ACTIVITY_TYPE, task.getActivityType().getName());
    MDC.put(LoggerTag.WORKFLOW_ID, task.getWorkflowExecution().getWorkflowId());
    MDC.put(LoggerTag.RUN_ID, task.getWorkflowExecution().getRunId());

    propagateContext(task);

    // Only the execute/reply section is covered by the cancellation handler, so a
    // CancellationException raised earlier is not turned into a cancel reply.
    try {
      // Each stopwatch is stopped only on normal completion of the step it measures.
      Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_EXEC_LATENCY).start();
      ActivityTaskHandler.Result response = handler.handle(task, metricsScope, false);
      sw.stop();

      sw = metricsScope.timer(MetricsType.ACTIVITY_RESP_LATENCY).start();
      sendReply(task, response, metricsScope);
      sw.stop();

      // End-to-end latency: current wall-clock time (converted from millis to nanos) minus the
      // scheduled timestamp of this attempt.
      long nanoTime =
          TimeUnit.NANOSECONDS.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
      Duration duration = Duration.ofNanos(nanoTime - task.getScheduledTimestampOfThisAttempt());
      metricsScope.timer(MetricsType.ACTIVITY_E2E_LATENCY).record(duration);
    } catch (CancellationException e) {
      RespondActivityTaskCanceledRequest cancelledRequest =
          new RespondActivityTaskCanceledRequest();
      // String.valueOf guards against a null message (it yields the text "null").
      cancelledRequest.setDetails(
          String.valueOf(e.getMessage()).getBytes(StandardCharsets.UTF_8));
      Stopwatch sw = metricsScope.timer(MetricsType.ACTIVITY_RESP_LATENCY).start();
      sendReply(task, new Result(null, null, cancelledRequest), metricsScope);
      sw.stop();
    }
  } finally {
    MDC.remove(LoggerTag.ACTIVITY_ID);
    MDC.remove(LoggerTag.ACTIVITY_TYPE);
    MDC.remove(LoggerTag.WORKFLOW_ID);
    MDC.remove(LoggerTag.RUN_ID);
  }
}