extensions/standard-processors/processors/TailFile.cpp (668 lines of code) (raw):
/**
* @file TailFile.cpp
* TailFile class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <cinttypes>
#include <cstdint>
#include <exception>
#include <iostream>
#include <limits>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "range/v3/action/sort.hpp"
#include "range/v3/range/conversion.hpp"
#include "range/v3/view/transform.hpp"
#include "FlowFileRecord.h"
#include "io/CRCStream.h"
#include "utils/file/FileUtils.h"
#include "utils/file/PathUtils.h"
#include "utils/TimeUtil.h"
#include "utils/StringUtils.h"
#include "utils/ProcessorConfigUtils.h"
#include "TextFragmentUtils.h"
#include "TailFile.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "utils/RegexUtils.h"
#include "utils/expected.h"
namespace org::apache::nifi::minifi::processors {
// Key prefix used in legacy state files for entries holding the current full path of a tailed file: "CURRENT.<file>=<path>".
const char *TailFile::CURRENT_STR = "CURRENT.";
// Key prefix used in legacy state files for entries holding the read position of a tailed file: "POSITION.<file>=<offset>".
const char *TailFile::POSITION_STR = "POSITION.";
namespace {
/// Returns true if `container` (a map- or set-like container) has an element with the given key.
template<typename Container, typename Key>
bool containsKey(const Container &container, const Key &key) {
  return container.count(key) > 0;
}
/// Looks up `key` and parses its value as a signed 64-bit integer.
/// A missing key yields 0; a present but malformed value propagates std::stoll's exception.
template <typename Container, typename Key>
int64_t readOptionalInt64(const Container &container, const Key &key) {
  const auto found = container.find(key);
  return found == container.end() ? int64_t{0} : std::stoll(found->second);
}
/// Looks up `key` and parses its value as an unsigned 64-bit integer.
/// A missing key yields 0; a present but malformed value propagates std::stoull's exception.
template <typename Container, typename Key>
uint64_t readOptionalUint64(const Container &container, const Key &key) {
  const auto found = container.find(key);
  return found == container.end() ? uint64_t{0} : std::stoull(found->second);
}
std::map<std::filesystem::path, TailState> update_keys_in_legacy_states(const std::map<std::filesystem::path, TailState> &legacy_tail_states) {
std::map<std::filesystem::path, TailState> new_tail_states;
for (const auto &key_value_pair : legacy_tail_states) {
const TailState &state = key_value_pair.second;
new_tail_states.emplace(state.path_ / state.file_name_, state);
}
return new_tail_states;
}
/**
 * Opens `file_path` in binary mode into `input_stream` and, if `offset` is non-zero, seeks to it.
 * @throws Exception (FILE_OPERATION_EXCEPTION) if the file cannot be opened or the seek fails
 */
void openFile(const std::filesystem::path& file_path, uint64_t offset, std::ifstream &input_stream, const std::shared_ptr<core::logging::Logger> &logger) {
  logger->log_debug("Opening %s", file_path.string());
  input_stream.open(file_path, std::fstream::in | std::fstream::binary);

  const bool opened = input_stream.is_open() && input_stream.good();
  if (!opened) {
    input_stream.close();
    throw Exception(FILE_OPERATION_EXCEPTION, "Could not open file: " + file_path.string());
  }

  if (offset == 0U) {
    return;  // nothing to skip
  }

  input_stream.seekg(offset, std::ifstream::beg);
  if (input_stream.good()) {
    return;
  }
  logger->log_error("Seeking to %" PRIu64 " failed for file %s (does file/filesystem support seeking?)", offset, file_path.string());
  throw Exception(FILE_OPERATION_EXCEPTION, "Could not seek file " + file_path.string() + " to offset " + std::to_string(offset));
}
// Size of the chunks in which tailed files are read.
constexpr std::size_t BUFFER_SIZE = 4096;

/**
 * Output callback that copies data from a file, starting at a given offset, up to and including the next
 * occurrence of a delimiter character. Each call to operator() writes one chunk; the same instance is called
 * repeatedly (through std::ref) to split the file into one flow file per delimited chunk.
 *
 * The CRC checksum (seeded with the value passed to the constructor) is only advanced when a complete,
 * delimiter-terminated chunk was written. Once a chunk without a trailing delimiter is written,
 * useLatestFlowFile() returns false.
 *
 * The object holds raw pointers (begin_/end_) into its own buffer_, so it must never be copied or moved:
 * a moved-to instance would keep pointing into the moved-from object's buffer. The special members are
 * therefore deleted explicitly.
 */
class FileReaderCallback {
 public:
  /// Opens `file_path` and seeks to `offset`; throws (via openFile) if either fails.
  FileReaderCallback(const std::filesystem::path& file_path,
                     uint64_t offset,
                     char input_delimiter,
                     uint64_t checksum)
    : input_delimiter_(input_delimiter),
      checksum_(checksum) {
    openFile(file_path, offset, input_stream_, logger_);
  }

