in ingest/src/main/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClient.java [391:461]
/**
 * Ingests the given stream, using streaming ingestion when the queuing policy allows it and
 * queued ingestion otherwise.
 *
 * <p>Flow:
 * <ol>
 * <li>Validate both arguments and make sure the source has an id (a random one is generated
 * and stored on {@code streamSourceInfo} when none is set).</li>
 * <li>Ask the queuing policy, based on {@code available()} of the stream, whether to go straight
 * to queued ingestion.</li>
 * <li>If the stream is a {@link ByteArrayInputStream} or a {@code ResettableFileInputStream} it is
 * used as is; any other stream is buffered into memory first and the policy is consulted again
 * with the number of bytes actually read.</li>
 * <li>Call {@code streamWithRetries}; when it returns {@code null} the same data is handed to the
 * queued client.</li>
 * </ol>
 *
 * <p>The caller's stream is closed by this method only when {@code streamSourceInfo.isLeaveOpen()}
 * is {@code false}; the in-memory copy created here is always closed.
 *
 * @param streamSourceInfo the stream to ingest together with its source id, compression type and leave-open flag
 * @param ingestionProperties the ingestion properties (the data format is passed to the queuing policy)
 * @return the result of the streaming ingestion, or of the queued ingestion when falling back
 * @throws IngestionClientException if buffering the stream fails (wraps the underlying {@link IOException})
 * @throws IngestionServiceException propagated from the underlying ingestion calls
 * @throws IOException if querying {@code available()} on the stream fails
 */
protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties)
        throws IngestionClientException, IngestionServiceException, IOException {
    Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo");
    Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");
    streamSourceInfo.validate();
    ingestionProperties.validate();

    UUID sourceId = streamSourceInfo.getSourceId();
    if (sourceId == null) {
        sourceId = UUID.randomUUID();
    }
    streamSourceInfo.setSourceId(sourceId);

    InputStream originalStream = streamSourceInfo.getStream();
    boolean isCompressed = streamSourceInfo.getCompressionType() != null;

    // First decision: based on what the stream reports as available, before reading anything.
    if (queuingPolicy.shouldUseQueuedIngestion(originalStream.available(), isCompressed, ingestionProperties.getDataFormat())) {
        log.info(String.format(fallbackLogString, sourceId));
        return queuedIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties);
    }

    InputStream byteArrayStream;
    // Whether this method is allowed to close byteArrayStream once ingestion is done.
    boolean closeWhenDone;
    try {
        if (originalStream instanceof ByteArrayInputStream || originalStream instanceof ResettableFileInputStream) {
            // The caller's stream is used directly, so the caller's leave-open flag must be honored.
            byteArrayStream = originalStream;
            closeWhenDone = !streamSourceInfo.isLeaveOpen();
        } else {
            // If it's not one of the stream types above:
            // read up to the max streaming size (+1 byte) into memory and decide with that if we should stream.
            byte[] streamingBytes = IngestionUtils.readBytesFromInputStream(originalStream,
                    ManagedStreamingQueuingPolicy.MAX_STREAMING_STREAM_SIZE_BYTES + 1);
            byteArrayStream = new ByteArrayInputStream(streamingBytes);
            // The in-memory copy belongs to this method and can always be closed.
            closeWhenDone = true;

            if (queuingPolicy.shouldUseQueuedIngestion(streamingBytes.length, isCompressed, ingestionProperties.getDataFormat())) {
                // Re-attach the bytes already consumed in front of the remainder of the original stream.
                StreamSourceInfo managedSourceInfo = new StreamSourceInfo(new SequenceInputStream(byteArrayStream, originalStream),
                        streamSourceInfo.isLeaveOpen(), sourceId, streamSourceInfo.getCompressionType());
                log.info(String.format(fallbackLogString, managedSourceInfo.getSourceId()));
                return queuedIngestClient.ingestFromStream(managedSourceInfo, ingestionProperties);
            }

            if (!streamSourceInfo.isLeaveOpen()) {
                // From this point we don't need the original stream anymore, we cached it
                try {
                    originalStream.close();
                } catch (IOException e) {
                    log.warn("Failed to close stream", e);
                }
            }
        }
    } catch (IOException e) {
        throw new IngestionClientException("Failed to read from stream.", e);
    }

    // leaveOpen is true here so that the stream stays usable for the queued fallback below;
    // closing is handled by the finally block.
    StreamSourceInfo managedSourceInfo = new StreamSourceInfo(byteArrayStream, true, sourceId, streamSourceInfo.getCompressionType());
    try {
        IngestionResult result = streamWithRetries(managedSourceInfo, ingestionProperties, null);
        if (result != null) {
            return result;
        }
        // No streaming result: fall back to queued ingestion with the same data.
        return queuedIngestClient.ingestFromStream(managedSourceInfo, ingestionProperties);
    } finally {
        // Do not close a caller-owned stream that was passed with leaveOpen = true.
        if (closeWhenDone) {
            try {
                managedSourceInfo.getStream().close();
            } catch (IOException e) {
                log.warn("Failed to close byte stream", e);
            }
        }
    }
}