public CompletableFuture stream()

in vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java [181:225]


    /**
     * Sends the request carried by {@code context} to {@code sidecarInstance} and pipes the response
     * body into {@code streamConsumer}.
     *
     * <p>The returned future is completed from inside the response predicate, i.e. as soon as the status
     * line and headers are available and <em>before</em> the status code is validated. The future is
     * failed only when the request fails while the promise is still pending. Any failure reported after
     * the promise has been completed is delivered to {@link StreamConsumer#onError(Throwable)} instead.
     *
     * @param sidecarInstance the instance the request is sent to
     * @param context         the context that provides the request to execute
     * @param streamConsumer  the consumer that receives the streamed body; must not be {@code null}
     * @return a future that holds the status code, status message and headers of the response
     * @throws NullPointerException if {@code streamConsumer} is {@code null}
     */
    public CompletableFuture<HttpResponse> stream(SidecarInstance sidecarInstance,
                                                  RequestContext context,
                                                  StreamConsumer streamConsumer)
    {
        Objects.requireNonNull(streamConsumer, "The streamConsumer must be set");
        HttpRequest<Buffer> request = vertxRequest(sidecarInstance, context);

        LOGGER.debug("Streaming request={}, from instance={}", context.request(), sidecarInstance);

        Promise<HttpResponse> responsePromise = Promise.promise();

        HttpRequest<Buffer> validatedRequest =
        request.ssl(config.ssl())
               .timeout(config.timeoutMillis())
               .expect(received -> {
                   int status = received.statusCode();
                   String message = received.statusMessage();

                   // Hand the status line and headers to the caller first, regardless of whether
                   // the status code is accepted below
                   responsePromise.complete(new HttpResponseImpl(status,
                                                                 message,
                                                                 mapHeaders(received.headers()),
                                                                 sidecarInstance));

                   boolean streamable = status == HttpResponseStatus.OK.code()
                                        || status == HttpResponseStatus.PARTIAL_CONTENT.code();
                   if (!streamable)
                   {
                       LOGGER.warn("Unexpected status code received statusCode={}, statusMessage={}",
                                   status, message);
                       return ResponsePredicateResult.failure("Unexpected status code: " + status);
                   }
                   return ResponsePredicateResult.success();
               });

        validatedRequest.as(BodyCodec.pipe(new StreamConsumerWriteStream(streamConsumer)))
                        .send()
                        .onFailure(cause -> {
                            boolean failedBeforeResponse = responsePromise.tryFail(cause);
                            if (failedBeforeResponse)
                            {
                                // The promise was still pending, so the caller observes the failure
                                // through the returned future
                                return;
                            }
                            // The promise was already completed by the predicate above, so the future
                            // can no longer carry the error. Signal the consumer directly; this is a
                            // non-retryable case
                            streamConsumer.onError(cause);
                        });

        return responsePromise.future().toCompletionStage().toCompletableFuture();
    }