in ingest/src/main/java/com/microsoft/azure/kusto/ingest/ManagedStreamingIngestClient.java [331:370]
private IngestionResult streamWithRetries(SourceInfo sourceInfo, IngestionProperties ingestionProperties, @Nullable BlobClient blobClient)
throws IngestionClientException, IngestionServiceException {
ExponentialRetry<IngestionClientException, IngestionServiceException> retry = new ExponentialRetry<>(
exponentialRetryTemplate);
return retry.execute(currentAttempt -> {
try {
if (blobClient != null) {
String clientRequestId = String.format("KJC.executeManagedStreamingIngest.ingestFromBlob;%s;%d", sourceInfo.getSourceId(), currentAttempt);
return streamingIngestClient.ingestFromBlob((BlobSourceInfo) sourceInfo, ingestionProperties, clientRequestId);
} else {
String clientRequestId = String.format("KJC.executeManagedStreamingIngest.ingestFromStream;%s;%d", sourceInfo.getSourceId(),
currentAttempt);
return streamingIngestClient.ingestFromStream((StreamSourceInfo) sourceInfo, ingestionProperties, clientRequestId);
}
} catch (Exception e) {
if (e instanceof IngestionServiceException
&& e.getCause() != null
&& e.getCause() instanceof DataServiceException
&& e.getCause().getCause() != null
&& e.getCause().getCause() instanceof DataWebException) {
DataWebException webException = (DataWebException) e.getCause().getCause();
OneApiError oneApiError = webException.getApiError();
if (oneApiError.isPermanent()) {
throw e;
}
}
log.info(String.format("Streaming ingestion failed attempt %d", currentAttempt), e);
if (sourceInfo instanceof StreamSourceInfo) {
try {
((StreamSourceInfo) sourceInfo).getStream().reset();
} catch (IOException ioException) {
throw new IngestionClientException("Failed to reset stream", ioException);
}
}
}
return null;
});
}