in data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java [276:333]
/**
 * Posts a streaming-ingest request to {@code clusterEndpoint} and wraps the response in a
 * {@link KustoOperationResult}.
 * <p>
 * The request body depends on the source:
 * <ul>
 * <li>when {@code stream} is non-null, the stream itself is the body, sent with content type
 * {@code application/octet-stream} and content encoding {@code gzip};</li>
 * <li>otherwise the body is the string form of an {@link IngestionSourceStorage} built from {@code blobUrl},
 * sent with content type {@code application/json} and no content encoding.</li>
 * </ul>
 * Every option in {@code properties} is copied into the request headers using its {@code toString()} value.
 *
 * @param clusterEndpoint the URL the request is posted to; also passed to {@link DataClientException} on failure
 * @param stream the data to ingest, or {@code null} to ingest from {@code blobUrl}
 * @param blobUrl the blob to ingest from; only read when {@code stream} is {@code null}
 * @param properties request properties; {@code null} is replaced by a new, empty {@link ClientRequestProperties}
 * @param leaveOpen when {@code false} and {@code stream} is non-null, {@code stream} is closed once the returned
 *        {@link Mono} terminates (completion, error or cancellation)
 * @return a {@link Mono} emitting the {@link KustoOperationResult} built from the response; a
 *         {@link KustoServiceQueryError} is mapped to a {@link DataClientException} that keeps the original as its cause
 */
private Mono<KustoOperationResult> executeStreamingIngest(String clusterEndpoint, InputStream stream, String blobUrl,
        ClientRequestProperties properties, boolean leaveOpen) {
    boolean isStreamSource = stream != null;
    Map<String, String> headers = new HashMap<>();
    String contentEncoding = isStreamSource ? "gzip" : null;
    String contentType = isStreamSource ? "application/octet-stream" : "application/json";
    properties = properties == null ? new ClientRequestProperties() : properties;
    long timeoutMs = determineTimeout(properties, CommandType.STREAMING_INGEST, clusterUrl);

    // This was a separate method but was moved into the body of this method because it performs a side effect
    Iterator<Map.Entry<String, Object>> iterator = properties.getOptions();
    while (iterator.hasNext()) {
        Map.Entry<String, Object> pair = iterator.next();
        headers.put(pair.getKey(), pair.getValue().toString());
    }

    BinaryData data;
    if (isStreamSource) {
        // We use UncloseableStream to prevent HttpClient from closing the stream;
        // closing is handled explicitly in the doFinally below, honoring leaveOpen.
        data = BinaryData.fromStream(new UncloseableStream(stream));
    } else {
        data = BinaryData.fromString(new IngestionSourceStorage(blobUrl).toString());
    }

    HttpTracing tracing = HttpTracing
            .newBuilder()
            .withProperties(properties)
            .withRequestPrefix("KJC.executeStreamingIngest" + (isStreamSource ? "" : "FromBlob"))
            .withActivitySuffix(CommandType.STREAMING_INGEST.getActivityTypeSuffix())
            .withClientDetails(clientDetails)
            .build();

    HttpRequestBuilder httpRequestBuilder = HttpRequestBuilder
            .newPost(clusterEndpoint)
            .withTracing(tracing)
            .withHeaders(headers)
            .withContentType(contentType)
            .withContentEncoding(contentEncoding)
            .withBody(data);

    // NOTE(review): httpRequestBuilder.build() is evaluated while this chain is assembled, i.e. before doOnNext
    // applies the authorization header at subscription time. This presumably relies on the built request
    // observing later builder mutations - confirm against HttpRequestBuilder.
    return getAuthorizationHeaderValueAsync()
            .doOnNext(httpRequestBuilder::withAuthorization)
            .then(MonitoredActivity.wrap(postAsync(httpRequestBuilder.build(), timeoutMs), "ClientImpl.executeStreamingIngest")
                    .publishOn(Schedulers.boundedElastic())
                    .map(response -> new KustoOperationResult(response, "v1"))
                    .onErrorMap(KustoServiceQueryError.class, e -> new DataClientException(clusterEndpoint, e.getMessage(), e)))
            // The cleanup is attached to the outer chain on purpose: then(...) only subscribes to the POST once the
            // authorization Mono completes successfully, so a doFinally placed on the inner Mono would never run
            // (and the stream would leak) when obtaining the authorization header fails or is cancelled.
            .doFinally(signalType -> {
                if (isStreamSource && !leaveOpen) {
                    try {
                        stream.close();
                    } catch (IOException e) {
                        // Best effort: a failure to close must not mask the outcome of the ingestion itself.
                        LOGGER.debug("executeStreamingIngest: Error while closing the stream.", e);
                    }
                }
            });
}