in resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java [54:80]
private void process(Exchange exchange) {
final String body = exchange.getMessage().getBody(String.class);
final String filePath = exchange.getMessage().getHeader(Exchange.FILE_PATH, String.class);
final File file = new File(filePath);
// Get the initial offset and use it to update the last offset when reading the first line
final Resumable resumable = exchange.getMessage().getHeader(FileConstants.INITIAL_OFFSET, Resumable.class);
final Long value = resumable.getLastOffset().getValue(Long.class);
if (lineCount == 0) {
lastOffset += value;
}
// It sums w/ 1 in order to account for the newline that is removed by readLine
lastOffset += body.length() + 1;
lineCount++;
exchange.getMessage().setHeader(Exchange.OFFSET, Resumables.of(file, lastOffset));
producerTemplate.sendBody("direct:summary", String.valueOf(lastOffset));
LOG.info("Read data: {} / offset key: {} / offset value: {}", body, filePath, lastOffset);
if (latch.getCount() == 1) {
exchange.setRouteStop(true);
}
latch.countDown();
}