  FileReaderCallback(const FileReaderCallback&) = delete;
  FileReaderCallback(FileReaderCallback&&) = delete;
  FileReaderCallback& operator=(const FileReaderCallback&) = delete;
  FileReaderCallback& operator=(FileReaderCallback&&) = delete;
  ~FileReaderCallback() = default;

  /// Writes the bytes up to and including the next delimiter (or up to the end of the input) to `output_stream`.
  /// @return the number of bytes written
  int64_t operator()(const std::shared_ptr<io::OutputStream>& output_stream) {
    io::CRCStream<io::OutputStream> crc_stream{gsl::make_not_null(output_stream.get()), checksum_};

    uint64_t num_bytes_written = 0;
    bool found_delimiter = false;

    while (hasMoreToRead() && !found_delimiter) {
      // refill the buffer only once everything previously read has been consumed
      if (begin_ == end_) {
        input_stream_.read(reinterpret_cast<char *>(buffer_.data()), gsl::narrow<std::streamsize>(buffer_.size()));

        const auto num_bytes_read = input_stream_.gcount();
        logger_->log_trace("Read %jd bytes of input", std::intmax_t{num_bytes_read});

        begin_ = buffer_.data();
        end_ = begin_ + num_bytes_read;
      }

      char *delimiter_pos = std::find(begin_, end_, input_delimiter_);
      found_delimiter = (delimiter_pos != end_);

      // the delimiter itself is part of the chunk
      const auto zlen = gsl::narrow<size_t>(std::distance(begin_, delimiter_pos)) + (found_delimiter ? 1 : 0);
      crc_stream.write(reinterpret_cast<uint8_t*>(begin_), zlen);
      num_bytes_written += zlen;
      begin_ += zlen;
    }

    if (found_delimiter) {
      checksum_ = crc_stream.getCRC();
    } else {
      latest_flow_file_ends_with_delimiter_ = false;
    }

    return num_bytes_written;
  }

  /// Checksum covering everything up to the end of the last complete chunk.
  uint64_t checksum() const {
    return checksum_;
  }

  /// True while buffered data remains or the input stream can still be read.
  bool hasMoreToRead() const {
    return begin_ != end_ || input_stream_.good();
  }

  /// False once a chunk without a trailing delimiter has been written.
  bool useLatestFlowFile() const {
    return latest_flow_file_ends_with_delimiter_;
  }

 private:
  char input_delimiter_;
  uint64_t checksum_;
  std::ifstream input_stream_;
  std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<TailFile>::getLogger();

  std::array<char, BUFFER_SIZE> buffer_{};
  char *begin_ = buffer_.data();  // first unconsumed byte in buffer_
  char *end_ = buffer_.data();    // one past the last valid byte in buffer_

