private IngestionResult ingestFromStreamImpl()

in ingest/src/main/java/com/microsoft/azure/kusto/ingest/StreamingIngestClient.java [165:208]


    private IngestionResult ingestFromStreamImpl(StreamSourceInfo streamSourceInfo, IngestionProperties ingestionProperties, @Nullable String clientRequestId)
            throws IngestionClientException, IngestionServiceException {
        Ensure.argIsNotNull(streamSourceInfo, "streamSourceInfo");
        Ensure.argIsNotNull(ingestionProperties, "ingestionProperties");

        IngestionProperties.DataFormat dataFormat = ingestionProperties.getDataFormat();

        streamSourceInfo.validate();
        ingestionProperties.validate();

        ClientRequestProperties clientRequestProperties = null;
        if (StringUtils.isNotBlank(clientRequestId)) {
            clientRequestProperties = new ClientRequestProperties();
            clientRequestProperties.setClientRequestId(clientRequestId);
        }

        try {
            InputStream stream = IngestClientBase.shouldCompress(streamSourceInfo.getCompressionType(), dataFormat)
                    ? compressStream(streamSourceInfo.getStream(), streamSourceInfo.isLeaveOpen())
                    : streamSourceInfo.getStream();
            log.debug("Executing streaming ingest");
            this.streamingClient.executeStreamingIngest(ingestionProperties.getDatabaseName(),
                    ingestionProperties.getTableName(),
                    stream,
                    clientRequestProperties,
                    dataFormat.getKustoValue(),
                    ingestionProperties.getIngestionMapping().getIngestionMappingReference(),
                    !(streamSourceInfo.getCompressionType() == null || !streamSourceInfo.isLeaveOpen()));
        } catch (DataClientException | IOException e) {
            String msg = ExceptionUtils.getMessageEx(e);
            log.error(msg, e);
            throw new IngestionClientException(msg, e);
        } catch (DataServiceException e) {
            log.error(e.getMessage(), e);
            throw new IngestionServiceException(e.getMessage(), e);
        }

        log.debug("Stream was ingested successfully.");
        IngestionStatus ingestionStatus = new IngestionStatus();
        ingestionStatus.status = OperationStatus.Succeeded;
        ingestionStatus.table = ingestionProperties.getTableName();
        ingestionStatus.database = ingestionProperties.getDatabaseName();
        return new IngestionStatusResult(ingestionStatus);
    }