in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/dispatcher/grpc/GrpcDispatcher.java [112:170]
public CompletionStage<GrpcResponse> submit(ItemAndJob<GrpcRequest> item) {
CompletableFuture<GrpcResponse> completableFuture =
infra.contextManager().wrap(new CompletableFuture<>());
if (item == null) {
completableFuture.completeExceptionally(new NullPointerException());
return completableFuture;
}
final GrpcRequest message = item.getItem();
MessageStub.Attempt attempt = message.getStub().newAttempt();
if (attempt.isCanceled()) {
// already canceled
completableFuture.complete(GrpcResponse.of(Status.CANCELLED));
return attempt.complete(completableFuture);
}
final Job job = item.getJob();
MetricsUtils.jobScope(infra.scope(), job, calleeAddress)
.gauge(MetricNames.CHANNEL_USAGE)
.update(channel.getMetrics().usage());
String[] tags = extractDispatchMetricsTags(item);
return attempt.complete(
Instrumentation.instrument
.withExceptionalCompletion(
LOGGER,
infra.scope(),
() -> {
Optional<Status> status = grpcFilter.tryHandleRequest(message, job);
if (status.isPresent()) {
return CompletableFuture.completedFuture(GrpcResponse.of(status.get()));
}
long timeoutMs =
GrpcUtils.getTimeout(
job, configuration, message.getTimeoutCount(), infra.scope());
ClientCall clientCall =
channelWithMetadata(message)
.newCall(methodDescriptor, extractCallOptions(item));
attempt.onCancel(() -> clientCall.cancel(ERROR_MESSAGE_CANCEL, null));
Instrumentation.instrument.withStreamObserver(
LOGGER,
infra.scope(),
ClientCalls::asyncUnaryCall,
clientCall,
message.payload(),
newStreamObserver(completableFuture, timeoutMs, message, tags),
MetricNames.CALL,
tags);
return completableFuture;
},
MetricNames.DISPATCH,
tags)
.thenApply(
response -> {
message.getFuture().complete(response);
return response;
}));
}