  bool latest_flow_file_ends_with_delimiter_ = true;
};
class WholeFileReaderCallback {
public:
WholeFileReaderCallback(const std::filesystem::path& file_path,
uint64_t offset,
uint64_t checksum)
: checksum_(checksum) {
openFile(file_path, offset, input_stream_, logger_);
}
uint64_t checksum() const {
return checksum_;
}
int64_t operator()(const std::shared_ptr<io::OutputStream>& output_stream) {
std::array<char, BUFFER_SIZE> buffer{};
io::CRCStream<io::OutputStream> crc_stream{gsl::make_not_null(output_stream.get()), checksum_};
uint64_t num_bytes_written = 0;
while (input_stream_.good()) {
input_stream_.read(buffer.data(), buffer.size());
const auto num_bytes_read = input_stream_.gcount();
logger_->log_trace("Read %jd bytes of input", std::intmax_t{num_bytes_read});
const int len = gsl::narrow<int>(num_bytes_read);
crc_stream.write(reinterpret_cast<uint8_t*>(buffer.data()), len);
num_bytes_written += len;
}
checksum_ = crc_stream.getCRC();
return num_bytes_written;
}
private:
uint64_t checksum_;
std::ifstream input_stream_;
std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<TailFile>::getLogger();
};
// Kept for backwards compatibility only. Any string is accepted as the Input Delimiter, but only its first
// character (or first backslash escape: \r, \t, \n, or \<c> meaning <c>) is used, which can be confusing.
// Returns std::nullopt for an empty string.
std::optional<char> getDelimiterOld(const std::string& delimiter_str) {
  if (delimiter_str.empty()) {
    return std::nullopt;
  }

  const char first = delimiter_str.front();
  if (first != '\\') {
    return first;
  }
  if (delimiter_str.size() < 2) {
    return '\\';  // a lone backslash
  }

  const char escaped = delimiter_str[1];
  if (escaped == 'r') return '\r';
  if (escaped == 't') return '\t';
  if (escaped == 'n') return '\n';
  return escaped;
}
} // namespace
// Declares the processor's supported properties and relationships (the Properties and Relationships lists).
void TailFile::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
// Reads the processor configuration and (re)builds the internal tail states.
// Throws if the state manager is unavailable, or if the configuration is invalid:
// - Multiple file mode: Base Directory missing, or nonexistent when no attribute provider service is used.
// - Single file mode: File to Tail is not a full path.
// parseAttributeProviderServiceProperty() may also throw.
void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
gsl_Expects(context);
tail_states_.clear();
state_manager_ = context->getStateManager();
if (state_manager_ == nullptr) {
throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager");
}
// The delimiter should be a single (possibly escaped) character. Other strings are still accepted for
// backwards compatibility, but only their first character is used (see getDelimiterOld()).
if (auto delimiter_str = context->getProperty(Delimiter)) {
if (auto parsed_delimiter = utils::StringUtils::parseCharacter(*delimiter_str)) {
delimiter_ = *parsed_delimiter;
} else {
logger_->log_error("Invalid %s: \"%s\" (it should be a single character, whether escaped or not). Using the first character as the %s",
std::string(TailFile::Delimiter.name), *delimiter_str, std::string(TailFile::Delimiter.name));
delimiter_ = getDelimiterOld(*delimiter_str);
}
}
std::string file_name_str;
context->getProperty(FileName, file_name_str);
std::string mode;
context->getProperty(TailMode, mode);
// In Multiple file mode, File to Tail is a regular expression matched against file names found under Base Directory.
if (mode == "Multiple file") {
tail_mode_ = Mode::MULTIPLE;
pattern_regex_ = utils::Regex(file_name_str);
parseAttributeProviderServiceProperty(*context);
if (auto base_dir = context->getProperty(BaseDirectory); !base_dir) {
throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Base directory is required for multiple tail mode.");
} else {
base_dir_ = std::filesystem::path(*base_dir);
}
// Base Directory only has to exist up front when no attribute provider service supplies the directories.
if (!attribute_provider_service_ && !utils::file::is_directory(base_dir_)) {
throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Base directory does not exist or is not a directory");
}
context->getProperty(RecursiveLookup, recursive_lookup_);
// NOTE:
// context->getProperty(LookupFrequency, lookup_frequency_);
// is incorrect, as std::chrono::milliseconds::rep is unspecified, and may not be supported by getProperty()
// (e.g. in clang/libc++, this underlying type is long long)
int64_t lookup_frequency;
if (context->getProperty(LookupFrequency, lookup_frequency)) {
lookup_frequency_ = std::chrono::milliseconds{lookup_frequency};
}
recoverState(context);
// Initial directory scan, so matching files are already known before the first trigger.
doMultifileLookup(*context);
} else {
tail_mode_ = Mode::SINGLE;
auto file_to_tail = std::filesystem::path(file_name_str);
if (file_to_tail.has_filename() && file_to_tail.has_parent_path()) {
// NOTE: position and checksum will be updated in recoverState() if there is a persisted state for this file
tail_states_.emplace(file_to_tail, TailState{ file_to_tail.parent_path(), file_to_tail.filename()});
} else {
throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to tail must be a fully qualified file");
}
recoverState(context);
}
// The rolling filename pattern is configured as a glob and converted to a regular expression here;
// parseRollingFilePattern() later substitutes the ${filename} placeholder in it for each tailed file.
std::string rolling_filename_pattern_glob;
context->getProperty(RollingFilenamePattern, rolling_filename_pattern_glob);
rolling_filename_pattern_ = utils::file::globToRegex(rolling_filename_pattern_glob);
initial_start_position_ = utils::parseEnumProperty<InitialStartPositions>(*context, InitialStartPosition);
// A Batch Size of 0 (or an unset property) leaves batch_size_ unchanged.
uint32_t batch_size = 0;
if (context->getProperty(BatchSize, batch_size) && batch_size != 0) {
batch_size_ = batch_size;
}
}
/**
 * Resolves the optional Attribute Provider Service property into attribute_provider_service_.
 * Leaves attribute_provider_service_ null when the property is unset or empty.
 * @throws minifi::Exception (PROCESS_SCHEDULE_EXCEPTION) if the named controller service does not exist or is
 *         not an AttributeProviderService
 */
