in ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java [165:208]
/**
 * Validates the inputs, sends the stream through the streaming client and reports a succeeded status.
 *
 * <p>The stream is wrapped by {@code compressStream} when {@code IngestClientBase.shouldCompress} returns
 * true for the source's compression type and the data format; otherwise the caller's stream is sent as is.
 *
 * @param streamSourceInfo the stream to ingest together with its compression type and leave-open flag;
 *        checked with {@code Ensure.argIsNotNull} and {@code validate()}
 * @param ingestionProperties supplies database name, table name, data format and ingestion mapping reference;
 *        checked with {@code Ensure.argIsNotNull} and {@code validate()}
 * @param clientRequestId optional request id; when null or blank no {@code ClientRequestProperties} are sent
 * @return an {@code IngestionStatusResult} whose status is {@code Succeeded} for the target table and database
 * @throws IngestionClientException wrapping a {@code DataClientException} or {@code IOException}
 * @throws IngestionServiceException wrapping a {@code DataServiceException}
 */
private IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties, @Nullable String clientRequestId)
        throws IngestionClientException, IngestionServiceException {
    Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo");
    Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");

    IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat();

    streamSourceInfo.validate();
    ingestionProperties.validate();

    // Request properties are only created when the caller actually supplied an id.
    ClientRequestProperties clientRequestProperties = null;
    if (StringUtils.isNotBlank(clientRequestId)) {
        clientRequestProperties = new ClientRequestProperties();
        clientRequestProperties.setClientRequestId(clientRequestId);
    }

    try {
        // Decide once whether we wrap the caller's stream, so the stream choice and the
        // leave-open flag below can never disagree.
        boolean compress = IngestClientBase.shouldCompress(streamSourceInfo.getCompressionType(), dataFormat);
        InputStream stream = compress
                ? compressStream(streamSourceInfo.getStream(), streamSourceInfo.isLeaveOpen())
                : streamSourceInfo.getStream();

        // When we compressed, `stream` is our own wrapper and the caller's preference was already
        // handed to compressStream, so nothing needs to stay open. When we did not compress,
        // `stream` IS the caller's stream and its leave-open preference must be honoured, including
        // the case where the compression type is null but shouldCompress still returned false.
        // NOTE(review): assumes the last argument of executeStreamingIngest is "leaveOpen" - confirm.
        boolean leaveOpen = !compress && streamSourceInfo.isLeaveOpen();

        log.debug("Executing streaming ingest");
        this.streamingClient.executeStreamingIngest(ingestionProperties.getDatabaseName(),
                ingestionProperties.getTableName(),
                stream,
                clientRequestProperties,
                dataFormat.getKustoValue(),
                ingestionProperties.getIngestionMapping().getIngestionMappingReference(),
                leaveOpen);
    } catch (DataClientException | IOException e) {
        // Client-side and I/O failures are reported as client exceptions, keeping the cause.
        String msg = ExceptionUtils.getMessageEx(e);
        log.error(msg, e);
        throw new IngestionClientException(msg, e);
    } catch (DataServiceException e) {
        // Service-side failures are reported as service exceptions, keeping the cause.
        log.error(e.getMessage(), e);
        throw new IngestionServiceException(e.getMessage(), e);
    }

    log.debug("Stream was ingested successfully.");
    IngestionStatus ingestionStatus = new IngestionStatus();
    ingestionStatus.status = OperationStatus.Succeeded;
    ingestionStatus.table = ingestionProperties.getTableName();
    ingestionStatus.database = ingestionProperties.getDatabaseName();
    return new IngestionStatusResult(ingestionStatus);
}