private CompletableFuture doSend()

in fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/client/ServerConnection.java [273:354]


    /**
     * Sends a request over this connection, or queues/fails it depending on the connection state.
     *
     * <p>All work is done while holding {@code lock}:
     *
     * <ul>
     *   <li>If the connection is disconnected, {@code responseFuture} is completed exceptionally
     *       with a {@link NetworkException} wrapping a {@link DisconnectException}.
     *   <li>If the connection is not established, or it is established but not ready and the
     *       request is not internal, the request is added to {@code pendingRequests}.
     *   <li>Otherwise the request version is resolved, the request is registered in {@code
     *       inflightRequests}, encoded, and written to the channel.
     * </ul>
     *
     * <p>Failures (version resolution, encoding, channel write) are reported through the returned
     * future; the request is not left registered in {@code inflightRequests} in those cases.
     *
     * @param apiKey the API the request targets
     * @param rawRequest the request message to send
     * @param responseFuture the future to complete with the response or with the failure
     * @param isInternalRequest whether the request may be sent before the connection is ready
     * @return the future that is completed with the response or completed exceptionally
     */
    private CompletableFuture<ApiMessage> doSend(
            ApiKeys apiKey,
            ApiMessage rawRequest,
            CompletableFuture<ApiMessage> responseFuture,
            boolean isInternalRequest) {
        synchronized (lock) {
            if (state.isDisconnected()) {
                Exception exception =
                        new NetworkException(
                                new DisconnectException(
                                        "Cannot send request to server "
                                                + node
                                                + " because it is disconnected."));
                responseFuture.completeExceptionally(exception);
                return responseFuture;
            }
            // 1. connection is not established: all requests are queued
            // 2. connection is established but not ready: internal requests are processed, other
            // requests are queued
            if (!state.isEstablished() || (!state.isReady() && !isInternalRequest)) {
                pendingRequests.add(
                        new PendingRequest(apiKey, rawRequest, isInternalRequest, responseFuture));
                return responseFuture;
            }

            // version equals highestSupportedVersion might happen when requesting api version check
            // before serverApiVersions is initialized. We always use the highest version for api
            // version checking.
            short version = apiKey.highestSupportedVersion;
            if (serverApiVersions != null) {
                try {
                    version = serverApiVersions.highestAvailableVersion(apiKey);
                } catch (Exception e) {
                    // The version could not be resolved: fail the request and stop here.
                    // Falling through would register and send a request whose future has
                    // already been completed exceptionally.
                    responseFuture.completeExceptionally(e);
                    return responseFuture;
                }
            }

            InflightRequest inflight =
                    new InflightRequest(
                            apiKey.id, version, requestCount++, rawRequest, responseFuture);
            inflightRequests.put(inflight.requestId, inflight);

            // TODO: maybe we need to add timeout for the inflight requests
            ByteBuf byteBuf;
            try {
                byteBuf = inflight.toByteBuf(channel.alloc());
            } catch (Exception e) {
                LOG.error("Failed to encode request for '{}'.", ApiKeys.forId(inflight.apiKey), e);
                // Nothing was written to the channel, so drop the in-flight registration.
                inflightRequests.remove(inflight.requestId);
                responseFuture.completeExceptionally(
                        new FlussRuntimeException(
                                String.format(
                                        "Failed to encode request for '%s'",
                                        ApiKeys.forId(inflight.apiKey)),
                                e));
                return responseFuture;
            }

            connectionMetricGroup.updateMetricsBeforeSendRequest(apiKey, rawRequest.totalSize());

            channel.writeAndFlush(byteBuf)
                    .addListener(
                            (ChannelFutureListener)
                                    future -> {
                                        // Only a failed write is handled here; a successful write
                                        // leaves the request registered in inflightRequests.
                                        if (!future.isSuccess()) {
                                            connectionMetricGroup.updateMetricsAfterGetResponse(
                                                    apiKey, inflight.requestStartTime, 0);
                                            Throwable cause = future.cause();
                                            if (cause instanceof IOException) {
                                                // when server close the channel, the cause will be
                                                // IOException, if the cause is IOException, we wrap
                                                // it as retryable NetworkException to retry to
                                                // connect
                                                cause = new NetworkException(cause);
                                            }
                                            inflight.responseFuture.completeExceptionally(cause);
                                            inflightRequests.remove(inflight.requestId);
                                        }
                                    });
            return inflight.responseFuture;
        }
    }