extensions/windows-event-log/ConsumeWindowsEventLog.h (218 lines of code) (raw):
/**
* @file ConsumeWindowsEventLog.h
* ConsumeWindowsEventLog class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <Windows.h>
#include <winevt.h>
#include <Objbase.h>
#include <sstream>
#include <regex>
#include <codecvt>
#include <mutex>
#include <unordered_map>
#include <tuple>
#include <map>
#include <memory>
#include <string>
#include "core/Core.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
#include "core/PropertyDefinition.h"
#include "core/PropertyDefinitionBuilder.h"
#include "minifi-cpp/core/PropertyValidator.h"
#include "core/RelationshipDefinition.h"
#include "core/StateManager.h"
#include "utils/OsUtils.h"
#include "wel/WindowsEventLog.h"
#include "wel/EventPath.h"
#include "FlowFileRecord.h"
#include "concurrentqueue.h"
#include "pugixml.hpp"
#include "utils/Enum.h"
#include "utils/Export.h"
#include "utils/RegexUtils.h"
namespace org::apache::nifi::minifi::processors {
namespace cwel {
// Bundle of rendered representations of a single event. It is the success type returned by
// ConsumeWindowsEventLog::createEventRender() and the input of putEventRenderFlowFileToSession().
struct EventRender {
// NOTE(review): presumably the event fields selected by the "Identifier Match Regex" property,
// keyed by field name; they are handed to addMatchedFieldsAsAttributes() - confirm in the .cpp.
std::map<std::string, std::string> matched_fields;
// Event content in the individual output formats (see OutputFormat).
// NOTE(review): likely only the formats that were requested are filled in - confirm in the .cpp.
std::string xml;
std::string plaintext;
std::string json;
};
// Allowed values of the "Output Format" property (default: XML).
// The enumerator names are exposed to users through magic_enum::enum_names, so renaming or
// reordering them changes the accepted property values.
enum class OutputFormat {
XML,
Both, // Both is DEPRECATED and removed from the documentation, but kept for backwards compatibility; it means XML + Plaintext
Plaintext,
JSON
};
// Allowed values of the "JSON Format" property (default: Simple).
// According to the property description it is only applicable when the output format is JSON.
// The enumerator names are exposed to users through magic_enum::enum_names.
enum class JsonFormat {
Raw,
Simple,
Flattened,
};
} // namespace cwel
// Forward declaration only: ConsumeWindowsEventLog stores a std::unique_ptr<Bookmark> and declares
// its destructor out of line, so the complete type is not needed in this header.
class Bookmark;
/**
 * Processor that consumes Windows Event Log events, filtered by channel and XPath query,
 * and routes the resulting flow files to the "success" relationship.
 *
 * The processor accepts no incoming connections (INPUT_FORBIDDEN) and is declared single-threaded.
 */
class ConsumeWindowsEventLog : public core::ProcessorImpl {
 public:
  explicit ConsumeWindowsEventLog(const std::string_view name, const utils::Identifier& uuid = {});
  // Declared here and defined out of line because Bookmark is an incomplete type in this header.
  ~ConsumeWindowsEventLog() override;

  EXTENSIONAPI static constexpr const char* Description = "Registers a Windows Event Log Subscribe Callback to receive FlowFiles from Events on Windows. These can be filtered via channel and XPath.";

