private void processTailFile()

in nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java [745:971]


    /**
     * Performs one tailing iteration for a single file: recovers any rolled-over data, detects whether
     * the tailed file has rotated, streams newly available content into a FlowFile that is routed to
     * {@code REL_SUCCESS}, and finally stores and persists the updated {@link TailFileState}.
     *
     * <p>If the read is aborted by a {@code NulCharacterEncounteredException}, the state is still stored
     * and persisted, the reader is repositioned to the offset carried by the exception, and the
     * exception is then rethrown to the caller.</p>
     *
     * @param context  process context used to read processor properties and to yield when there is nothing to do
     * @param session  process session used to create, write, transfer, and remove FlowFiles
     * @param tailFile name of the file being tailed; also the key used to look up its entry in {@code states}
     */
    private void processTailFile(final ProcessContext context, final ProcessSession session, final String tailFile) {
        // If user changes the file that is being tailed, we need to consume the already-rolled-over data according
        // to the Initial Start Position property
        boolean rolloverOccurred;
        TailFileObject tfo = states.get(tailFile);

        if (tfo.isTailFileChanged()) {
            rolloverOccurred = false;
            final String recoverPosition = context.getProperty(START_POSITION).getValue();

            if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
                recoverRolledFiles(context, session, tailFile, tfo.getExpectedRecoveryChecksum(), tfo.getState().getTimestamp(), tfo.getState().getPosition());
            } else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) {
                cleanup(context);
                tfo.setState(new TailFileState(tailFile, null, null, 0L, 0L, 0L, null, tfo.getState().getBuffer()));
            } else {
                // Any other start position: begin tailing at the current end of the file.
                final String filename = tailFile;
                final File file = new File(filename);

                // Declared outside the try so that it can be closed if initialization fails part-way through.
                FileChannel fileChannel = null;
                try {
                    fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
                    getLogger().debug("Created FileChannel {} for {}", fileChannel, file);

                    final Checksum checksum = new CRC32();
                    final long position = file.length();
                    final long timestamp = file.lastModified() + 1;

                    // Prime the checksum with the bytes that are being skipped so that it reflects
                    // the file content up to the starting position.
                    try (final InputStream fis = new FileInputStream(file);
                            final CheckedInputStream in = new CheckedInputStream(fis, checksum)) {
                        StreamUtils.copy(in, new NullOutputStream(), position);
                    }

                    fileChannel.position(position);
                    cleanup(context);
                    tfo.setState(new TailFileState(filename, file, fileChannel, position, timestamp, file.length(), checksum, tfo.getState().getBuffer()));
                } catch (final IOException ioe) {
                    // The channel was never handed over to the state object, so nobody else will close it.
                    if (fileChannel != null) {
                        try {
                            fileChannel.close();
                        } catch (final IOException closeFailure) {
                            ioe.addSuppressed(closeFailure);
                        }
                    }

                    getLogger().error("Attempted to position Reader at current position in file {} but failed to do so", file, ioe);
                    context.yield();
                    return;
                }
            }

            tfo.setTailFileChanged(false);
        } else {
            // Recover any data that may have rolled over since the last time that this processor ran.
            // If expectedRecoveryChecksum != null, that indicates that this is the first iteration since processor was started, so use whatever checksum value
            // was present when the state was last persisted. In this case, we must then null out the value so that the next iteration won't keep using the "recovered"
            // value. If the value is null, then we know that either the processor has already recovered that data, or there was no state persisted. In either case,
            // use whatever checksum value is currently in the state.
            Long expectedChecksumValue = tfo.getExpectedRecoveryChecksum();
            if (expectedChecksumValue == null) {
                expectedChecksumValue = tfo.getState().getChecksum() == null ? null : tfo.getState().getChecksum().getValue();
            }

            rolloverOccurred = recoverRolledFiles(context, session, tailFile, expectedChecksumValue, tfo.getState().getTimestamp(), tfo.getState().getPosition());
            if (rolloverOccurred) {
                final boolean tailAfterRollover = context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS) > 0;
                if (tailAfterRollover) {
                    getLogger().debug("File {} was rolled over and the Rollover Tail Period is set, so will not consume from new file during this iteration.", tailFile);
                    return;
                }
            }

            tfo.setExpectedRecoveryChecksum(null);
        }

        // initialize local variables from state object; this is done so that we can easily change the values throughout
        // the onTrigger method and then create a new state object after we finish processing the files.
        TailFileState state = tfo.getState();
        File file = state.getFile();
        FileChannel reader = state.getReader();
        Checksum checksum = state.getChecksum();
        if (checksum == null) {
            checksum = new CRC32();
        }
        long position = state.getPosition();
        long timestamp = state.getTimestamp();
        long length = state.getLength();

        // Create a reader if necessary.
        if (file == null || reader == null) {
            file = new File(tailFile);
            reader = createReader(file, position);
            if (reader == null) {
                context.yield();
                return;
            }
        }

        final long startNanos = System.nanoTime();

        // Check if file has rotated
        // We determine that the file has rotated if any of the following conditions are met:
        // 1. 'rolloverOccurred' == true, which indicates that we have found a new file matching the rollover pattern.
        // 2. The file was modified after the timestamp in our state, AND the file is smaller than we expected. This satisfies
        //    the case where we are tailing File A, and that file is then renamed (say to B) and a new file named A is created
        //    and is written to. In such a case, File A may have a file size smaller than we have in our state, so we know that
        //    it rolled over.
        // 3. The File Channel that we have indicates that the size of the file is different than file.length() indicates, AND
        //    the File Channel also indicates that we have read all data in the file. This case may also occur in the same scenario
        //    as #2, above. In this case, the File Channel is pointing to File A, but the 'file' object is pointing to File B. They
        //    both have the same name but are different files. As a result, once we have consumed all data from the File Channel,
        //    we want to roll over and consume data from the new file.
        boolean rotated = rolloverOccurred;
        if (!rotated) {
            final long fileLength = file.length();
            if (length > fileLength) {
                getLogger().debug("Rotated = true because TailFileState Length = {}, File Length = {}", length, fileLength);
                rotated = true;
            } else {
                try {
                    final long readerSize = reader.size();
                    final long readerPosition = reader.position();

                    if (readerSize == readerPosition && readerSize != fileLength) {
                        getLogger().debug("Rotated = true because readerSize={}, readerPosition={}, fileLength={}", readerSize, readerPosition, fileLength);
                        rotated = true;
                    }
                } catch (final IOException e) {
                    getLogger().warn("Failed to determined the size or position of the File Channel when "
                        + "determining if the file has rolled over. Will assume that the file being tailed has not rolled over", e);
                }
            }
        }

        if (rotated) {
            // Since file has rotated, we close the reader, create a new one, and then reset our state.
            try {
                reader.close();
                getLogger().debug("Closed FileChannel {}", reader);
            } catch (final IOException ioe) {
                getLogger().warn("Failed to close reader for {}", file, ioe);
            }

            reader = createReader(file, 0L);
            position = 0L;
            checksum.reset();

            if (reader == null) {
                // createReader can return null (see the check performed when the reader is first created above).
                // Without a reader there is nothing that can be read, so store the reset state and try again on
                // the next iteration rather than handing a null channel to the read logic below.
                getLogger().debug("Unable to create a reader for {} after rotation; will retry on next iteration", file);
                tfo.setState(new TailFileState(tailFile, file, null, position, timestamp, length, checksum, state.getBuffer()));
                persistState(tfo, session, context);
                context.yield();
                return;
            }
        }

        if (file.length() == position || !file.exists()) {
            // no data to consume so rather than continually running, yield to allow other processors to use the thread.
            getLogger().debug("No data to consume; created no FlowFiles");
            tfo.setState(new TailFileState(tailFile, file, reader, position, timestamp, length, checksum, state.getBuffer()));
            persistState(tfo, session, context);
            context.yield();
            return;
        }

        // If there is data to consume, read as much as we can.
        final TailFileState currentState = state;
        final Checksum chksum = checksum;
        // data has been written to file. Stream it to a new FlowFile.
        FlowFile flowFile = session.create();

        // Effectively-final copies for use inside the write callback.
        final FileChannel fileReader = reader;
        final AtomicLong positionHolder = new AtomicLong(position);
        final boolean reReadOnNul = context.getProperty(REREAD_ON_NUL).asBoolean();

        // Captures a NUL-character abort raised inside the callback so that it can be rethrown once state has been saved.
        AtomicReference<NulCharacterEncounteredException> abort = new AtomicReference<>();
        flowFile = session.write(flowFile, rawOut -> {
            try (final OutputStream out = new BufferedOutputStream(rawOut)) {
                positionHolder.set(readLines(fileReader, currentState.getBuffer(), out, chksum, reReadOnNul));
            } catch (NulCharacterEncounteredException e) {
                positionHolder.set(e.getRePos());
                abort.set(e);
            }
        });

        // If there ended up being no data, just remove the FlowFile
        if (flowFile.getSize() == 0) {
            session.remove(flowFile);
            getLogger().debug("No data to consume; removed created FlowFile");
        } else {
            // determine filename for FlowFile by using <base filename of log file>.<initial offset>-<final offset>.<extension>
            final String tailFilename = file.getName();
            final String baseName = StringUtils.substringBeforeLast(tailFilename, ".");
            final String flowFileName;
            if (baseName.length() < tailFilename.length()) {
                flowFileName = baseName + "." + position + "-" + positionHolder.get() + "." + StringUtils.substringAfterLast(tailFilename, ".");
            } else {
                flowFileName = baseName + "." + position + "-" + positionHolder.get();
            }

            flowFile = session.putAllAttributes(flowFile, Map.of(
                    CoreAttributes.FILENAME.key(), flowFileName,
                    CoreAttributes.MIME_TYPE.key(), "text/plain",
                    "tailfile.original.path", tailFile
            ));
            session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), "FlowFile contains bytes " + position + " through " + positionHolder.get() + " of source file",
                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
            session.transfer(flowFile, REL_SUCCESS);
            getLogger().debug("Created {} and routed to success", flowFile);
        }

        if (flowFile.getSize() > 0 || linesBuffer.size() > 0) {
            position = positionHolder.get();

            // Set timestamp to the latest of when the file was modified and the current timestamp stored in the state.
            // We do this because when we read a file that has been rolled over, we set the state to 1 millisecond later than the last mod date
            // in order to avoid ingesting that file again. If we then read from this file during the same second (or millisecond, depending on the
            // operating system file last mod precision), then we could set the timestamp to a smaller value, which could result in reading in the
            // rotated file a second time.
            timestamp = Math.max(state.getTimestamp(), file.lastModified());
            length = file.length();
        }

        // Create a new state object to represent our current position, timestamp, etc.
        tfo.setState(new TailFileState(tailFile, file, reader, position, timestamp, length, checksum, state.getBuffer()));

        persistState(tfo, session, context);

        if (abort.get() != null) {
            // Move the reader to the offset reported by the exception before propagating it.
            final long newPosition = positionHolder.get();
            try {
                reader.position(newPosition);
            } catch (IOException ex) {
                getLogger().warn("Couldn't reposition the reader for {} due to {}", file, ex, ex);
                try {
                    reader.close();
                } catch (IOException ex2) {
                    getLogger().warn("Failed to close reader for {} due to {}", file, ex2, ex2);
                }
            }

            throw abort.get();
        }
    }