protected IngestionResult ingestFromStreamImpl()

in ingest/src/main/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClient.java [391:461]


    protected IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties)
            throws IngestionClientException, IngestionServiceException, IOException {
        Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");

        streamSourceInfo.validate();
        ingestionProperties.validate();

        UUID sourceId = streamSourceInfo.getSourceId();
        if (sourceId == null) {
            sourceId = UUID.randomUUID();
        }

        streamSourceInfo.setSourceId(sourceId);
        byte[] streamingBytes;
        InputStream byteArrayStream;

        if (queuingPolicy.shouldUseQueuedIngestion(streamSourceInfo.getStream().available(),
                streamSourceInfo.getCompressionType() != null, ingestionProperties.getDataFormat())) {
            log.info(String.format(fallbackLogString, sourceId));
            return queuedIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties);
        }

        try {
            if (streamSourceInfo.getStream() instanceof ByteArrayInputStream || streamSourceInfo.getStream() instanceof ResettableFileInputStream) {
                byteArrayStream = streamSourceInfo.getStream();
            } else {
                // If its not a ByteArrayInputStream:
                // Read 10mb (max streaming size), decide with that if we should stream
                streamingBytes = IngestionUtils.readBytesFromInputStream(streamSourceInfo.getStream(),
                        ManagedStreamingQueuingPolicy.MAX_STREAMING_STREAM_SIZE_BYTES + 1);
                byteArrayStream = new ByteArrayInputStream(streamingBytes);
                int size = streamingBytes.length;
                if (queuingPolicy.shouldUseQueuedIngestion(size,
                        streamSourceInfo.getCompressionType() != null, ingestionProperties.getDataFormat())) {
                    StreamSourceInfo managedSourceInfo = new StreamSourceInfo(new SequenceInputStream(byteArrayStream, streamSourceInfo.getStream()),
                            streamSourceInfo.isLeaveOpen(), sourceId, streamSourceInfo.getCompressionType());
                    log.info(String.format(fallbackLogString, managedSourceInfo.getSourceId()));

                    return queuedIngestClient.ingestFromStream(managedSourceInfo, ingestionProperties);
                }

                if (!streamSourceInfo.isLeaveOpen()) {
                    // From this point we don't need the original stream anymore, we cached it
                    try {
                        streamSourceInfo.getStream().close();
                    } catch (IOException e) {
                        log.warn("Failed to close stream", e);
                    }
                }
            }
        } catch (IOException e) {
            throw new IngestionClientException("Failed to read from stream.", e);
        }

        StreamSourceInfo managedSourceInfo = new StreamSourceInfo(byteArrayStream, true, sourceId, streamSourceInfo.getCompressionType());
        try {
            IngestionResult result = streamWithRetries(managedSourceInfo, ingestionProperties, null);
            if (result != null) {
                return result;
            }

            return queuedIngestClient.ingestFromStream(managedSourceInfo, ingestionProperties);
        } finally {
            try {
                managedSourceInfo.getStream().close();
            } catch (IOException e) {
                log.warn("Failed to close byte stream", e);
            }
        }
    }