private IngestionResult streamWithRetries()

in ingest/src/main/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClient.java [331:370]


    /**
     * Runs streaming ingestion for {@code sourceInfo} through an {@link ExponentialRetry} built from
     * {@code exponentialRetryTemplate}.
     *
     * <p>Each attempt ingests either from the blob (when {@code blobClient} is non-null) or from the stream,
     * tagging the request with a client request id that contains the source id and the attempt number.
     * A failure whose cause chain is {@code IngestionServiceException -> DataServiceException -> DataWebException}
     * and whose API error reports itself as permanent is rethrown immediately. Any other failure is logged,
     * the stream (if any) is reset so it can be read again, and {@code null} is returned from the attempt.
     *
     * @param sourceInfo the source to ingest; cast to {@code BlobSourceInfo} when {@code blobClient} is non-null,
     *        otherwise to {@code StreamSourceInfo}
     * @param ingestionProperties ingestion properties forwarded unchanged to the streaming client
     * @param blobClient when non-null, selects blob ingestion; only its presence is inspected here
     * @return whatever {@code ExponentialRetry.execute} returns for the attempts made
     *         (NOTE(review): presumably the first non-null attempt result, or null once retries are exhausted - confirm)
     * @throws IngestionClientException if the stream cannot be reset between attempts
     * @throws IngestionServiceException if the service reports a permanent error
     */
    private IngestionResult streamWithRetries(SourceInfo sourceInfo, IngestionProperties ingestionProperties, @Nullable BlobClient blobClient)
            throws IngestionClientException, IngestionServiceException {
        ExponentialRetry<IngestionClientException, IngestionServiceException> retry = new ExponentialRetry<>(
                exponentialRetryTemplate);
        return retry.execute(currentAttempt -> {
            try {
                if (blobClient != null) {
                    String clientRequestId = String.format("KJC.executeManagedStreamingIngest.ingestFromBlob;%s;%d", sourceInfo.getSourceId(), currentAttempt);
                    return streamingIngestClient.ingestFromBlob((BlobSourceInfo) sourceInfo, ingestionProperties, clientRequestId);
                } else {
                    String clientRequestId = String.format("KJC.executeManagedStreamingIngest.ingestFromStream;%s;%d", sourceInfo.getSourceId(),
                            currentAttempt);
                    return streamingIngestClient.ingestFromStream((StreamSourceInfo) sourceInfo, ingestionProperties, clientRequestId);
                }
            } catch (Exception e) {
                // instanceof is false for null, so the cause chain needs no separate null checks.
                if (e instanceof IngestionServiceException
                        && e.getCause() instanceof DataServiceException
                        && e.getCause().getCause() instanceof DataWebException) {
                    DataWebException webException = (DataWebException) e.getCause().getCause();
                    OneApiError oneApiError = webException.getApiError();
                    // Defensive: if no API error details are available, treat the failure as non-permanent
                    // instead of failing with a NullPointerException that would mask the original error.
                    if (oneApiError != null && oneApiError.isPermanent()) {
                        // Permanent errors are not worth another attempt; rethrow the original exception as-is.
                        throw e;
                    }
                }
                log.info(String.format("Streaming ingestion failed attempt %d", currentAttempt), e);

                // The failed attempt may have consumed part of the stream; rewind it before the next attempt.
                if (sourceInfo instanceof StreamSourceInfo) {
                    try {
                        ((StreamSourceInfo) sourceInfo).getStream().reset();
                    } catch (IOException ioException) {
                        throw new IngestionClientException("Failed to reset stream", ioException);
                    }
                }
            }
            // No result for this attempt.
            // NOTE(review): presumably ExponentialRetry treats a null result as "try again" - confirm.
            return null;
        });
    }