void TailFile::parseAttributeProviderServiceProperty(core::ProcessContext& context) {
  // Reset first: onSchedule() can run more than once, and a pointer resolved by an earlier schedule must not
  // survive once the property has been cleared (we do not own the service, so it may also be gone).
  attribute_provider_service_ = nullptr;

  const auto attribute_provider_service_name = context.getProperty(AttributeProviderService);
  if (!attribute_provider_service_name || attribute_provider_service_name->empty()) {
    return;
  }

  std::shared_ptr<core::controller::ControllerService> controller_service = context.getControllerService(*attribute_provider_service_name);
  if (!controller_service) {
    throw minifi::Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, utils::StringUtils::join_pack("Controller service '", *attribute_provider_service_name, "' not found")};
  }

  // we drop ownership of the service here -- in the long term, getControllerService() should return a non-owning pointer or optional reference
  attribute_provider_service_ = dynamic_cast<minifi::controllers::AttributeProviderService*>(controller_service.get());
  if (!attribute_provider_service_) {
    throw minifi::Exception{ExceptionType::PROCESS_SCHEDULE_EXCEPTION, utils::StringUtils::join_pack("Controller service '", *attribute_provider_service_name, "' is not an AttributeProviderService")};
  }
}
void TailFile::parseStateFileLine(char *buf, std::map<std::filesystem::path, TailState> &state) const {
char *line = buf;
logger_->log_trace("Received line %s", buf);
while ((line[0] == ' ') || (line[0] == '\t'))
++line;
char first = line[0];
if ((first == '\0') || (first == '#') || (first == '\r') || (first == '\n') || (first == '=')) {
return;
}
char *equal = strchr(line, '=');
if (equal == nullptr) {
return;
}
equal[0] = '\0';
std::string key = line;
equal++;
while ((equal[0] == ' ') || (equal[0] == '\t'))
++equal;
first = equal[0];
if ((first == '\0') || (first == '\r') || (first == '\n')) {
return;
}
std::string value = equal;
key = utils::StringUtils::trimRight(key);
value = utils::StringUtils::trimRight(value);
if (key == "FILENAME") {
std::filesystem::path file_path = value;
if (file_path.has_filename() && file_path.has_parent_path()) {
logger_->log_debug("State migration received path %s, file %s", file_path.parent_path().string(), file_path.filename().string());
state.emplace(file_path.filename(), TailState{file_path.parent_path(), file_path.filename()});
} else {
state.emplace(value, TailState{file_path.parent_path(), value});
}
}
if (key == "POSITION") {
// for backwards compatibility
if (tail_states_.size() != std::size_t{1}) {
throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "Incompatible state file types");
}
const auto position = std::stoull(value);
logger_->log_debug("Received position %llu", position);
state.begin()->second.position_ = gsl::narrow<uint64_t>(position);
}
if (key.find(CURRENT_STR) == 0) {
const auto file = key.substr(strlen(CURRENT_STR));
std::filesystem::path file_path = value;
if (file_path.has_filename() && file_path.has_parent_path()) {
state[file].path_ = file_path.parent_path();
state[file].file_name_ = file_path.filename();
} else {
throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file contains an invalid file name");
}
}
if (key.find(POSITION_STR) == 0) {
const auto file = key.substr(strlen(POSITION_STR));
state[file].position_ = std::stoull(value);
}
}
/**
 * Loads persisted tail states, preferring the state manager and falling back to the legacy state file.
 * - Single file mode: only the state of the configured file is taken over (if present).
 * - Multiple file mode: the loaded states replace the current ones.
 * On success the result is logged and persisted again.
 * @return false if no state could be loaded from either source
 */
bool TailFile::recoverState(const std::shared_ptr<core::ProcessContext>& context) {
  std::map<std::filesystem::path, TailState> recovered_states;
  const bool loaded = getStateFromStateManager(recovered_states) ||
                      getStateFromLegacyStateFile(context, recovered_states);
  if (!loaded) {
    return false;
  }

  if (tail_mode_ != Mode::SINGLE) {
    tail_states_ = std::move(recovered_states);
  } else {
    if (tail_states_.size() != 1) {
      throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "This should never happen: "
          "in Single file mode, internal state size should be 1, but it is " + std::to_string(tail_states_.size()));
    }
    auto& [configured_path, configured_state] = *tail_states_.begin();
    if (const auto found = recovered_states.find(configured_path); found != recovered_states.end()) {
      configured_state = found->second;
    }
  }

  logState();
  storeState();
  return true;
}
bool TailFile::getStateFromStateManager(std::map<std::filesystem::path, TailState> &new_tail_states) const {
std::unordered_map<std::string, std::string> state_map;
if (state_manager_->get(state_map)) {
for (size_t i = 0U;; ++i) {
if (state_map.find("file." + std::to_string(i) + ".name") == state_map.end()) {
break;
}
try {
const std::string& current = state_map.at("file." + std::to_string(i) + ".current");
uint64_t position = std::stoull(state_map.at("file." + std::to_string(i) + ".position"));
uint64_t checksum = readOptionalUint64(state_map, "file." + std::to_string(i) + ".checksum");
std::chrono::file_clock::time_point last_read_time{std::chrono::milliseconds{
readOptionalInt64(state_map, "file." + std::to_string(i) + ".last_read_time")
}};
std::filesystem::path file_path = current;
if (file_path.has_filename() && file_path.has_parent_path()) {
logger_->log_debug("Received path %s, file %s", file_path.parent_path().string(), file_path.filename().string());
new_tail_states.emplace(current, TailState{file_path.parent_path(), file_path.filename(), position, last_read_time, checksum});
} else {
new_tail_states.emplace(current, TailState{file_path.parent_path(), file_path, position, last_read_time, checksum});
}
} catch (...) {
continue;
}
}
for (const auto& s : tail_states_) {
logger_->log_debug("TailState %s: %s, %s, %" PRIu64 ", %" PRIu64,
s.first.string(), s.second.path_.string(), s.second.file_name_.string(), s.second.position_, s.second.checksum_);
}
return true;
} else {
logger_->log_info("Found no stored state");
}
return false;
}
/**
 * Reads tail states from the legacy state file "<State File property>.<processor UUID>", if it exists.
 * Each line is parsed by parseStateFileLine(); the result is re-keyed by full path.
 * @return false if the legacy state file could not be opened, true otherwise
 */
