in vertx-client/src/main/java/org/apache/cassandra/sidecar/client/VertxHttpClient.java [181:225]
public CompletableFuture<HttpResponse> stream(SidecarInstance sidecarInstance,
                                              RequestContext context,
                                              StreamConsumer streamConsumer)
{
    // Sends the request to the given instance and pipes the response body into the
    // streamConsumer. The returned future is completed with the response status line
    // and headers as soon as they are seen by the expect-predicate below; it is failed
    // only when the request fails before that point.
    Objects.requireNonNull(streamConsumer, "The streamConsumer must be set");
    HttpRequest<Buffer> vertxRequest = vertxRequest(sidecarInstance, context);
    LOGGER.debug("Streaming request={}, from instance={}", context.request(), sidecarInstance);
    Promise<HttpResponse> responsePromise = Promise.promise();

    vertxRequest.ssl(config.ssl())
                .timeout(config.timeoutMillis())
                .expect(response -> {
                    int statusCode = response.statusCode();
                    String statusMessage = response.statusMessage();

                    // Always hand the status line and headers to the caller first,
                    // regardless of whether the status is accepted for streaming.
                    responsePromise.complete(new HttpResponseImpl(statusCode,
                                                                  statusMessage,
                                                                  mapHeaders(response.headers()),
                                                                  sidecarInstance));

                    boolean streamable = statusCode == HttpResponseStatus.OK.code()
                                         || statusCode == HttpResponseStatus.PARTIAL_CONTENT.code();
                    if (!streamable)
                    {
                        LOGGER.warn("Unexpected status code received statusCode={}, statusMessage={}",
                                    statusCode, statusMessage);
                        return ResponsePredicateResult.failure("Unexpected status code: " +
                                                               statusCode);
                    }
                    return ResponsePredicateResult.success();
                })
                .as(BodyCodec.pipe(new StreamConsumerWriteStream(streamConsumer)))
                .send()
                .onFailure(cause -> {
                    boolean failedBeforeResponse = responsePromise.tryFail(cause);
                    if (failedBeforeResponse)
                    {
                        return;
                    }
                    // The promise was already completed by the predicate above, so the
                    // failure arrived after the response head was received. Signal the
                    // consumer that there was a failure mid-stream; this is a
                    // non-retryable case.
                    streamConsumer.onError(cause);
                });

    return responsePromise.future().toCompletionStage().toCompletableFuture();
}