in ingest/src/main/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClient.java [292:329]
protected IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties)
throws IngestionClientException, IngestionServiceException {
Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo");
Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
blobSourceInfo.validate();
ingestionProperties.validate();
BlobClientBuilder blobClientBuilder = new BlobClientBuilder().endpoint(blobSourceInfo.getBlobPath());
if (httpClient != null) {
blobClientBuilder.httpClient(httpClient);
}
BlobClient blobClient = blobClientBuilder.buildClient();
long blobSize = 0;
if (blobSourceInfo.getBlobExactSize() != null) {
try {
blobSize = blobClient.getProperties().getBlobSize();
} catch (BlobStorageException e) {
throw new IngestionServiceException(
blobSourceInfo.getBlobPath(),
"Failed getting blob properties: " + ExceptionUtils.getMessageEx(e),
e);
}
}
if (queuingPolicy.shouldUseQueuedIngestion(blobSize,
blobSourceInfo.getCompressionType() != null, ingestionProperties.getDataFormat())) {
log.info(String.format(fallbackLogString, blobSourceInfo.getSourceId()));
return queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
}
IngestionResult result = streamWithRetries(blobSourceInfo, ingestionProperties, blobClient);
if (result != null) {
return result;
}
return queuedIngestClient.ingestFromBlob(blobSourceInfo, ingestionProperties);
}