bool TailFile::getStateFromLegacyStateFile(const std::shared_ptr<core::ProcessContext>& context,
                                           std::map<std::filesystem::path, TailState> &new_tail_states) const {
  std::string state_file_name_property;
  context->getProperty(StateFile, state_file_name_property);
  std::string state_file = state_file_name_property + "." + getUUIDStr();

  std::ifstream file(state_file.c_str(), std::ifstream::in);
  if (!file.good()) {
    logger_->log_info("Legacy state file %s not found (this is OK)", state_file);
    return false;
  }

  std::map<std::filesystem::path, TailState> legacy_tail_states;
  // Test the stream returned by std::getline rather than file.good() after the read. getline sets eofbit when
  // the final line has no trailing newline, and checking good() would then skip that line. Reading into a
  // std::string also removes the fixed BUFFER_SIZE line-length limit, which used to stop parsing silently.
  std::string line;
  while (std::getline(file, line)) {
    parseStateFileLine(line.data(), legacy_tail_states);
  }

  new_tail_states = update_keys_in_legacy_states(legacy_tail_states);
  return true;
}
/// Logs every tracked file together with its tail state at info level.
void TailFile::logState() {
  logger_->log_info("State of the TailFile processor %s:", name_);
  for (const auto& entry : tail_states_) {
    const auto& full_path = entry.first;
    const auto& tail_state = entry.second;
    core::logging::LOG_INFO(logger_) << full_path << " => { " << tail_state << " }";
  }
}
/// Human-readable form of a TailState (used when logging the processor state).
std::ostream& operator<<(std::ostream &os, const TailState &tail_state) {
  os << "name: " << tail_state.file_name_;
  os << ", position: " << tail_state.position_;
  os << ", checksum: " << tail_state.checksum_;
  return os << ", last_read_time: " << tail_state.lastReadTimeInMilliseconds();
}
/**
 * Persists all tail states in the state manager as "file.<i>.{current,name,position,checksum,last_read_time}".
 * @return false (after logging an error) if the state manager rejected the state
 */
