in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/dispatcher/grpc/DedupHeaderInterceptor.java [38:78]
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
for (String key : headers.keys()) {
Key metadataKey;
if (key.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
metadataKey = Key.of(key, Metadata.BINARY_BYTE_MARSHALLER);
} else {
metadataKey = Key.of(key, Metadata.ASCII_STRING_MARSHALLER);
}
int counter = 0;
Iterable<Object> values = headers.getAll(metadataKey);
if (values == null) {
continue;
}
for (Object value : values) {
if (counter > 0) {
scope
.tagged(
StructuredTags.builder()
.setKafkaTopic(grpcRequest.getTopic())
.setKafkaGroup(grpcRequest.getConsumergroup())
.setHeader(key)
.build())
.counter("dispatcher.duplicate.header.removed")
.inc(1);
headers.remove(metadataKey, value);
}
counter++;
}
}
super.start(
new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
responseListener) {},
headers);
}
};
}