in ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientImpl.java [188:236]
/*
 * Ingests the content of a stream by first uploading it to a blob and then
 * delegating to ingestFromBlob.
 *
 * Steps:
 *   1. Validate both arguments (non-null, then their own validate()).
 *   2. Reject a null stream or one whose available() is <= 0.
 *   3. Build a blob name via genBlobName and upload the stream with
 *      ResourceAlgorithms.uploadStreamToBlobWithRetries, compressing when
 *      shouldCompress(...) says so.
 *   4. Reject an upload whose reported size is 0.
 *   5. Wrap the uploaded blob in a BlobSourceInfo and call ingestFromBlob.
 *
 * Stream ownership: unless streamSourceInfo.isLeaveOpen() is true, the stream
 * is closed before this method returns -- on success AND on failure. A close
 * failure on the success path is reported as IngestionClientException; a
 * close failure while another exception is already propagating is only
 * logged so that it does not mask the original error.
 *
 * Parameters:
 *   streamSourceInfo     the stream to ingest plus its metadata; must not be null
 *   ingestionProperties  target database/table, data format, etc.; must not be null
 *
 * Returns the IngestionResult produced by ingestFromBlob.
 *
 * Throws:
 *   IngestionClientException   if the stream is null or empty, or an IOException
 *                              occurs while reading/closing the stream
 *   IngestionServiceException  if a BlobStorageException occurs during the upload
 */
protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties)
        throws IngestionClientException, IngestionServiceException {
    // Argument validation:
    Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo");
    Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");

    streamSourceInfo.validate();
    ingestionProperties.validate();

    // Set right before the success-path close so the finally block never closes twice
    // and never swallows a close failure that the success path must report.
    boolean closeAttempted = false;
    try {
        if (streamSourceInfo.getStream() == null) {
            throw new IngestionClientException("The provided stream is null.");
        } else if (streamSourceInfo.getStream().available() <= 0) {
            // NOTE(review): InputStream.available() is only an estimate and may legitimately
            // return 0 for a non-empty stream (e.g. one that would block). Kept as-is to
            // preserve existing behavior; the size check after upload is the reliable guard.
            throw new IngestionClientException("The provided stream is empty.");
        }

        IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat();
        boolean shouldCompress = shouldCompress(streamSourceInfo.getCompressionType(), dataFormat);

        String blobName = genBlobName(
                "StreamUpload",
                ingestionProperties.getDatabaseName(),
                ingestionProperties.getTableName(),
                dataFormat.getKustoValue(), // Used to use an empty string if the DataFormat was empty. Now it can't be empty, with a default of CSV.
                shouldCompress ? CompressionType.gz : streamSourceInfo.getCompressionType());

        ResourceAlgorithms.UploadResult blobUploadedDetails = ResourceAlgorithms.uploadStreamToBlobWithRetries(resourceManager,
                azureStorageClient,
                streamSourceInfo.getStream(),
                blobName,
                shouldCompress);

        if (blobUploadedDetails.size == 0) {
            String message = "Empty stream.";
            log.error(message);
            throw new IngestionClientException(message);
        }

        BlobSourceInfo blobSourceInfo = BlobSourceInfo.fromStream(blobUploadedDetails.blobPath, blobUploadedDetails.size, streamSourceInfo);
        IngestionResult ingestionResult = ingestFromBlob(blobSourceInfo, ingestionProperties);

        if (!streamSourceInfo.isLeaveOpen()) {
            closeAttempted = true;
            streamSourceInfo.getStream().close();
        }
        return ingestionResult;
    } catch (BlobStorageException e) {
        throw new IngestionServiceException("Failed to ingest from stream", e);
    } catch (IOException e) {
        throw new IngestionClientException("Failed to ingest from stream", e);
    } finally {
        // Failure path: the caller handed over ownership of the stream (leaveOpen == false),
        // so release it even though ingestion did not complete.
        if (!closeAttempted && !streamSourceInfo.isLeaveOpen() && streamSourceInfo.getStream() != null) {
            try {
                streamSourceInfo.getStream().close();
            } catch (IOException closeFailure) {
                // Log only: throwing here would replace the exception that is already propagating.
                log.warn("Failed to close the stream after a failed ingestion", closeFailure);
            }
        }
    }
}