bool TailFile::storeState() {
  std::unordered_map<std::string, std::string> state;
  size_t index = 0;
  for (const auto& [full_path, tail_state] : tail_states_) {
    const std::string prefix = "file." + std::to_string(index++) + ".";
    state[prefix + "current"] = full_path.string();
    state[prefix + "name"] = tail_state.file_name_.string();
    state[prefix + "position"] = std::to_string(tail_state.position_);
    state[prefix + "checksum"] = std::to_string(tail_state.checksum_);
    state[prefix + "last_read_time"] = std::to_string(tail_state.lastReadTimeInMilliseconds());
  }

  if (state_manager_->set(state)) {
    return true;
  }
  logger_->log_error("Failed to set state");
  return false;
}
/// Builds the rotated-file regex for `state`: the ${filename} placeholder in rolling_filename_pattern_ is
/// replaced by the tailed file's name without its last extension ("app.log" -> "app"; a name without '.'
/// is used as is).
std::string TailFile::parseRollingFilePattern(const TailState &state) const {
  const std::string file_name = state.file_name_.string();
  const std::string base_name = file_name.substr(0, file_name.find_last_of('.'));
  return utils::StringUtils::replaceOne(rolling_filename_pattern_, "${filename}", base_name);
}
std::vector<TailState> TailFile::findAllRotatedFiles(const TailState &state) const {
logger_->log_debug("Searching for all files rolled over");
std::string pattern = parseRollingFilePattern(state);
std::vector<TailStateWithMtime> matched_files_with_mtime;
auto collect_matching_files = [&](const std::filesystem::path& path, const std::filesystem::path& file_name) -> bool {
utils::Regex pattern_regex(pattern);
if (file_name != state.file_name_ && utils::regexMatch(file_name.string(), pattern_regex)) {
auto full_file_name = path / file_name;
TailStateWithMtime::TimePoint mtime{utils::file::last_write_time_point(full_file_name)};
logger_->log_debug("File %s with mtime %" PRId64 " matches rolling filename pattern %s, so we are reading it", file_name.string(), int64_t{mtime.time_since_epoch().count()}, pattern);
matched_files_with_mtime.emplace_back(TailState{path, file_name}, mtime);
}
return true;
};
utils::file::list_dir(state.path_, collect_matching_files, logger_, false);
return sortAndSkipMainFilePrefix(state, matched_files_with_mtime);
}
std::vector<TailState> TailFile::findRotatedFilesAfterLastReadTime(const TailState &state) const {
logger_->log_debug("Searching for files rolled over after last read time: %" PRId64, state.lastReadTimeInMilliseconds());
std::string pattern = parseRollingFilePattern(state);
std::vector<TailStateWithMtime> matched_files_with_mtime;
auto collect_matching_files = [&](const std::filesystem::path& path, const std::filesystem::path& file_name) -> bool {
utils::Regex pattern_regex(pattern);
if (file_name != state.file_name_ && utils::regexMatch(file_name.string(), pattern_regex)) {
auto full_file_name = path / file_name;
TailStateWithMtime::TimePoint mtime{utils::file::last_write_time_point(full_file_name)};
logger_->log_debug("File %s with mtime %" PRId64 " matches rolling filename pattern %s", file_name.string(), int64_t{mtime.time_since_epoch().count()}, pattern);
if (mtime >= std::chrono::time_point_cast<std::chrono::seconds>(state.last_read_time_)) {
logger_->log_debug("File %s has mtime >= last read time, so we are going to read it", file_name.string());
matched_files_with_mtime.emplace_back(TailState{path, file_name}, mtime);
}
}
return true;
};
utils::file::list_dir(state.path_, collect_matching_files, logger_, false);
return sortAndSkipMainFilePrefix(state, matched_files_with_mtime);
}
std::vector<TailState> TailFile::sortAndSkipMainFilePrefix(const TailState &state, std::vector<TailStateWithMtime>& matched_files_with_mtime) {
const auto first_by_mtime_then_by_name = [](const auto& left, const auto& right) {
return std::tie(left.mtime_, left.tail_state_.file_name_) <
std::tie(right.mtime_, right.tail_state_.file_name_);
};
matched_files_with_mtime |= ranges::actions::sort(first_by_mtime_then_by_name);
if (!matched_files_with_mtime.empty() && state.position_ > 0) {
TailState &first_rotated_file = matched_files_with_mtime[0].tail_state_;
auto full_file_name = first_rotated_file.fileNameWithPath();
if (utils::file::file_size(full_file_name) >= state.position_) {
uint64_t checksum = utils::file::computeChecksum(full_file_name, state.position_);
if (checksum == state.checksum_) {
first_rotated_file.position_ = state.position_;
first_rotated_file.checksum_ = state.checksum_;
}
}
}
std::vector<TailState> matched_files;
matched_files.reserve(matched_files_with_mtime.size());
std::transform(matched_files_with_mtime.begin(), matched_files_with_mtime.end(), std::back_inserter(matched_files),
[](TailStateWithMtime &tail_state_with_mtime) { return std::move(tail_state_with_mtime.tail_state_); });
return matched_files;
}
/**
 * In Multiple file mode, refreshes the set of tailed files once lookup_frequency_ has elapsed. Then processes
 * every tailed file, and yields if nothing was transferred to Success during this trigger.
 */
