in ingest/src/main/java/com/microsoft/azure/kusto/ingest/QueuedIngestClientImpl.java [83:150]
/**
 * Submits a single blob for queued ingestion.
 *
 * <p>Builds an {@code IngestionBlobInfo} message from the blob source and the ingestion properties,
 * optionally inserts an initial status entity into the status table, and then hands the message to
 * {@code ResourceAlgorithms.postToQueueWithRetries}.
 *
 * @param blobSourceInfo      the blob to ingest; must be non-null and pass its own {@code validate()}
 * @param ingestionProperties target database/table and reporting settings; must be non-null and pass its own
 *                            {@code validate()}. Note that this method sets the authorization context token
 *                            on the passed instance.
 * @return a {@code TableReportIngestionResult} when the report level is not {@code NONE} and the report method
 *         is not {@code QUEUE}; otherwise an {@code IngestionStatusResult} wrapping the locally built status
 *         (left as {@code Queued})
 * @throws IngestionClientException  if an {@code IOException} or {@code URISyntaxException} is raised while
 *                                   preparing or posting the message
 * @throws IngestionServiceException if a blob, queue or table storage exception is raised
 */
protected IngestionResult ingestFromBlobImpl(BlobSourceInfo blobSourceInfo, IngestionProperties ingestionProperties)
        throws IngestionClientException, IngestionServiceException {
    // Reject null arguments first, then let each argument run its own validation.
    Ensure.argIsNotNull(blobSourceInfo, "blobSourceInfo");
    Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
    blobSourceInfo.validate();
    ingestionProperties.validate();

    try {
        // The token is set before the properties are copied into the message below.
        ingestionProperties.setAuthorizationContextToken(resourceManager.getIdentityToken());

        // Assemble the ingestion message for this blob.
        IngestionBlobInfo message = new IngestionBlobInfo(
                blobSourceInfo.getBlobPath(),
                ingestionProperties.getDatabaseName(),
                ingestionProperties.getTableName(),
                this.applicationForTracing,
                this.clientVersionForTracing);

        // Only the secret-free form of the blob URL is used for logging and for the status record.
        String sanitizedBlobUrl = SecurityUtils.removeSecretsFromUrl(blobSourceInfo.getBlobPath());
        if (blobSourceInfo.getBlobExactSize() == null) {
            log.warn("Blob '{}' was sent for ingestion without specifying its raw data size", sanitizedBlobUrl);
        } else {
            message.setRawDataSize(blobSourceInfo.getBlobExactSize());
        }

        message.setReportLevel(ingestionProperties.getReportLevel().getKustoValue());
        message.setReportMethod(ingestionProperties.getReportMethod().getKustoValue());
        message.setFlushImmediately(ingestionProperties.getFlushImmediately());
        message.setValidationPolicy(ingestionProperties.getValidationPolicy());
        message.setAdditionalProperties(ingestionProperties.getIngestionProperties());

        // A caller-supplied source id replaces the id the message already carries.
        if (blobSourceInfo.getSourceId() != null) {
            message.setId(blobSourceInfo.getSourceId());
        }
        String sourceIdText = message.getId().toString();

        // Local status record; it starts as Queued and is switched to Pending only when reported to a table.
        IngestionStatus ingestionStatus = new IngestionStatus();
        ingestionStatus.setDatabase(ingestionProperties.getDatabaseName());
        ingestionStatus.setTable(ingestionProperties.getTableName());
        ingestionStatus.setStatus(OperationStatus.Queued);
        ingestionStatus.setUpdatedOn(Instant.now());
        ingestionStatus.setIngestionSourceId(message.getId());
        ingestionStatus.setIngestionSourcePath(sanitizedBlobUrl);

        // Table reporting is used unless reporting is disabled or directed to a queue.
        boolean reportsToTable = !(ingestionProperties.getReportLevel() == IngestionProperties.IngestionReportLevel.NONE
                || ingestionProperties.getReportMethod() == IngestionProperties.IngestionReportMethod.QUEUE);

        List<IngestionStatusInTableDescription> statusesInTable = new LinkedList<>();
        if (reportsToTable) {
            ingestionStatus.setStatus(OperationStatus.Pending);

            TableWithSas statusTable = resourceManager.getStatusTable();

            // The source id serves as both partition key and row key of the status entity.
            IngestionStatusInTableDescription tableDescription = new IngestionStatusInTableDescription();
            tableDescription.setTableClient(statusTable.getTable());
            tableDescription.setTableConnectionString(statusTable.getUri());
            tableDescription.setPartitionKey(sourceIdText);
            tableDescription.setRowKey(sourceIdText);
            message.setIngestionStatusInTable(tableDescription);

            // The status entity is inserted before the message is posted to the queue.
            TableEntity statusEntity = new TableEntity(sourceIdText, sourceIdText)
                    .setProperties(ingestionStatus.getEntityProperties());
            azureStorageClient.azureTableInsertEntity(statusTable.getTable(), statusEntity);

            statusesInTable.add(message.getIngestionStatusInTable());
        }

        ResourceAlgorithms.postToQueueWithRetries(resourceManager, azureStorageClient, message);

        if (reportsToTable) {
            return new TableReportIngestionResult(statusesInTable);
        }
        return new IngestionStatusResult(ingestionStatus);
    } catch (BlobStorageException | QueueStorageException | TableServiceException e) {
        throw new IngestionServiceException("Failed to ingest from blob", e);
    } catch (IOException | URISyntaxException e) {
        throw new IngestionClientException("Failed to ingest from blob", e);
    }
}