extensions/windows-event-log/CollectorInitiatedSubscription.cpp (461 lines of code) (raw):
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "CollectorInitiatedSubscription.h"
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sstream>
#include <string>
#include <memory>
#include <codecvt>
#include <utility>
#include "io/BufferStream.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/ProcessSessionFactory.h"
#include "core/Resource.h"
#include "utils/gsl.h"
#include "utils/OptionalUtils.h"
#pragma comment(lib, "wevtapi.lib")
#pragma comment(lib, "Wecapi.lib")
using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi::processors {
#define LOG_SUBSCRIPTION_ERROR(error) logError(__LINE__, error)
#define LOG_SUBSCRIPTION_WINDOWS_ERROR(info) logWindowsError(__LINE__, info)
namespace {
std::string to_string(const wchar_t *pChar) {
return std::wstring_convert<std::codecvt_utf8<wchar_t>>().to_bytes(pChar);
}
std::wstring to_wstring(const std::string& utf8_string) {
return std::wstring_convert<std::codecvt_utf8<wchar_t>>().from_bytes(utf8_string);
}
} // namespace
CollectorInitiatedSubscription::CollectorInitiatedSubscription(const std::string& name, const utils::Identifier& uuid)
: core::Processor(name, uuid), logger_(core::logging::LoggerFactory<CollectorInitiatedSubscription>::getLogger(uuid_)) {
char buff[MAX_COMPUTERNAME_LENGTH + 1];
DWORD size = sizeof(buff);
if (GetComputerName(buff, &size)) {
computerName_ = buff;
} else {
LOG_SUBSCRIPTION_WINDOWS_ERROR("GetComputerName");
}
}
void CollectorInitiatedSubscription::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
void CollectorInitiatedSubscription::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
gsl_Expects(context);
if (subscriptionHandle_) {
logger_->log_error("Processor already subscribed to Event Log, expected cleanup to unsubscribe.");
} else {
sessionFactory_ = sessionFactory;
if (!createSubscription(context))
return;
if (!checkSubscriptionRuntimeStatus())
return;
subscribe(context);
}
subscription_name_ = to_wstring(context->getProperty(SubscriptionName).value());
max_buffer_size_ = context->getProperty<core::DataSizeValue>(MaxBufferSize).value();
}
void CollectorInitiatedSubscription::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
gsl_Expects(context);
if (!subscriptionHandle_) {
if (!subscribe(context)) {
context->yield();
return;
}
}
checkSubscriptionRuntimeStatus();
const auto flowFileCount = processQueue(session);
const auto now = GetTickCount64();
if (flowFileCount > 0) {
lastActivityTimestamp_ = now;
} else if (auto inactive_duration_to_reconnect_ms = context->getProperty<core::TimePeriodValue>(InactiveDurationToReconnect)
| utils::map([](const auto& time_period_value) { return time_period_value.getMilliseconds().count(); });
inactive_duration_to_reconnect_ms && *inactive_duration_to_reconnect_ms > 0) {
if ((now - lastActivityTimestamp_) > *inactive_duration_to_reconnect_ms) {
logger_->log_info("Exceeds configured 'inactive duration to reconnect' %lld ms. Unsubscribe to reconnect..", *inactive_duration_to_reconnect_ms);
unsubscribe();
}
}
}
void CollectorInitiatedSubscription::logInvalidSubscriptionPropertyType(int line, DWORD type) {
logError(line, "Invalid property type: " + std::to_string(type));
}
bool CollectorInitiatedSubscription::checkSubscriptionRuntimeStatus() {
EC_HANDLE hSubscription = EcOpenSubscription(subscription_name_.c_str(), EC_READ_ACCESS, EC_OPEN_EXISTING);
if (!hSubscription) {
LOG_SUBSCRIPTION_WINDOWS_ERROR("EcOpenSubscription");
return false;
}
const auto guard_hSubscription = gsl::finally([hSubscription]() { EcClose(hSubscription); });
PEC_VARIANT vProperty = NULL;
std::vector<BYTE> buffer;
if (!getSubscriptionProperty(hSubscription, EcSubscriptionEventSources, 0, buffer, vProperty)) {
return false;
}
// Ensure that we have obtained handle to the Array Property.
if (vProperty->Type != EcVarTypeNull && vProperty->Type != EcVarObjectArrayPropertyHandle) {
logInvalidSubscriptionPropertyType(__LINE__, vProperty->Type);
return false;
}
if (vProperty->Type == EcVarTypeNull) {
LOG_SUBSCRIPTION_ERROR("!hArray");
return false;
}
const EC_OBJECT_ARRAY_PROPERTY_HANDLE hArray = vProperty->PropertyHandleVal;
const auto guard_hArray = gsl::finally([hArray]() { EcClose(hArray); });
// Get the EventSources array size (number of elements).
DWORD dwEventSourceCount{};
if (!EcGetObjectArraySize(hArray, &dwEventSourceCount)) {
LOG_SUBSCRIPTION_WINDOWS_ERROR("EcGetObjectArraySize");
return false;
}
auto getArrayProperty = [this](EC_OBJECT_ARRAY_PROPERTY_HANDLE hArray, EC_SUBSCRIPTION_PROPERTY_ID propID, DWORD arrayIndex, DWORD flags, std::vector<BYTE>& buffer, PEC_VARIANT& vProperty) -> bool {
buffer.clear();
buffer.resize(sizeof(EC_VARIANT));
DWORD dwBufferSizeUsed{};
if (!EcGetObjectArrayProperty(hArray, propID, arrayIndex, flags, static_cast<DWORD>(buffer.size()), reinterpret_cast<PEC_VARIANT>(&buffer[0]), &dwBufferSizeUsed)) {
if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
buffer.resize(dwBufferSizeUsed);
if (!EcGetObjectArrayProperty(hArray, propID, arrayIndex, flags, static_cast<DWORD>(buffer.size()), reinterpret_cast<PEC_VARIANT>(&buffer[0]), &dwBufferSizeUsed)) {
LOG_SUBSCRIPTION_WINDOWS_ERROR("EcGetObjectArrayProperty");
return false;
}
} else {
LOG_SUBSCRIPTION_WINDOWS_ERROR("EcGetObjectArrayProperty");
return false;
}
}
vProperty = reinterpret_cast<PEC_VARIANT>(&buffer[0]);
return true;
};
auto getStatus = [this](const std::wstring& eventSource, EC_SUBSCRIPTION_RUNTIME_STATUS_INFO_ID statusInfoID, DWORD flags, std::vector<BYTE>& buffer, PEC_VARIANT& vStatus) -> bool {
buffer.clear();
buffer.resize(sizeof(EC_VARIANT));
DWORD dwBufferSize{};
if (!EcGetSubscriptionRunTimeStatus(
subscription_name_.c_str(),
statusInfoID,
eventSource.c_str(),
flags,
static_cast<DWORD>(buffer.size()),
reinterpret_cast<PEC_VARIANT>(&buffer[0]),
&dwBufferSize)) {
if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
buffer.resize(dwBufferSize);
if (!EcGetSubscriptionRunTimeStatus(subscription_name_.c_str(),
statusInfoID,
eventSource.c_str(),
flags,
static_cast<DWORD>(buffer.size()),
reinterpret_cast<PEC_VARIANT>(&buffer[0]),
&dwBufferSize)) {
LOG_SUBSCRIPTION_WINDOWS_ERROR("EcGetSubscriptionRunTimeStatus");
}
} else {
LOG_SUBSCRIPTION_WINDOWS_ERROR("EcGetSubscriptionRunTimeStatus");
}
}
vStatus = reinterpret_cast<PEC_VARIANT>(&buffer[0]);
return true;
};
for (DWORD i = 0; i < dwEventSourceCount; i++) {
std::vector<BYTE> eventSourceBuffer;
PEC_VARIANT vProperty = NULL;
if (!getArrayProperty(hArray, EcSubscriptionEventSourceAddress, i, 0, eventSourceBuffer, vProperty)) {
return false;
}
if (vProperty->Type != EcVarTypeNull && vProperty->Type != EcVarTypeString) {
logInvalidSubscriptionPropertyType(__LINE__, vProperty->Type);
return false;
}
if (vProperty->Type == EcVarTypeNull)
continue;
const std::wstring eventSource = vProperty->StringVal;
if (!getStatus(eventSource.c_str(), EcSubscriptionRunTimeStatusActive, 0, buffer, vProperty)) {
return false;
}
if (vProperty->Type != EcVarTypeUInt32) {
logInvalidSubscriptionPropertyType(__LINE__, vProperty->Type);
return false;
}
const auto runtimeStatus = vProperty->UInt32Val;
std::wstring strRuntimeStatus;
switch (runtimeStatus) {
case EcRuntimeStatusActiveStatusActive:
strRuntimeStatus = L"Active";
break;
case EcRuntimeStatusActiveStatusDisabled:
strRuntimeStatus = L"Disabled";
break;
case EcRuntimeStatusActiveStatusInactive:
strRuntimeStatus = L"Inactive";
break;
case EcRuntimeStatusActiveStatusTrying:
strRuntimeStatus = L"Trying";
break;
default:
strRuntimeStatus = L"Unknown";
break;
}
// Get Subscription Last Error.
if (!getStatus(eventSource, EcSubscriptionRunTimeStatusLastError, 0, buffer, vProperty)) {
return false;
}
if (vProperty->Type != EcVarTypeUInt32) {
logInvalidSubscriptionPropertyType(__LINE__, vProperty->Type);
return false;
}
const auto lastError = vProperty->UInt32Val;
if (lastError == 0 && (runtimeStatus == EcRuntimeStatusActiveStatusActive || runtimeStatus == EcRuntimeStatusActiveStatusTrying)) {
logger_->log_info("Subscription '%ws': status '%ws', no error.", subscription_name_.c_str(), strRuntimeStatus.c_str());
return true;
}
// Obtain the associated Error Message.
if (!getStatus(eventSource, EcSubscriptionRunTimeStatusLastErrorMessage, 0, buffer, vProperty)) {
return false;
}
if (vProperty->Type != EcVarTypeNull && vProperty->Type != EcVarTypeString) {
logInvalidSubscriptionPropertyType(__LINE__, vProperty->Type);
return false;
}
std::wstring lastErrorMessage;
if (vProperty->Type == EcVarTypeString) {
lastErrorMessage = vProperty->StringVal;
}
logger_->log_error("Runtime status: %ws, last error: %d, last error message: %ws", strRuntimeStatus.c_str(), lastError, lastErrorMessage.c_str());
return false;
}
return true;
}
bool CollectorInitiatedSubscription::getSubscriptionProperty(EC_HANDLE hSubscription, EC_SUBSCRIPTION_PROPERTY_ID propID, DWORD flags, std::vector<BYTE>& buffer, PEC_VARIANT& vProperty) {
buffer.clear();
buffer.resize(sizeof(EC_VARIANT));
DWORD dwBufferSize{};
if (!EcGetSubscriptionProperty(hSubscription, propID, flags, static_cast<DWORD>(buffer.size()), reinterpret_cast<PEC_VARIANT>(&buffer[0]), &dwBufferSize)) {
if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
buffer.resize(dwBufferSize);
if (!EcGetSubscriptionProperty(hSubscription, propID, flags, static_cast<DWORD>(buffer.size()), reinterpret_cast<PEC_VARIANT>(&buffer[0]), &dwBufferSize)) {
LOG_SUBSCRIPTION_WINDOWS_ERROR("EcGetSubscriptionProperty");
return false;
}
} else {
LOG_SUBSCRIPTION_WINDOWS_ERROR("EcGetSubscriptionProperty");
return false;
}
}
vProperty = reinterpret_cast<PEC_VARIANT>(&buffer[0]);
return true;
}
bool CollectorInitiatedSubscription::createSubscription(const std::shared_ptr<core::ProcessContext>& context) {
gsl_Expects(context);
// If subcription already exists, delete it.
EC_HANDLE hSubscription = EcOpenSubscription(subscription_name_.c_str(), EC_READ_ACCESS, EC_OPEN_EXISTING);
if (hSubscription) {
EcClose(hSubscription);
if (!EcDeleteSubscription(subscription_name_.c_str(), 0)) {
LOG_SUBSCRIPTION_WINDOWS_ERROR("EcDeleteSubscription");
return false;
}
}
// Create subscription.
hSubscription = EcOpenSubscription(subscription_name_.c_str(), EC_READ_ACCESS | EC_WRITE_ACCESS, EC_CREATE_NEW);
if (!hSubscription) {
LOG_SUBSCRIPTION_WINDOWS_ERROR("EcOpenSubscription");
return false;
}
const auto guard_hSubscription = gsl::finally([hSubscription]() { EcClose(hSubscription); });
struct SubscriptionProperty {
SubscriptionProperty(EC_SUBSCRIPTION_PROPERTY_ID propId, const std::wstring& val) {
propId_ = propId;
prop_.Type = EcVarTypeString;
prop_.StringVal = val.c_str();
}
SubscriptionProperty(EC_SUBSCRIPTION_PROPERTY_ID propId, uint32_t val) {
propId_ = propId;
prop_.Type = EcVarTypeUInt32;
prop_.UInt32Val = val;
}
SubscriptionProperty(EC_SUBSCRIPTION_PROPERTY_ID propId, bool val) {
propId_ = propId;
prop_.Type = EcVarTypeBoolean;
prop_.BooleanVal = val;
}
EC_SUBSCRIPTION_PROPERTY_ID propId_;
EC_VARIANT prop_;
};
const auto subscription_description = to_wstring(context->getProperty(SubscriptionDescription).value());
const auto source_channels = to_wstring(context->getProperty(SourceChannels).value());
const auto channel = to_wstring(context->getProperty(Channel).value());
const auto max_delivery_items = context->getProperty<core::DataSizeValue>(MaxDeliveryItems).value().getValue();
const auto delivery_max_latency_time = context->getProperty<core::TimePeriodValue>(DeliveryMaxLatencyTime).value().getMilliseconds().count();
const auto heartbeat_interval = context->getProperty<core::TimePeriodValue>(HeartbeatInterval).value().getMilliseconds().count();
const auto source_user_name = to_wstring(context->getProperty(SourceUserName).value());
const auto source_password = to_wstring(context->getProperty(SourcePassword).value());
std::vector<SubscriptionProperty> listProperty = {
{EcSubscriptionDescription, subscription_description},
{EcSubscriptionURI, std::wstring(L"http://schemas.microsoft.com/wbem/wsman/1/windows/EventLog")},
{EcSubscriptionQuery, L"<QueryList><Query Path=\"" + source_channels + L"\"><Select>*</Select></Query></QueryList>"},
{EcSubscriptionLogFile, channel},
{EcSubscriptionConfigurationMode, static_cast<uint32_t>(EcConfigurationModeCustom)},
{EcSubscriptionDeliveryMode, static_cast<uint32_t>(EcDeliveryModePull)},
{EcSubscriptionDeliveryMaxItems, static_cast<uint32_t>(max_delivery_items)},
{EcSubscriptionDeliveryMaxLatencyTime, static_cast<uint32_t>(delivery_max_latency_time)},
{EcSubscriptionHeartbeatInterval, static_cast<uint32_t>(heartbeat_interval)},
{EcSubscriptionContentFormat, static_cast<uint32_t>(EcContentFormatRenderedText)},
{EcSubscriptionCredentialsType, static_cast<uint32_t>(EcSubscriptionCredDefault)},
{EcSubscriptionEnabled, true},
{EcSubscriptionCommonUserName, source_user_name},
{EcSubscriptionCommonPassword, source_password}
};
for (auto& prop : listProperty) {
if (!EcSetSubscriptionProperty(hSubscription, prop.propId_, 0, &prop.prop_)) {
LOG_SUBSCRIPTION_WINDOWS_ERROR("EcSetSubscriptionProperty id: " + std::to_string(prop.propId_));
return false;
}
}
// Get the EventSources array so a new event source can be added for the specified target.
std::vector<BYTE> buffer;
PEC_VARIANT vProperty = NULL;
if (!getSubscriptionProperty(hSubscription, EcSubscriptionEventSources, 0, buffer, vProperty))
return false;
// Event Sources is a collection. Ensure that we have obtained handle to the Array Property.
if (vProperty->Type != EcVarTypeNull && vProperty->Type != EcVarObjectArrayPropertyHandle) {
logInvalidSubscriptionPropertyType(__LINE__, vProperty->Type);
return false;
}
if (vProperty->Type == EcVarTypeNull) {
LOG_SUBSCRIPTION_ERROR("!hArray");
return false;
}
const EC_OBJECT_ARRAY_PROPERTY_HANDLE hArray = vProperty->PropertyHandleVal;
const auto guard_hArray = gsl::finally([hArray]() { EcClose(hArray); });
DWORD dwEventSourceCount{};
if (!EcGetObjectArraySize(hArray, &dwEventSourceCount)) {
LOG_SUBSCRIPTION_WINDOWS_ERROR("EcGetObjectArraySize");
return false;
}
// Add a new EventSource to the EventSources array object.
if (!EcInsertObjectArrayElement(hArray, dwEventSourceCount)) {
LOG_SUBSCRIPTION_WINDOWS_ERROR("EcInsertObjectArrayElement");
return false;
}
const auto source_address = to_wstring(context->getProperty(SourceAddress).value());
for (auto& prop : std::vector<SubscriptionProperty>{{EcSubscriptionEventSourceAddress, source_address}, {EcSubscriptionEventSourceEnabled, true}}) {
if (!EcSetObjectArrayProperty(hArray, prop.propId_, dwEventSourceCount, 0, &prop.prop_)) {
LOG_SUBSCRIPTION_WINDOWS_ERROR("EcSetObjectArrayProperty id: " + std::to_string(prop.propId_));
return false;
}
}
if (!EcSaveSubscription(hSubscription, NULL)) {
LOG_SUBSCRIPTION_WINDOWS_ERROR("EcSaveSubscription");
return false;
}
return true;
}
bool CollectorInitiatedSubscription::subscribe(const std::shared_ptr<core::ProcessContext> &context) {
gsl_Expects(context);
logger_->log_debug("CollectorInitiatedSubscription: MaxBufferSize %lld", max_buffer_size_.getValue());
const auto channel = context->getProperty(Channel).value();
const auto query = context->getProperty(Query).value();
provenanceUri_ = "winlog://" + computerName_ + "/" + channel + "?" + query;
const auto channel_ws = to_wstring(context->getProperty(Channel).value());
const auto query_ws = to_wstring(context->getProperty(Query).value());
const EVT_SUBSCRIBE_CALLBACK callback = [](EVT_SUBSCRIBE_NOTIFY_ACTION action, PVOID pContext, EVT_HANDLE hEvent) {
auto pCollectorInitiatedSubscription = static_cast<CollectorInitiatedSubscription*>(pContext);
auto& logger = pCollectorInitiatedSubscription->logger_;
if (action == EvtSubscribeActionError) {
if (ERROR_EVT_QUERY_RESULT_STALE == reinterpret_cast<intptr_t>(hEvent)) {
logger->log_error("Received missing event notification. Consider triggering processor more frequently or increasing queue size.");
} else {
logger->log_error("Received the following Win32 error: %x", hEvent);
}
} else if (action == EvtSubscribeActionDeliver) {
DWORD size = 0;
DWORD used = 0;
DWORD propertyCount = 0;
if (!EvtRender(NULL, hEvent, EvtRenderEventXml, size, 0, &used, &propertyCount)) {
if (ERROR_INSUFFICIENT_BUFFER == GetLastError()) {
if (used > pCollectorInitiatedSubscription->max_buffer_size_.getValue()) {
logger->log_error("Dropping event %p because it couldn't be rendered within %llu bytes.", hEvent, pCollectorInitiatedSubscription->max_buffer_size_.getValue());
return 0UL;
}
size = used;
std::vector<wchar_t> buf(size/2);
if (EvtRender(NULL, hEvent, EvtRenderEventXml, size, &buf[0], &used, &propertyCount)) {
auto xml = to_string(&buf[0]);
pCollectorInitiatedSubscription->renderedXMLs_.enqueue(std::move(xml));
} else {
logger->log_error("EvtRender returned the following error code: %d.", GetLastError());
}
}
}
}
return 0UL;
};
subscriptionHandle_ = EvtSubscribe(
NULL,
NULL,
channel_ws.c_str(),
query_ws.c_str(),
NULL,
this,
callback,
EvtSubscribeToFutureEvents | EvtSubscribeStrict);
if (!subscriptionHandle_) {
logger_->log_error("Unable to subscribe with provided parameters, received the following error code: %d", GetLastError());
return false;
}
lastActivityTimestamp_ = GetTickCount64();
return true;
}
void CollectorInitiatedSubscription::unsubscribe() {
if (subscriptionHandle_) {
EvtClose(subscriptionHandle_);
subscriptionHandle_ = 0;
}
}
int CollectorInitiatedSubscription::processQueue(const std::shared_ptr<core::ProcessSession> &session) {
int flowFileCount = 0;
std::string xml;
while (renderedXMLs_.try_dequeue(xml)) {
auto flowFile = session->create();
session->writeBuffer(flowFile, xml);
session->putAttribute(flowFile, core::SpecialFlowAttribute::MIME_TYPE, "application/xml");
session->getProvenanceReporter()->receive(flowFile, provenanceUri_, getUUIDStr(), "Consume windows event logs", 0ms);
session->transfer(flowFile, Success);
flowFileCount++;
}
return flowFileCount;
}
void CollectorInitiatedSubscription::notifyStop() {
if (!EcDeleteSubscription(subscription_name_.c_str(), 0)) {
LOG_SUBSCRIPTION_WINDOWS_ERROR("EcDeleteSubscription");
}
unsubscribe();
if (renderedXMLs_.size_approx() != 0) {
auto session = sessionFactory_->createSession();
if (session) {
logger_->log_info("Finishing processing leftover events");
processQueue(session);
} else {
logger_->log_error(
"Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the internal queue. "
"Removing the processor now will clear the queue but will result in DATA LOSS. This is normally due to starting the processor, "
"receiving events and stopping before the onTrigger happens. The messages in the internal queue cannot finish processing until "
"the processor is triggered to run.");
}
}
}
void CollectorInitiatedSubscription::logError(int line, const std::string& error) {
logger_->log_error("Line %d: %s\n", error.c_str());
}
void CollectorInitiatedSubscription::logWindowsError(int line, const std::string& info) {
auto error = GetLastError();
LPVOID lpMsg{};
FormatMessage(
FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM,
NULL,
error,
MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
(LPTSTR)&lpMsg,
0,
NULL);
logger_->log_error("Line %d: '%s': error %d: %s\n", line, info.c_str(), static_cast<int>(error), reinterpret_cast<char *>(lpMsg));
LocalFree(lpMsg);
}
REGISTER_RESOURCE(CollectorInitiatedSubscription, Processor);
} // namespace org::apache::nifi::minifi::processors