private void process()

in resume-api/resume-api-common/src/main/java/org/apache/camel/example/resume/strategies/kafka/file/LargeFileRouteBuilder.java [54:80]


    /**
     * Updates the running read offset for one line of the file and publishes it.
     *
     * <p>The offset is seeded from the {@code FileConstants.INITIAL_OFFSET} header when the
     * first line is seen, then advanced by the length of the line body plus one. The result
     * is stored in the {@code Exchange.OFFSET} header, sent to {@code direct:summary} and
     * logged. The route is flagged to stop on the call that finds the latch count at 1; the
     * latch is counted down on every call.
     *
     * @param exchange the exchange whose message carries the line as its body, together with
     *                 the {@code Exchange.FILE_PATH} and {@code FileConstants.INITIAL_OFFSET}
     *                 headers
     */
    private void process(Exchange exchange) {
        final String line = exchange.getMessage().getBody(String.class);
        final String path = exchange.getMessage().getHeader(Exchange.FILE_PATH, String.class);
        final File source = new File(path);

        // The initial offset header is resolved on every call, but only applied to the first line
        final Resumable initial = exchange.getMessage().getHeader(FileConstants.INITIAL_OFFSET, Resumable.class);
        final Long initialOffset = initial.getLastOffset().getValue(Long.class);

        final boolean firstLine = lineCount == 0;
        if (firstLine) {
            lastOffset += initialOffset;
        }

        // The extra 1 accounts for the newline that is removed by readLine
        final int consumed = line.length() + 1;
        lastOffset += consumed;
        lineCount++;

        exchange.getMessage().setHeader(Exchange.OFFSET, Resumables.of(source, lastOffset));

        final String offsetText = String.valueOf(lastOffset);
        producerTemplate.sendBody("direct:summary", offsetText);
        LOG.info("Read data: {} / offset key: {} / offset value: {}", line, path, lastOffset);

        // Flag the route to stop when this call is about to release the latch
        final boolean releasesLatch = latch.getCount() == 1;
        if (releasesLatch) {
            exchange.setRouteStop(true);
        }

        latch.countDown();
    }