void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
  gsl_Expects(context && session);

  if (tail_mode_ == Mode::MULTIPLE) {
    const bool lookup_due = last_multifile_lookup_ + lookup_frequency_ < std::chrono::steady_clock::now();
    if (!lookup_due) {
      logger_->log_trace("Skipping multifile lookup");
    } else {
      logger_->log_debug("Lookup frequency %" PRId64 " ms have elapsed, doing new multifile lookup", int64_t{lookup_frequency_.count()});
      doMultifileLookup(*context);
    }
  }

  // processFile() may update each tail state in place
  for (auto& [full_file_name, tail_state] : tail_states_) {
    processFile(session, full_file_name, tail_state);
  }

  if (!session->existsFlowFileInRelationship(Success)) {
    yield();
  }

  first_trigger_ = false;
}
/// True on the very first trigger for a file that has never been read: no stored state, so last_read_time_
/// still holds the default (epoch) time point.
bool TailFile::isOldFileInitiallyRead(TailState &state) const {
  if (!first_trigger_) {
    return false;
  }
  return state.last_read_time_ == std::chrono::file_clock::time_point{};
}
// Tails one file and persists the updated state.
// - First read without stored state (isOldFileInitiallyRead), depending on initial_start_position_:
//   - BEGINNING_OF_TIME: rotated files are read first, then the file itself.
//   - CURRENT_TIME: the current end of file is recorded as the position and nothing is read.
//   - otherwise: the file is read from state.position_.
// - Later reads: a file that shrank below the stored position is treated as rolled over, so rotated files
//   modified since the last read are processed first (which resets the position to 0). A file whose size
//   equals the stored position is skipped.
void TailFile::processFile(const std::shared_ptr<core::ProcessSession> &session,
const std::filesystem::path& full_file_name,
TailState &state) {
if (isOldFileInitiallyRead(state)) {
if (initial_start_position_ == InitialStartPositions::BEGINNING_OF_TIME) {
processAllRotatedFiles(session, state);
} else if (initial_start_position_ == InitialStartPositions::CURRENT_TIME) {
// start tailing from the current end of the file; the checksum covers the skipped content
state.position_ = utils::file::file_size(full_file_name);
state.last_read_time_ = std::chrono::file_clock::now();
state.checksum_ = utils::file::computeChecksum(full_file_name, state.position_);
storeState();
return;
}
} else {
uint64_t fsize = utils::file::file_size(full_file_name);
if (fsize < state.position_) {
processRotatedFilesAfterLastReadTime(session, state);
} else if (fsize == state.position_) {
logger_->log_trace("Skipping file %s as its size hasn't changed since last read", state.file_name_.string());
return;
}
}
processSingleFile(session, full_file_name, state);
storeState();
}
// Reads the rotated files of `state` modified since its last read time, then resets state's position and checksum to 0.
void TailFile::processRotatedFilesAfterLastReadTime(const std::shared_ptr<core::ProcessSession> &session, TailState &state) {
std::vector<TailState> rotated_file_states = findRotatedFilesAfterLastReadTime(state);
processRotatedFiles(session, state, rotated_file_states);
}
// Reads all rotated files of `state` (oldest first), then resets state's position and checksum to 0.
void TailFile::processAllRotatedFiles(const std::shared_ptr<core::ProcessSession> &session, TailState &state) {
std::vector<TailState> rotated_file_states = findAllRotatedFiles(state);
processRotatedFiles(session, state, rotated_file_states);
}
/// Reads each rotated file in the given order. `state` is then reset to the beginning (position and checksum
/// 0), since the main file is assumed to have been started anew after the rollover.
void TailFile::processRotatedFiles(const std::shared_ptr<core::ProcessSession> &session, TailState &state, std::vector<TailState> &rotated_file_states) {
  std::for_each(rotated_file_states.begin(), rotated_file_states.end(), [&](TailState& rotated_state) {
    processSingleFile(session, rotated_state.fileNameWithPath(), rotated_state);
  });

  state.checksum_ = 0;
  state.position_ = 0;
}
// Reads `full_file_name` from state.position_ and transfers the data to Success, advancing `state`.
// - With a delimiter: creates one flow file per delimiter-terminated chunk, at most batch_size_ of them if
//   set. A trailing chunk without a delimiter is discarded (session->remove), and state is not advanced past
//   it, so that data is read again on a later call.
// - Without a delimiter: all remaining content goes into a single flow file.
// Files reported as size 0 are skipped with a warning.
void TailFile::processSingleFile(const std::shared_ptr<core::ProcessSession> &session,
const std::filesystem::path& full_file_name,
TailState &state) {
auto fileName = state.file_name_;
if (utils::file::file_size(full_file_name) == 0U) {
logger_->log_warn("Unable to read file %s as it does not exist or has size zero", full_file_name.string());
return;
}
logger_->log_debug("Tailing file %s from %" PRIu64, full_file_name.string(), state.position_);
std::string baseName = fileName.stem().string();
std::string extension = fileName.extension().string();
if (extension.starts_with('.'))
extension.erase(extension.begin());
if (delimiter_) {
logger_->log_trace("Looking for delimiter 0x%X", *delimiter_);
std::size_t num_flow_files = 0;
FileReaderCallback file_reader{full_file_name, state.position_, *delimiter_, state.checksum_};
// Advance a copy of the state per transferred flow file; it is written back once the loop finishes.
TailState state_copy{state};
while (file_reader.hasMoreToRead() && (!batch_size_ || *batch_size_ > num_flow_files)) {
auto flow_file = session->create();
session->write(flow_file, std::ref(file_reader));
if (file_reader.useLatestFlowFile()) {
updateFlowFileAttributes(full_file_name, state_copy, fileName, baseName, extension, flow_file);
session->transfer(flow_file, Success);
updateStateAttributes(state_copy, flow_file->getSize(), file_reader.checksum());
++num_flow_files;
} else {
// incomplete trailing chunk (no delimiter yet): drop it and leave the position before it
session->remove(flow_file);
}
}
state = state_copy;
logger_->log_info("%zu flowfiles were received from TailFile input", num_flow_files);
} else {
WholeFileReaderCallback file_reader{full_file_name, state.position_, state.checksum_};
auto flow_file = session->create();
session->write(flow_file, std::ref(file_reader));
updateFlowFileAttributes(full_file_name, state, fileName, baseName, extension, flow_file);
session->transfer(flow_file, Success);
updateStateAttributes(state, flow_file->getSize(), file_reader.checksum());
}
}
void TailFile::updateFlowFileAttributes(const std::filesystem::path& full_file_name, const TailState& state,
const std::filesystem::path& fileName, const std::string& baseName,
const std::string& extension,
std::shared_ptr<core::FlowFile> &flow_file) const {
logger_->log_info("TailFile %s for %" PRIu64 " bytes", fileName.string(), flow_file->getSize());
std::string logName = textfragmentutils::createFileName(baseName, extension, state.position_, flow_file->getSize());
flow_file->setAttribute(core::SpecialFlowAttribute::PATH, state.path_.string());
flow_file->addAttribute(core::SpecialFlowAttribute::ABSOLUTE_PATH, full_file_name.string());
flow_file->setAttribute(core::SpecialFlowAttribute::FILENAME, logName);
flow_file->setAttribute(textfragmentutils::BASE_NAME_ATTRIBUTE, baseName);
flow_file->setAttribute(textfragmentutils::POST_NAME_ATTRIBUTE, extension);
flow_file->setAttribute(textfragmentutils::OFFSET_ATTRIBUTE, std::to_string(state.position_));
if (extra_attributes_.contains(state.path_.string())) {
std::string prefix;
if (attribute_provider_service_) {
prefix = std::string(attribute_provider_service_->name()) + ".";
}
for (const auto& [key, value] : extra_attributes_.at(state.path_.string())) {
flow_file->setAttribute(prefix + key, value);
}
}
}
/// Records that `size` more bytes were consumed from the file: stores the checksum covering them, stamps the
/// read time with the current file_clock time and advances the read position.
void TailFile::updateStateAttributes(TailState &state, uint64_t size, uint64_t checksum) {
  state.checksum_ = checksum;
  state.last_read_time_ = std::chrono::file_clock::now();
  state.position_ += size;
}
// Refreshes the set of tailed files in Multiple file mode:
// 1. drops states whose file reports size 0 or no longer matches the file name regex;
// 2. adds newly found matching files;
// 3. records the time of this lookup, which onTrigger() compares against lookup_frequency_.
void TailFile::doMultifileLookup(core::ProcessContext& context) {
checkForRemovedFiles();
checkForNewFiles(context);
last_multifile_lookup_ = std::chrono::steady_clock::now();
}
void TailFile::checkForRemovedFiles() {
gsl_Expects(pattern_regex_);
std::vector<std::filesystem::path> file_names_to_remove;
for (const auto &kv : tail_states_) {
const auto& full_file_name = kv.first;
const TailState &state = kv.second;
if (utils::file::file_size(state.fileNameWithPath()) == 0U ||
!utils::regexMatch(state.file_name_.string(), *pattern_regex_)) {
file_names_to_remove.push_back(full_file_name);
}
}
for (const auto &full_file_name : file_names_to_remove) {
tail_states_.erase(full_file_name);
}
}
/**
 * Starts tracking files whose names match pattern_regex_ and are not tracked yet.
 * - Without an attribute provider service: searches base_dir_.
 * - With one: searches the Base Directory evaluated against each attribute map the service returns, and
 *   remembers that map as extra attributes for the directory. If the service returns no attributes, an error
 *   is logged and nothing is added.
 */
