protected IngestionResult ingestFromStreamImpl()

in ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientImpl.java [188:236]


    protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties)
            throws IngestionClientException, IngestionServiceException {
        // Argument validation:
        Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");

        streamSourceInfo.validate();
        ingestionProperties.validate();

        try {
            IngestionResult ingestionResult;
            if (streamSourceInfo.getStream() == null) {
                throw new IngestionClientException("The provided stream is null.");
            } else if (streamSourceInfo.getStream().available() <= 0) {
                throw new IngestionClientException("The provided stream is empty.");
            }
            IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat();
            boolean shouldCompress = shouldCompress(streamSourceInfo.getCompressionType(), dataFormat);

            String blobName = genBlobName(
                    "StreamUpload",
                    ingestionProperties.getDatabaseName(),
                    ingestionProperties.getTableName(),
                    dataFormat.getKustoValue(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of CSV.
                    shouldCompress ? CompressionType.gz : streamSourceInfo.getCompressionType());

            ResourceAlgorithms.UploadResult blobUploadedDetails = ResourceAlgorithms.uploadStreamToBlobWithRetries(resourceManager,
                    azureStorageClient,
                    streamSourceInfo.getStream(),
                    blobName,
                    shouldCompress);
            if (blobUploadedDetails.size == 0) {
                String message = "Empty stream.";
                log.error(message);
                throw new IngestionClientException(message);
            }
            BlobSourceInfo blobSourceInfo = BlobSourceInfo.fromStream(blobUploadedDetails.blobPath, blobUploadedDetails.size, streamSourceInfo);

            ingestionResult = ingestFromBlob(blobSourceInfo, ingestionProperties);
            if (!streamSourceInfo.isLeaveOpen()) {
                streamSourceInfo.getStream().close();
            }
            return ingestionResult;
        } catch (BlobStorageException e) {
            throw new IngestionServiceException("Failed to ingest from stream", e);
        } catch (IOException e) {
            throw new IngestionClientException("Failed to ingest from stream", e);
        }
    }