in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/dispatcher/grpc/GrpcDispatcher.java [114:186]
public CompletionStage<GrpcResponse> submit(ItemAndJob<GrpcRequest> item) {
CompletableFuture<GrpcResponse> completableFuture =
infra.contextManager().wrap(new CompletableFuture<>());
if (item == null) {
completableFuture.completeExceptionally(new NullPointerException());
return completableFuture;
}
final GrpcRequest message = item.getItem();
MessageStub.Attempt attempt = message.getStub().newAttempt();
if (attempt.isCanceled()) {
// already canceled
completableFuture.complete(GrpcResponse.of(Status.CANCELLED));
return attempt.complete(completableFuture);
}
final Job job = item.getJob();
MetricsUtils.jobScope(infra.scope(), job, calleeAddress)
.gauge(MetricNames.CHANNEL_USAGE)
.update(channel.getMetrics().usage());
MetricsUtils.jobScope(infra.scope(), job, calleeAddress)
.gauge(MetricNames.CHANNEL_SIZE)
.update(channel.getMetrics().size());
String[] tags = extractDispatchMetricsTags(item);
Context.CancellableContext context = Context.current().withCancellation();
attempt.onCancel(() -> context.cancel(null));
return attempt.complete(
Instrumentation.instrument
.withExceptionalCompletion(
LOGGER,
infra.scope(),
wrapInContext(
context,
() -> {
Optional<Status> status = grpcFilter.tryHandleRequest(message, job);
if (status.isPresent()) {
return CompletableFuture.completedFuture(GrpcResponse.of(status.get()));
}
long timeoutMs =
GrpcUtils.getTimeout(
job, configuration, message.getTimeoutCount(), infra.scope());
Instrumentation.instrument.withStreamObserver(
LOGGER,
infra.scope(),
ClientCalls::asyncUnaryCall,
channelWithMetadata(message, tags)
.newCall(methodDescriptor, extractCallOptions(item)),
message.payload(),
newStreamObserver(completableFuture, timeoutMs, message, tags),
MetricNames.CALL,
tags);
return completableFuture;
}),
MetricNames.DISPATCH,
tags)
.whenComplete(
(r, t) -> {
// adding additional metric to include all statuses including filtered ones
Map<String, String> map = new HashMap<>();
Utils.copyTags(map, tags);
map.put(Tags.Key.code, r.status().getCode().name());
infra.scope().tagged(map).counter(MetricNames.CALL_WITH_FILTER).inc(1);
// close context to avoid memory leak
context.close();
})
.thenApply(
response -> {
message.getFuture().complete(response);
return response;
}));
}