extensions/systemd/ConsumeJournald.cpp (170 lines of code) (raw):
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ConsumeJournald.h"
#include <algorithm>
#include "date/date.h"
#include "fmt/format.h"
#include "utils/GeneralUtils.h"
#include "utils/OptionalUtils.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"
#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::extensions::systemd {
namespace chr = std::chrono;
ConsumeJournald::ConsumeJournald(const std::string_view name, const utils::Identifier &id, std::unique_ptr<libwrapper::LibWrapper>&& libwrapper)
: core::ProcessorImpl{name, id}, libwrapper_{std::move(libwrapper)} {
logger_ = core::logging::LoggerFactory<ConsumeJournald>::getLogger(uuid_);
}
void ConsumeJournald::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
worker_ = std::make_unique<utils::FifoExecutor>();
}
void ConsumeJournald::notifyStop() {
bool running = true;
if (!running_.compare_exchange_strong(running, false, std::memory_order_acq_rel) || !journal_) return;
worker_->enqueue([this] {
journal_ = nullptr;
}).get();
worker_ = nullptr;
}
void ConsumeJournald::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
gsl_Expects(!running_ && worker_);
batch_size_ = utils::parseU64Property(context, BatchSize);
payload_format_ = utils::parseEnumProperty<systemd::PayloadFormat>(context, PayloadFormat);
include_timestamp_ = utils::parseBoolProperty(context, IncludeTimestamp);
const auto journal_type = utils::parseEnumProperty<systemd::JournalType>(context, JournalType);
const auto process_old_messages = utils::parseBoolProperty(context, ProcessOldMessages);
timestamp_format_ = [&context] {
auto tf_prop = (context.getProperty(TimestampFormat)
| utils::orElse([]{ throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "invalid timestamp format" }; }))
.value();
if (tf_prop == "ISO" || tf_prop == "ISO 8601" || tf_prop == "ISO8601") return std::string{"%FT%T%Ez"};
return tf_prop;
}();
state_manager_ = context.getStateManager();
// All journal-related API calls are thread-agnostic, meaning they need to be called from the same thread. In our environment,
// where a processor can easily be scheduled on different threads, we ensure this by executing all library calls on a dedicated
// worker thread. This is why all such operations are dispatched to a thread and immediately waited for in the initiating thread.
journal_ = worker_->enqueue([this, journal_type]{ return libwrapper_->openJournal(journal_type); }).get();
const auto seek_default = [process_old_messages](libwrapper::Journal& journal) {
return process_old_messages ? journal.seekHead() : journal.seekTail();
};
worker_->enqueue([this, &seek_default] {
const auto cursor = state_manager_->get() | utils::transform([](std::unordered_map<std::string, std::string>&& m) { return m.at(CURSOR_KEY); });
if (!cursor) {
seek_default(*journal_);
} else {
const auto ret = journal_->seekCursor(cursor->c_str());
if (ret < 0) {
const auto error_message = std::generic_category().default_error_condition(-ret).message();
logger_->log_warn("Failed to seek to cursor: {}. Seeking to tail or head (depending on Process Old Messages property) instead. cursor=\"{}\"", error_message, *cursor);
seek_default(*journal_);
}
}
}).get();
running_ = true;
}
void ConsumeJournald::onTrigger(core::ProcessContext&, core::ProcessSession& session) {
gsl_Expects(state_manager_);
if (!running_.load(std::memory_order_acquire)) return;
auto cursor_and_messages = getCursorAndMessageBatch().get();
auto messages = std::move(cursor_and_messages.second);
if (messages.empty()) {
yield();
return;
}
for (auto& msg: messages) {
const auto flow_file = session.create();
if (payload_format_ == systemd::PayloadFormat::Syslog) session.writeBuffer(flow_file, gsl::make_span(formatSyslogMessage(msg)));
for (auto& field: msg.fields) {
if (field.name == "MESSAGE" && payload_format_ == systemd::PayloadFormat::Raw) {
session.writeBuffer(flow_file, gsl::make_span(field.value));
} else {
flow_file->setAttribute(std::move(field.name), std::move(field.value));
}
}
if (include_timestamp_) flow_file->setAttribute("timestamp", date::format(timestamp_format_, chr::floor<chr::microseconds>(msg.timestamp)));
session.transfer(flow_file, Success);
}
state_manager_->set({{"cursor", std::move(cursor_and_messages.first)}});
}
std::optional<std::span<const char>> ConsumeJournald::enumerateJournalEntry(libwrapper::Journal& journal) {
const void* data_ptr{};
size_t data_length{};
const auto status_code = journal.enumerateData(&data_ptr, &data_length);
if (status_code == 0) return std::nullopt;
if (status_code < 0) throw SystemErrorException{ "sd_journal_enumerate_data", std::generic_category().default_error_condition(-status_code) };
gsl_Ensures(data_ptr && "if sd_journal_enumerate_data was successful, then data_ptr must be set");
gsl_Ensures(data_length > 0 && "if sd_journal_enumerate_data was successful, then data_length must be greater than zero");
const char* const data_str_ptr = reinterpret_cast<const char*>(data_ptr);
return gsl::make_span(data_str_ptr, data_length);
}
std::optional<ConsumeJournald::journal_field> ConsumeJournald::getNextField(libwrapper::Journal& journal) {
return enumerateJournalEntry(journal) | utils::transform([](std::span<const char> field) {
const auto eq_pos = std::find(std::begin(field), std::end(field), '=');
gsl_Ensures(eq_pos != std::end(field) && "field string must contain an equals sign");
const auto eq_idx = gsl::narrow<size_t>(eq_pos - std::begin(field));
return journal_field{
utils::span_to<std::string>(field.subspan(0, eq_idx)),
utils::span_to<std::string>(field.subspan(eq_idx + 1))
};
});
}
std::future<std::pair<std::string, std::vector<ConsumeJournald::journal_message>>> ConsumeJournald::getCursorAndMessageBatch() {
gsl_Expects(worker_);
return worker_->enqueue([this] {
std::vector<journal_message> messages;
messages.reserve(batch_size_);
for (size_t i = 0; i < batch_size_ && journal_->next() > 0; ++i) {
journal_message message;
std::optional<journal_field> field;
while ((field = getNextField(*journal_)).has_value()) {
message.fields.push_back(std::move(*field));
}
if (include_timestamp_ || payload_format_ == systemd::PayloadFormat::Syslog) {
message.timestamp = [this] {
uint64_t journal_timestamp_usec_since_epoch{};
journal_->getRealtimeUsec(&journal_timestamp_usec_since_epoch);
return std::chrono::system_clock::time_point{std::chrono::microseconds{journal_timestamp_usec_since_epoch}};
}();
}
messages.push_back(std::move(message));
}
return std::make_pair(getCursor(), messages);
});
}
std::string ConsumeJournald::formatSyslogMessage(const journal_message& msg) const {
gsl_Expects(msg.timestamp != decltype(msg.timestamp){});
const std::string* systemd_hostname = nullptr;
const std::string* syslog_pid = nullptr;
const std::string* systemd_pid = nullptr;
const std::string* syslog_identifier = nullptr;
const std::string* message = nullptr;
for (const auto& field: msg.fields) {
if (field.name == "_HOSTNAME") systemd_hostname = &field.value;
else if (field.name == "SYSLOG_PID") syslog_pid = &field.value;
else if (field.name == "_PID") systemd_pid = &field.value;
else if (field.name == "SYSLOG_IDENTIFIER") syslog_identifier = &field.value;
else if (field.name == "MESSAGE") message = &field.value;
else if (systemd_hostname && (syslog_pid || systemd_pid) && syslog_identifier && message) break;
}
gsl_Ensures(message && "MESSAGE is guaranteed to be present");
const auto pid_string = utils::optional_from_ptr(syslog_pid)
| utils::orElse([&] { return utils::optional_from_ptr(systemd_pid); })
| utils::transform([](const std::string* const pid) { return fmt::format("[{}]", *pid); });
return fmt::format("{} {} {}{}: {}",
date::format(timestamp_format_, chr::floor<chr::microseconds>(msg.timestamp)),
(utils::optional_from_ptr(systemd_hostname) | utils::transform(utils::dereference)).value_or("unknown_host"),
(utils::optional_from_ptr(syslog_identifier) | utils::transform(utils::dereference)).value_or("unknown_process"),
pid_string.value_or(std::string{}),
*message);
}
std::string ConsumeJournald::getCursor() const {
const auto cursor = [this] {
gsl::owner<char*> cursor_out = nullptr;
const auto err_code = journal_->getCursor(&cursor_out);
if (err_code < 0) throw SystemErrorException{"sd_journal_get_cursor", std::generic_category().default_error_condition(-err_code)};
gsl_Ensures(cursor_out);
return std::unique_ptr<char, utils::FreeDeleter>{cursor_out};
}();
return std::string{cursor.get()};
}
REGISTER_RESOURCE(ConsumeJournald, Processor);
} // namespace org::apache::nifi::minifi::extensions::systemd