void TailFile::checkForNewFiles(core::ProcessContext& context) {
  gsl_Expects(pattern_regex_);

  auto add_new_files_callback = [&](const std::filesystem::path& path, const std::filesystem::path& file_name) -> bool {
    auto full_file_name = path / file_name;
    const bool not_tracked_yet = !containsKey(tail_states_, full_file_name);
    if (not_tracked_yet && utils::regexMatch(file_name.string(), *pattern_regex_)) {
      tail_states_.emplace(full_file_name, TailState{path, file_name});
    }
    return true;
  };

  if (attribute_provider_service_) {
    const auto attribute_maps = attribute_provider_service_->getAttributes();
    if (!attribute_maps) {
      logger_->log_error("Could not get attributes from the Attribute Provider Service");
      return;
    }
    for (const auto& attribute_map : *attribute_maps) {
      std::string base_dir = baseDirectoryFromAttributes(attribute_map, context);
      extra_attributes_[base_dir] = attribute_map;
      utils::file::list_dir(base_dir, add_new_files_callback, logger_, recursive_lookup_);
    }
  } else {
    utils::file::list_dir(base_dir_, add_new_files_callback, logger_, recursive_lookup_);
  }
}
/**
 * Evaluates the Base Directory property against `attribute_map`. A temporary flow file carrying those
 * attributes is used for the evaluation (presumably so expression language can reference them).
 * Throws std::bad_optional_access if the property yields no value.
 */
std::string TailFile::baseDirectoryFromAttributes(const controllers::AttributeProviderService::AttributeMap& attribute_map, core::ProcessContext& context) {
  auto flow_file = std::make_shared<FlowFileRecord>();
  for (const auto& attribute : attribute_map) {
    flow_file->setAttribute(attribute.first, attribute.second);
  }
  return context.getProperty(BaseDirectory, flow_file).value();
}
// Returns the minimum interval between two directory lookups in Multiple file mode (set from the Lookup Frequency property in onSchedule()).
std::chrono::milliseconds TailFile::getLookupFrequency() const {
return lookup_frequency_;
}
// Registers TailFile as a Processor resource (REGISTER_RESOURCE comes from core/Resource.h).
REGISTER_RESOURCE(TailFile, Processor);
} // namespace org::apache::nifi::minifi::processors