public void handleRollFile()

in src/main/java/com/microsoft/azure/kusto/kafka/connect/sink/TopicPartitionWriter.java [84:131]


    public void handleRollFile(SourceFile fileDescriptor) {
        FileSourceInfo fileSourceInfo = new FileSourceInfo(fileDescriptor.path, fileDescriptor.rawBytes);

        /*
         * Since retries can be for a longer duration the Kafka Consumer may leave the group. This will result in a new Consumer reading records from the last
         * committed offset leading to duplication of records in KustoDB. Also, if the error persists, it might also result in duplicate records being written
         * into DLQ topic. Recommendation is to set the following worker configuration as `connector.client.config.override.policy=All` and set the
         * `consumer.override.max.poll.interval.ms` config to a high enough value to avoid consumer leaving the group while the Connector is retrying.
         */
        for (int retryAttempts = 0; true; retryAttempts++) {
            try {
                IngestionResult ingestionResult = client.ingestFromFile(fileSourceInfo, ingestionProps.ingestionProperties);
                if (ingestionProps.streaming && ingestionResult instanceof IngestionStatusResult) {
                    // If IngestionStatusResult returned then the ingestion status is from streaming ingest
                    IngestionStatus ingestionStatus = ingestionResult.getIngestionStatusCollection().get(0);
                    if (!hasStreamingSucceeded(ingestionStatus)) {
                        retryAttempts += ManagedStreamingIngestClient.ATTEMPT_COUNT;
                        backOffForRemainingAttempts(retryAttempts, null, fileDescriptor);
                        log.debug("Kusto ingestion: Streaming of file ({}) of size ({}) at current offset ({}) did NOT succeed; will retry",
                                fileDescriptor.path, fileDescriptor.rawBytes, currentOffset);
                        continue;
                    }
                }
                IngestionStatus ingestionStatus = null;
                if(ingestionResult!=null && ingestionResult.getIngestionStatusCollection()!=null
                        && !ingestionResult.getIngestionStatusCollection().isEmpty()){
                    ingestionStatus = ingestionResult.getIngestionStatusCollection().get(0);
                }
                log.info("Kusto ingestion: file ({}) of size ({}) at current offset ({}) with status ({})",
                        fileDescriptor.path, fileDescriptor.rawBytes, currentOffset,ingestionStatus);
                this.lastCommittedOffset = currentOffset;
                return;
            } catch (IngestionServiceException exception) {
                if (ingestionProps.streaming) {
                    Throwable innerException = exception.getCause();
                    if (innerException instanceof KustoDataExceptionBase &&
                            ((KustoDataExceptionBase) innerException).isPermanent()) {
                        throw new ConnectException(exception);
                    }
                }
                // TODO : improve handling of specific transient exceptions once the client supports them.
                // retrying transient exceptions
                backOffForRemainingAttempts(retryAttempts, exception, fileDescriptor);
            } catch (IngestionClientException | URISyntaxException exception) {
                throw new ConnectException(exception);
            }
        }
    }