private Mono<KustoOperationResult> executeStreamingIngest()

in data/src/main/java/com/microsoft/azure/kusto/data/ClientImpl.java [276:333]


    /**
     * Builds and sends a streaming-ingest POST request to {@code clusterEndpoint}.
     * <p>
     * The request body comes from one of two sources:
     * <ul>
     * <li>when {@code stream} is non-null, the stream itself, sent as {@code application/octet-stream}
     * with a {@code gzip} content encoding (this method does not compress the data itself);</li>
     * <li>otherwise, the string form of an {@link IngestionSourceStorage} built from {@code blobUrl},
     * sent as {@code application/json}.</li>
     * </ul>
     * Every option in {@code properties} is copied into the request headers using its {@code toString()} value.
     *
     * @param clusterEndpoint the URL the request is posted to; also passed to {@link DataClientException} on service errors
     * @param stream the data to ingest, or {@code null} to ingest from {@code blobUrl}
     * @param blobUrl the blob location used when {@code stream} is {@code null}
     * @param properties request properties; {@code null} is replaced with an empty {@link ClientRequestProperties}
     * @param leaveOpen when {@code false} and {@code stream} is non-null, the stream is closed once the returned
     *            {@link Mono} terminates (success, error or cancellation)
     * @return a {@link Mono} emitting a {@link KustoOperationResult} created from the response with version {@code "v1"};
     *         a {@link KustoServiceQueryError} is mapped to a {@link DataClientException}
     */
    private Mono<KustoOperationResult> executeStreamingIngest(String clusterEndpoint, InputStream stream, String blobUrl,
            ClientRequestProperties properties, boolean leaveOpen) {
        boolean isStreamSource = stream != null;
        Map<String, String> headers = new HashMap<>();
        String contentEncoding = isStreamSource ? "gzip" : null;
        String contentType = isStreamSource ? "application/octet-stream" : "application/json";

        properties = properties == null ? new ClientRequestProperties() : properties;

        long timeoutMs = determineTimeout(properties, CommandType.STREAMING_INGEST, clusterUrl);

        // This was a separate method but was moved into the body of this method because it performs a side effect
        Iterator<Map.Entry<String, Object>> iterator = properties.getOptions();
        while (iterator.hasNext()) {
            Map.Entry<String, Object> pair = iterator.next();
            headers.put(pair.getKey(), pair.getValue().toString());
        }

        BinaryData data;
        if (isStreamSource) {
            // We use UncloseableStream to prevent HttpClient from closing the stream;
            // closing is handled explicitly in the doFinally below, according to leaveOpen.
            data = BinaryData.fromStream(new UncloseableStream(stream));
        } else {
            data = BinaryData.fromString(new IngestionSourceStorage(blobUrl).toString());
        }

        HttpTracing tracing = HttpTracing
                .newBuilder()
                .withProperties(properties)
                .withRequestPrefix("KJC.executeStreamingIngest" + (isStreamSource ? "" : "FromBlob"))
                .withActivitySuffix(CommandType.STREAMING_INGEST.getActivityTypeSuffix())
                .withClientDetails(clientDetails)
                .build();

        HttpRequestBuilder httpRequestBuilder = HttpRequestBuilder
                .newPost(clusterEndpoint)
                .withTracing(tracing)
                .withHeaders(headers)
                .withContentType(contentType)
                .withContentEncoding(contentEncoding)
                .withBody(data);

        // NOTE(review): httpRequestBuilder.build() is evaluated here, at assembly time, i.e. before doOnNext
        // applies the authorization header. This presumably relies on build() returning the builder's live
        // request instance rather than a snapshot - confirm against HttpRequestBuilder.
        return getAuthorizationHeaderValueAsync()
                .doOnNext(httpRequestBuilder::withAuthorization)
                .then(MonitoredActivity.wrap(postAsync(httpRequestBuilder.build(), timeoutMs), "ClientImpl.executeStreamingIngest")
                        .publishOn(Schedulers.boundedElastic())
                        .map(response -> new KustoOperationResult(response, "v1"))
                        .onErrorMap(KustoServiceQueryError.class, e -> new DataClientException(clusterEndpoint, e.getMessage(), e)))
                // Attached to the outermost chain (not only to the POST) so the stream is also released when
                // acquiring the authorization header fails or the subscription is cancelled before the POST starts.
                .doFinally(signalType -> {
                    if (isStreamSource && !leaveOpen) {
                        try {
                            stream.close();
                        } catch (IOException e) {
                            // Best effort: a failure to close must not mask the outcome of the ingestion.
                            LOGGER.debug("executeStreamingIngest: Error while closing the stream.", e);
                        }
                    }
                });
    }