  EXTENSIONAPI static constexpr auto Channel = core::PropertyDefinitionBuilder<>::createProperty("Channel")
      .isRequired(true)
      .withDefaultValue("System")
      .withDescription("The Windows Event Log Channel to listen to. In order to process logs from a log file use the format 'SavedLog:\\<file path\\>'.")
      .supportsExpressionLanguage(true)
      .build();
  EXTENSIONAPI static constexpr auto Query = core::PropertyDefinitionBuilder<>::createProperty("Query")
      .isRequired(true)
      .withDefaultValue("*")
      .withDescription("XPath Query to filter events. (See https://msdn.microsoft.com/en-us/library/windows/desktop/dd996910(v=vs.85).aspx for examples.)")
      .supportsExpressionLanguage(true)
      .build();
  EXTENSIONAPI static constexpr auto MaxBufferSize = core::PropertyDefinitionBuilder<>::createProperty("Max Buffer Size")
      .isRequired(true)
      .withValidator(core::StandardPropertyValidators::DATA_SIZE_VALIDATOR)
      .withDefaultValue("1 MB")
      .withDescription("The individual Event Log XMLs are rendered to a buffer."
          " This specifies the maximum size in bytes that the buffer will be allowed to grow to. (Limiting the maximum size of an individual Event XML.)")
      .build();
  // This property is obsolete because subscriptions are no longer used; it is kept because
  // existing config.yml files may still set it.
  EXTENSIONAPI static constexpr auto InactiveDurationToReconnect = core::PropertyDefinitionBuilder<>::createProperty("Inactive Duration To Reconnect")
      .isRequired(true)
      .withValidator(core::StandardPropertyValidators::TIME_PERIOD_VALIDATOR)
      .withDefaultValue("10 min")
      .withDescription("If no new event logs are processed for the specified time period, "
          " this processor will try reconnecting to recover from a state where any further messages cannot be consumed."
          " Such situation can happen if Windows Event Log service is restarted, or ERROR_EVT_QUERY_RESULT_STALE (15011) is returned."
          " Setting no duration, e.g. '0 ms' disables auto-reconnection.")
      .build();
  EXTENSIONAPI static constexpr auto IdentifierMatcher = core::PropertyDefinitionBuilder<>::createProperty("Identifier Match Regex")
      .isRequired(false)
      .withDefaultValue(".*Sid")
      .withDescription("Regular Expression to match Subject Identifier Fields. These will be placed into the attributes of the FlowFile")
      .build();
  EXTENSIONAPI static constexpr auto IdentifierFunction = core::PropertyDefinitionBuilder<>::createProperty("Apply Identifier Function")
      .isRequired(false)
      .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
      .withDefaultValue("true")
      .withDescription("If true it will resolve SIDs matched in the 'Identifier Match Regex' to the DOMAIN\\USERNAME associated with that ID")
      .build();
  EXTENSIONAPI static constexpr auto ResolveAsAttributes = core::PropertyDefinitionBuilder<>::createProperty("Resolve Metadata in Attributes")
      .isRequired(false)
      .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
      .withDefaultValue("true")
      .withDescription("If true, any metadata that is resolved ( such as IDs or keyword metadata ) will be placed into attributes, otherwise it will be replaced in the XML or text output")
      .build();
  EXTENSIONAPI static constexpr auto EventHeaderDelimiter = core::PropertyDefinitionBuilder<>::createProperty("Event Header Delimiter")
      .isRequired(false)
      .withDescription("If set, the chosen delimiter will be used in the Event output header. Otherwise, a colon followed by spaces will be used.")
      .build();
  EXTENSIONAPI static constexpr auto EventHeader = core::PropertyDefinitionBuilder<>::createProperty("Event Header")
      .isRequired(false)
      .withDefaultValue("LOG_NAME=Log Name, SOURCE = Source, TIME_CREATED = Date,EVENT_RECORDID=Record ID,EVENTID = Event ID,"
          "TASK_CATEGORY = Task Category,LEVEL = Level,KEYWORDS = Keywords,USER = User,COMPUTER = Computer, EVENT_TYPE = EventType")
      .withDescription("Comma seperated list of key/value pairs with the following keys LOG_NAME, SOURCE, TIME_CREATED,EVENT_RECORDID,"
          "EVENTID,TASK_CATEGORY,LEVEL,KEYWORDS,USER,COMPUTER, and EVENT_TYPE. Eliminating fields will remove them from the header.")
      .build();
  // The allowed values of the two enum-backed properties are generated from the enumerator names.
  EXTENSIONAPI static constexpr auto OutputFormatProperty = core::PropertyDefinitionBuilder<magic_enum::enum_count<cwel::OutputFormat>()>::createProperty("Output Format")
      .isRequired(true)
      .withDefaultValue(magic_enum::enum_name(cwel::OutputFormat::XML))
      .withAllowedValues(magic_enum::enum_names<cwel::OutputFormat>())
      .withDescription("The format of the output flow files.")
      .build();
  EXTENSIONAPI static constexpr auto JsonFormatProperty = core::PropertyDefinitionBuilder<magic_enum::enum_count<cwel::JsonFormat>()>::createProperty("JSON Format")
      .isRequired(true)
      .withDefaultValue(magic_enum::enum_name(cwel::JsonFormat::Simple))
      .withAllowedValues(magic_enum::enum_names<cwel::JsonFormat>())
      .withDescription("Set the json format type. Only applicable if Output Format is set to 'JSON'")
      .build();
  EXTENSIONAPI static constexpr auto BatchCommitSize = core::PropertyDefinitionBuilder<>::createProperty("Batch Commit Size")
      .isRequired(false)
      .withValidator(core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)
      .withDefaultValue("1000")
      .withDescription("Maximum number of Events to consume and create to Flow Files from before committing.")
      .build();
  EXTENSIONAPI static constexpr auto BookmarkRootDirectory = core::PropertyDefinitionBuilder<>::createProperty("State Directory")
      .isRequired(false)
      .withDefaultValue("CWELState")
      .withDescription("DEPRECATED. Only use it for state migration from the state file, supplying the legacy state directory.")
      .build();
  EXTENSIONAPI static constexpr auto ProcessOldEvents = core::PropertyDefinitionBuilder<>::createProperty("Process Old Events")
      .isRequired(true)
      .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
      .withDefaultValue("false")
      .withDescription("This property defines if old events (which are created before first time server is started) should be processed.")
      .build();
  EXTENSIONAPI static constexpr auto CacheSidLookups = core::PropertyDefinitionBuilder<>::createProperty("Cache SID Lookups")
      .isRequired(false)
      .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
      .withDefaultValue("true")
      .withDescription("Determines whether SID to name lookups are cached in memory")
      .build();
  // Every property defined above, in declaration order.
  EXTENSIONAPI static constexpr auto Properties = std::to_array<core::PropertyReference>({
      Channel,
      Query,
      MaxBufferSize,
      InactiveDurationToReconnect,
      IdentifierMatcher,
      IdentifierFunction,
      ResolveAsAttributes,
      EventHeaderDelimiter,
      EventHeader,
      OutputFormatProperty,
      JsonFormatProperty,
      BatchCommitSize,
      BookmarkRootDirectory,
      ProcessOldEvents,
      CacheSidLookups
  });

  EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "Relationship for successfully consumed events."};
  EXTENSIONAPI static constexpr auto Relationships = std::array{Success};

  EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false;
  EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false;
  EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_FORBIDDEN;
  EXTENSIONAPI static constexpr bool IsSingleThreaded = true;

  ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS

  void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override;
  void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override;
  void initialize() override;
  void notifyStop() override;

 private:
  void refreshTimeZoneData();
  void putEventRenderFlowFileToSession(const cwel::EventRender& eventRender, core::ProcessSession& session) const;
  wel::WindowsEventLogHandler& getEventLogHandler(const std::string& name);
  static bool insertHeaderName(wel::METADATA_NAMES& header, const std::string& key, const std::string& value);
  // Yields the rendered event on success, or an error string otherwise.
  nonstd::expected<cwel::EventRender, std::string> createEventRender(EVT_HANDLE eventHandle);
  void substituteXMLPercentageItems(pugi::xml_document& doc);
  std::function<std::string(const std::string&)> userIdToUsernameFunction() const;
  // Yields the event as an XML string on success, or an error string otherwise.
  nonstd::expected<std::string, std::string> renderEventAsXml(EVT_HANDLE event_handle);

  // Small stopwatch: remembers steady_clock::now() at construction and, when invoked,
  // returns the duration elapsed since then.
  struct TimeDiff {
    auto operator()() const {
      return std::chrono::steady_clock::now() - time_;
    }
    const decltype(std::chrono::steady_clock::now()) time_ = std::chrono::steady_clock::now();
  };

  // NOTE(review): presumably commits the session and persists bookmarkXml, returning whether that
  // succeeded - confirm in the .cpp.
  bool commitAndSaveBookmark(const std::wstring& bookmarkXml, core::ProcessContext& context, core::ProcessSession& session);
  // NOTE(review): the tuple presumably holds (number of processed events, bookmark XML) - confirm in the .cpp.
  std::tuple<size_t, std::wstring> processEventLogs(core::ProcessSession& session,
      const EVT_HANDLE& event_query_results);
  void addMatchedFieldsAsAttributes(const cwel::EventRender& eventRender, core::ProcessSession& session, const std::shared_ptr<core::FlowFile>& flowFile) const;

  core::StateManager* state_manager_{nullptr};
  wel::METADATA_NAMES header_names_;
  std::optional<std::string> header_delimiter_;
  wel::EventPath path_;
  std::wstring wstr_query_;
  std::optional<utils::Regex> regex_;
  bool resolve_as_attributes_{false};
  bool apply_identifier_function_{false};
  std::string provenanceUri_;
  std::string computerName_;
  uint64_t max_buffer_size_{};
  std::map<std::string, wel::WindowsEventLogHandler> providers_;
  uint64_t batch_commit_size_{};
  bool cache_sid_lookups_ = true;
  // Start out with the defaults of the "Output Format" and "JSON Format" properties so that these
  // members never hold an indeterminate value before they are configured.
  cwel::OutputFormat output_format_{cwel::OutputFormat::XML};
  cwel::JsonFormat json_format_{cwel::JsonFormat::Simple};
  std::unique_ptr<Bookmark> bookmark_;
  std::mutex on_trigger_mutex_;
  std::unordered_map<std::string, std::string> xmlPercentageItemsResolutions_;
  HMODULE hMsobjsDll_{};
  std::string timezone_name_;
  std::string timezone_offset_;  // Represented as UTC offset in (+|-)HH:MM format, like +02:00
};
} // namespace org::apache::nifi::minifi::processors