extensions/systemd/ConsumeJournald.h (122 lines of code) (raw):

/** * * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <atomic> #include <chrono> #include <future> #include <memory> #include <optional> #include <string> #include <type_traits> #include <unordered_map> #include <utility> #include <vector> #include "core/StateManager.h" #include "core/Processor.h" #include "core/PropertyDefinition.h" #include "core/PropertyDefinitionBuilder.h" #include "minifi-cpp/core/PropertyValidator.h" #include "core/RelationshipDefinition.h" #include "core/logging/LoggerFactory.h" #include "libwrapper/LibWrapper.h" #include "utils/Deleters.h" #include "utils/gsl.h" #include "utils/FifoExecutor.h" namespace org::apache::nifi::minifi::extensions::systemd { enum class PayloadFormat { Raw, Syslog }; class ConsumeJournald final : public core::ProcessorImpl { public: static constexpr const char* CURSOR_KEY = "cursor"; static constexpr const char* PAYLOAD_FORMAT_RAW = "Raw"; static constexpr const char* PAYLOAD_FORMAT_SYSLOG = "Syslog"; static constexpr const char* JOURNAL_TYPE_USER = "User"; static constexpr const char* JOURNAL_TYPE_SYSTEM = "System"; static constexpr const char* JOURNAL_TYPE_BOTH = "Both"; EXTENSIONAPI static constexpr const char* Description = "Consume systemd-journald journal messages. Creates one flow file per message. " "Fields are mapped to attributes. Realtime timestamp is mapped to the 'timestamp' attribute. Available on Linux only."; EXTENSIONAPI static constexpr auto BatchSize = core::PropertyDefinitionBuilder<>::createProperty("Batch Size") .withDescription("The maximum number of entries processed in a single execution.") .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR) .withDefaultValue("1000") .isRequired(true) .build(); EXTENSIONAPI static constexpr auto PayloadFormat = core::PropertyDefinitionBuilder<2>::createProperty("Payload Format") .withDescription("Configures flow file content formatting. Raw: only the message. Syslog: similar to syslog or journalctl output.") .withDefaultValue(PAYLOAD_FORMAT_SYSLOG) .withAllowedValues({PAYLOAD_FORMAT_RAW, PAYLOAD_FORMAT_SYSLOG}) .isRequired(true) .build(); EXTENSIONAPI static constexpr auto IncludeTimestamp = core::PropertyDefinitionBuilder<>::createProperty("Include Timestamp") .withDescription("Include message timestamp in the 'timestamp' attribute.") .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) .withDefaultValue("true") .isRequired(true) .build(); EXTENSIONAPI static constexpr auto JournalType = core::PropertyDefinitionBuilder<3>::createProperty("Journal Type") .withDescription("Type of journal to consume.") .withDefaultValue(JOURNAL_TYPE_SYSTEM) .withAllowedValues({JOURNAL_TYPE_USER, JOURNAL_TYPE_SYSTEM, JOURNAL_TYPE_BOTH}) .isRequired(true) .build(); EXTENSIONAPI static constexpr auto ProcessOldMessages = core::PropertyDefinitionBuilder<>::createProperty("Process Old Messages") .withDescription("Process events created before the first usage (schedule) of the processor instance.") .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) .withDefaultValue("false") .isRequired(true) .build(); EXTENSIONAPI static constexpr auto TimestampFormat = core::PropertyDefinitionBuilder<>::createProperty("Timestamp Format") .withDescription("Format string to use when creating the timestamp attribute or writing messages in the syslog format. " "ISO/ISO 8601/ISO8601 are equivalent to \"%FT%T%Ez\". " "See https://howardhinnant.github.io/date/date.html#to_stream_formatting for all flags.") .withDefaultValue("%x %X %Z") .isRequired(true) .build(); EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({ BatchSize, PayloadFormat, IncludeTimestamp, JournalType, ProcessOldMessages, TimestampFormat }); EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "Successfully consumed journal messages."}; EXTENSIONAPI static constexpr auto Relationships = std::array{Success}; EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN; EXTENSIONAPI static constexpr bool IsSingleThreaded = false; ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS explicit ConsumeJournald(std::string_view name, const utils::Identifier& id = {}, std::unique_ptr<libwrapper::LibWrapper>&& = libwrapper::createLibWrapper()); ConsumeJournald(const ConsumeJournald&) = delete; ConsumeJournald(ConsumeJournald&&) = delete; ConsumeJournald& operator=(const ConsumeJournald&) = delete; ConsumeJournald& operator=(ConsumeJournald&&) = delete; ~ConsumeJournald() final { notifyStop(); } void initialize() final; void notifyStop() final; void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) final; void onTrigger(core::ProcessContext& context, core::ProcessSession& session) final; friend struct ConsumeJournaldTestAccessor; private: struct journal_field { std::string name; std::string value; }; struct journal_message { std::vector<journal_field> fields; std::chrono::system_clock::time_point timestamp; }; static std::optional<std::span<const char>> enumerateJournalEntry(libwrapper::Journal&); static std::optional<journal_field> getNextField(libwrapper::Journal&); std::future<std::pair<std::string, std::vector<journal_message>>> getCursorAndMessageBatch(); std::string formatSyslogMessage(const journal_message&) const; std::string getCursor() const; std::atomic<bool> running_{false}; core::StateManager* state_manager_ = nullptr; std::unique_ptr<libwrapper::LibWrapper> libwrapper_; std::unique_ptr<utils::FifoExecutor> worker_; std::unique_ptr<libwrapper::Journal> journal_; uint64_t batch_size_ = 1000; systemd::PayloadFormat payload_format_ = systemd::PayloadFormat::Syslog; bool include_timestamp_ = true; std::string timestamp_format_ = "%x %X %Z"; }; } // namespace org::apache::nifi::minifi::extensions::systemd