in nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java [745:971]
/**
 * Tails a single file: recovers any data that rolled over since the last run, detects
 * rotation of the file currently being tailed, streams newly appended bytes into a new
 * FlowFile routed to {@code REL_SUCCESS}, and persists the updated {@link TailFileState}.
 *
 * @param context  the process context, used for property access, cleanup, and yielding
 * @param session  the session used to create, write, and transfer FlowFiles and persist state
 * @param tailFile the path of the file being tailed (key into the {@code states} map)
 * @throws NulCharacterEncounteredException if "Reread on NUL" is enabled and a NUL byte is
 *         encountered; thrown only after state has been persisted and the reader repositioned
 */
private void processTailFile(final ProcessContext context, final ProcessSession session, final String tailFile) {
    // If user changes the file that is being tailed, we need to consume the already-rolled-over data according
    // to the Initial Start Position property
    boolean rolloverOccurred;
    TailFileObject tfo = states.get(tailFile);

    if (tfo.isTailFileChanged()) {
        rolloverOccurred = false;
        final String recoverPosition = context.getProperty(START_POSITION).getValue();

        if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
            recoverRolledFiles(context, session, tailFile, tfo.getExpectedRecoveryChecksum(), tfo.getState().getTimestamp(), tfo.getState().getPosition());
        } else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) {
            cleanup(context);
            tfo.setState(new TailFileState(tailFile, null, null, 0L, 0L, 0L, null, tfo.getState().getBuffer()));
        } else {
            // Start-of-Current-Time: skip everything currently in the file and begin
            // tailing from its end, recording a checksum of the skipped content.
            final String filename = tailFile;
            final File file = new File(filename);

            FileChannel fileChannel = null;
            try {
                fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
                getLogger().debug("Created FileChannel {} for {}", fileChannel, file);

                final Checksum checksum = new CRC32();
                final long position = file.length();
                // +1 millisecond so that the file we just skipped is not re-ingested as a rolled file
                final long timestamp = file.lastModified() + 1;

                // Run the existing content through the checksum without keeping it
                try (final InputStream fis = new FileInputStream(file);
                     final CheckedInputStream in = new CheckedInputStream(fis, checksum)) {
                    StreamUtils.copy(in, new NullOutputStream(), position);
                }

                fileChannel.position(position);
                cleanup(context);
                tfo.setState(new TailFileState(filename, file, fileChannel, position, timestamp, file.length(), checksum, tfo.getState().getBuffer()));
            } catch (final IOException ioe) {
                // BUGFIX: the channel was previously leaked on this path. It is only retained
                // (via TailFileState) on the success path, so on failure we must close it here.
                if (fileChannel != null) {
                    try {
                        fileChannel.close();
                    } catch (final IOException closeIoe) {
                        ioe.addSuppressed(closeIoe);
                    }
                }
                getLogger().error("Attempted to position Reader at current position in file {} but failed to do so", file, ioe);
                context.yield();
                return;
            }
        }

        tfo.setTailFileChanged(false);
    } else {
        // Recover any data that may have rolled over since the last time that this processor ran.
        // If expectedRecoveryChecksum != null, that indicates that this is the first iteration since processor was started, so use whatever checksum value
        // was present when the state was last persisted. In this case, we must then null out the value so that the next iteration won't keep using the "recovered"
        // value. If the value is null, then we know that either the processor has already recovered that data, or there was no state persisted. In either case,
        // use whatever checksum value is currently in the state.
        Long expectedChecksumValue = tfo.getExpectedRecoveryChecksum();
        if (expectedChecksumValue == null) {
            expectedChecksumValue = tfo.getState().getChecksum() == null ? null : tfo.getState().getChecksum().getValue();
        }

        rolloverOccurred = recoverRolledFiles(context, session, tailFile, expectedChecksumValue, tfo.getState().getTimestamp(), tfo.getState().getPosition());
        if (rolloverOccurred) {
            final boolean tailAfterRollover = context.getProperty(POST_ROLLOVER_TAIL_PERIOD).asTimePeriod(TimeUnit.MILLISECONDS) > 0;
            if (tailAfterRollover) {
                getLogger().debug("File {} was rolled over and the Rollover Tail Period is set, so will not consume from new file during this iteration.", tailFile);
                return;
            }
        }

        tfo.setExpectedRecoveryChecksum(null);
    }

    // initialize local variables from state object; this is done so that we can easily change the values throughout
    // the onTrigger method and then create a new state object after we finish processing the files.
    TailFileState state = tfo.getState();
    File file = state.getFile();
    FileChannel reader = state.getReader();
    Checksum checksum = state.getChecksum();
    if (checksum == null) {
        checksum = new CRC32();
    }
    long position = state.getPosition();
    long timestamp = state.getTimestamp();
    long length = state.getLength();

    // Create a reader if necessary.
    if (file == null || reader == null) {
        file = new File(tailFile);
        reader = createReader(file, position);
        if (reader == null) {
            context.yield();
            return;
        }
    }

    final long startNanos = System.nanoTime();

    // Check if file has rotated
    // We determine that the file has rotated if any of the following conditions are met:
    // 1. 'rolloverOccured' == true, which indicates that we have found a new file matching the rollover pattern.
    // 2. The file was modified after the timestamp in our state, AND the file is smaller than we expected. This satisfies
    // the case where we are tailing File A, and that file is then renamed (say to B) and a new file named A is created
    // and is written to. In such a case, File A may have a file size smaller than we have in our state, so we know that
    // it rolled over.
    // 3. The File Channel that we have indicates that the size of the file is different than file.length() indicates, AND
    // the File Channel also indicates that we have read all data in the file. This case may also occur in the same scenario
    // as #2, above. In this case, the File Channel is pointing to File A, but the 'file' object is pointing to File B. They
    // both have the same name but are different files. As a result, once we have consumed all data from the File Channel,
    // we want to roll over and consume data from the new file.
    boolean rotated = rolloverOccurred;
    if (!rotated) {
        final long fileLength = file.length();
        if (length > fileLength) {
            getLogger().debug("Rotated = true because TailFileState Length = {}, File Length = {}", length, fileLength);
            rotated = true;
        } else {
            try {
                final long readerSize = reader.size();
                final long readerPosition = reader.position();

                if (readerSize == readerPosition && readerSize != fileLength) {
                    getLogger().debug("Rotated = true because readerSize={}, readerPosition={}, fileLength={}", readerSize, readerPosition, fileLength);
                    rotated = true;
                }
            } catch (final IOException e) {
                getLogger().warn("Failed to determined the size or position of the File Channel when "
                    + "determining if the file has rolled over. Will assume that the file being tailed has not rolled over", e);
            }
        }
    }

    if (rotated) {
        // Since file has rotated, we close the reader, create a new one, and then reset our state.
        try {
            reader.close();
            getLogger().debug("Closed FileChannel {}", reader);
        } catch (final IOException ioe) {
            getLogger().warn("Failed to close reader for {}", file, ioe);
        }

        reader = createReader(file, 0L);
        position = 0L;
        checksum.reset();

        // ROBUSTNESS FIX: createReader can return null (see the identical guard above), and the
        // previous code would pass the null reader into session.write and NPE. Store state with a
        // null reader so the next trigger re-creates it, then yield.
        if (reader == null) {
            tfo.setState(new TailFileState(tailFile, file, null, position, timestamp, length, checksum, state.getBuffer()));
            context.yield();
            return;
        }
    }

    if (file.length() == position || !file.exists()) {
        // no data to consume so rather than continually running, yield to allow other processors to use the thread.
        getLogger().debug("No data to consume; created no FlowFiles");
        tfo.setState(new TailFileState(tailFile, file, reader, position, timestamp, length, checksum, state.getBuffer()));
        persistState(tfo, session, context);
        context.yield();
        return;
    }

    // If there is data to consume, read as much as we can.
    final TailFileState currentState = state;
    final Checksum chksum = checksum;

    // data has been written to file. Stream it to a new FlowFile.
    FlowFile flowFile = session.create();
    final FileChannel fileReader = reader;
    final AtomicLong positionHolder = new AtomicLong(position);
    final boolean reReadOnNul = context.getProperty(REREAD_ON_NUL).asBoolean();

    // If a NUL character is hit and "Reread on NUL" is set, readLines aborts; we capture the
    // exception so that state can still be persisted before it is rethrown below.
    AtomicReference<NulCharacterEncounteredException> abort = new AtomicReference<>();
    flowFile = session.write(flowFile, rawOut -> {
        try (final OutputStream out = new BufferedOutputStream(rawOut)) {
            positionHolder.set(readLines(fileReader, currentState.getBuffer(), out, chksum, reReadOnNul));
        } catch (NulCharacterEncounteredException e) {
            positionHolder.set(e.getRePos());
            abort.set(e);
        }
    });

    // If there ended up being no data, just remove the FlowFile
    if (flowFile.getSize() == 0) {
        session.remove(flowFile);
        getLogger().debug("No data to consume; removed created FlowFile");
    } else {
        // determine filename for FlowFile by using <base filename of log file>.<initial offset>-<final offset>.<extension>
        final String tailFilename = file.getName();
        final String baseName = StringUtils.substringBeforeLast(tailFilename, ".");
        final String flowFileName;
        if (baseName.length() < tailFilename.length()) {
            flowFileName = baseName + "." + position + "-" + positionHolder.get() + "." + StringUtils.substringAfterLast(tailFilename, ".");
        } else {
            flowFileName = baseName + "." + position + "-" + positionHolder.get();
        }

        flowFile = session.putAllAttributes(flowFile, Map.of(
            CoreAttributes.FILENAME.key(), flowFileName,
            CoreAttributes.MIME_TYPE.key(), "text/plain",
            "tailfile.original.path", tailFile
        ));

        session.getProvenanceReporter().receive(flowFile, file.toURI().toString(), "FlowFile contains bytes " + position + " through " + positionHolder.get() + " of source file",
            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
        session.transfer(flowFile, REL_SUCCESS);
        getLogger().debug("Created {} and routed to success", flowFile);
    }

    if (flowFile.getSize() > 0 || linesBuffer.size() > 0) {
        position = positionHolder.get();

        // Set timestamp to the latest of when the file was modified and the current timestamp stored in the state.
        // We do this because when we read a file that has been rolled over, we set the state to 1 millisecond later than the last mod date
        // in order to avoid ingesting that file again. If we then read from this file during the same second (or millisecond, depending on the
        // operating system file last mod precision), then we could set the timestamp to a smaller value, which could result in reading in the
        // rotated file a second time.
        timestamp = Math.max(state.getTimestamp(), file.lastModified());
        length = file.length();
    }

    // Create a new state object to represent our current position, timestamp, etc.
    tfo.setState(new TailFileState(tailFile, file, reader, position, timestamp, length, checksum, state.getBuffer()));
    persistState(tfo, session, context);

    if (abort.get() != null) {
        // Rewind the reader to just before the NUL so that the data is re-read next time;
        // if repositioning fails, close the reader so a fresh one is created on the next trigger.
        final long newPosition = positionHolder.get();
        try {
            reader.position(newPosition);
        } catch (IOException ex) {
            getLogger().warn("Couldn't reposition the reader for {} due to {}", file, ex, ex);
            try {
                reader.close();
            } catch (IOException ex2) {
                getLogger().warn("Failed to close reader for {} due to {}", file, ex2, ex2);
            }
        }
        throw abort.get();
    }
}