in fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/ServerConnection.java [273:354]
private CompletableFuture<ApiMessage> doSend(
ApiKeys apiKey,
ApiMessage rawRequest,
CompletableFuture<ApiMessage> responseFuture,
boolean isInternalRequest) {
synchronized (lock) {
if (state.isDisconnected()) {
Exception exception =
new NetworkException(
new DisconnectException(
"Cannot send request to server "
+ node
+ " because it is disconnected."));
responseFuture.completeExceptionally(exception);
return responseFuture;
}
// 1. connection is not established: all requests are queued
// 2. connection is established but not ready: internal requests are processed, other
// requests are queued
if (!state.isEstablished() || (!state.isReady() && !isInternalRequest)) {
pendingRequests.add(
new PendingRequest(apiKey, rawRequest, isInternalRequest, responseFuture));
return responseFuture;
}
// version equals highestSupportedVersion might happen when requesting api version check
// before serverApiVersions is initialized. We always use the highest version for api
// version checking.
short version = apiKey.highestSupportedVersion;
if (serverApiVersions != null) {
try {
version = serverApiVersions.highestAvailableVersion(apiKey);
} catch (Exception e) {
responseFuture.completeExceptionally(e);
}
}
InflightRequest inflight =
new InflightRequest(
apiKey.id, version, requestCount++, rawRequest, responseFuture);
inflightRequests.put(inflight.requestId, inflight);
// TODO: maybe we need to add timeout for the inflight requests
ByteBuf byteBuf;
try {
byteBuf = inflight.toByteBuf(channel.alloc());
} catch (Exception e) {
LOG.error("Failed to encode request for '{}'.", ApiKeys.forId(inflight.apiKey), e);
inflightRequests.remove(inflight.requestId);
responseFuture.completeExceptionally(
new FlussRuntimeException(
String.format(
"Failed to encode request for '%s'",
ApiKeys.forId(inflight.apiKey)),
e));
return responseFuture;
}
connectionMetricGroup.updateMetricsBeforeSendRequest(apiKey, rawRequest.totalSize());
channel.writeAndFlush(byteBuf)
.addListener(
(ChannelFutureListener)
future -> {
if (!future.isSuccess()) {
connectionMetricGroup.updateMetricsAfterGetResponse(
apiKey, inflight.requestStartTime, 0);
Throwable cause = future.cause();
if (cause instanceof IOException) {
// when server close the channel, the cause will be
// IOException, if the cause is IOException, we wrap
// it as retryable NetworkException to retry to
// connect
cause = new NetworkException(cause);
}
inflight.responseFuture.completeExceptionally(cause);
inflightRequests.remove(inflight.requestId);
}
});
return inflight.responseFuture;
}
}