in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java [84:131]
public void handleRollFile(SourceFile fileDescriptor) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(fileDescriptor.path, fileDescriptor.rawBytes);
/*
* Since retries can be for a longer duration the Kafka Consumer may leave the group. This will result in a new Consumer reading records from the last
* committed offset leading to duplication of records in KustoDB. Also, if the error persists, it might also result in duplicate records being written
* into DLQ topic. Recommendation is to set the following worker configuration as `connector.client.config.override.policy=All` and set the
* `consumer.override.max.poll.interval.ms` config to a high enough value to avoid consumer leaving the group while the Connector is retrying.
*/
for (int retryAttempts = 0; true; retryAttempts++) {
try {
IngestionResult ingestionResult = client.ingestFromFile(fileSourceInfo, ingestionProps.ingestionProperties);
if (ingestionProps.streaming && ingestionResult instanceof IngestionStatusResult) {
// If IngestionStatusResult returned then the ingestion status is from streaming ingest
IngestionStatus ingestionStatus = ingestionResult.getIngestionStatusCollection().get(0);
if (!hasStreamingSucceeded(ingestionStatus)) {
retryAttempts += ManagedStreamingIngestClient.ATTEMPT_COUNT;
backOffForRemainingAttempts(retryAttempts, null, fileDescriptor);
log.debug("Kusto ingestion: Streaming of file ({}) of size ({}) at current offset ({}) did NOT succeed; will retry",
fileDescriptor.path, fileDescriptor.rawBytes, currentOffset);
continue;
}
}
IngestionStatus ingestionStatus = null;
if(ingestionResult!=null && ingestionResult.getIngestionStatusCollection()!=null
&& !ingestionResult.getIngestionStatusCollection().isEmpty()){
ingestionStatus = ingestionResult.getIngestionStatusCollection().get(0);
}
log.info("Kusto ingestion: file ({}) of size ({}) at current offset ({}) with status ({})",
fileDescriptor.path, fileDescriptor.rawBytes, currentOffset,ingestionStatus);
this.lastCommittedOffset = currentOffset;
return;
} catch (IngestionServiceException exception) {
if (ingestionProps.streaming) {
Throwable innerException = exception.getCause();
if (innerException instanceof KustoDataExceptionBase &&
((KustoDataExceptionBase) innerException).isPermanent()) {
throw new ConnectException(exception);
}
}
// TODO : improve handling of specific transient exceptions once the client supports them.
// retrying transient exceptions
backOffForRemainingAttempts(retryAttempts, exception, fileDescriptor);
} catch (IngestionClientException | URISyntaxException exception) {
throw new ConnectException(exception);
}
}
}