plugins/inputs/windows_event_log/wineventlog/wineventlog.go (365 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
//go:build windows
// +build windows
package wineventlog
import (
"encoding/xml"
"fmt"
"log"
"os"
"strconv"
"strings"
"sync"
"syscall"
"time"
"golang.org/x/sys/windows"
"github.com/aws/amazon-cloudwatch-agent/logs"
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
)
// https://msdn.microsoft.com/en-us/library/windows/desktop/aa385588(v=vs.85).aspx
// https://msdn.microsoft.com/en-us/library/windows/desktop/ms681382(v=vs.85).aspx
// https://msdn.microsoft.com/en-us/library/windows/desktop/aa385525(v=vs.85).aspx
const (
RPC_S_INVALID_BOUND syscall.Errno = 1734
collectionInterval = time.Second
saveStateInterval = 100 * time.Millisecond
subscribeMaxRetries = 3
apiEvtSubscribe = "EvtSubscribe"
apiEvtClose = "EvtClose"
)
type wevtAPIError struct {
api string
name string
err error
}
func (e *wevtAPIError) Error() string {
return fmt.Sprintf("%s(), name %s, err %v", e.api, e.name, e.err)
}
type windowsEventLog struct {
name string
levels []string
logGroupName string
logStreamName string
logGroupClass string
renderFormat string
maxToRead int // Maximum number returned in one read.
destination string
stateFilePath string
eventHandle EvtHandle
eventOffset uint64
retention int
outputFn func(logs.LogEvent)
offsetCh chan uint64
done chan struct{}
startOnce sync.Once
resubscribeCh chan struct{}
}
func NewEventLog(name string, levels []string, logGroupName, logStreamName, renderFormat, destination, stateFilePath string, maximumToRead int, retention int, logGroupClass string) *windowsEventLog {
eventLog := &windowsEventLog{
name: name,
levels: levels,
logGroupName: logGroupName,
logStreamName: logStreamName,
logGroupClass: logGroupClass,
renderFormat: renderFormat,
maxToRead: maximumToRead,
destination: destination,
stateFilePath: stateFilePath,
retention: retention,
offsetCh: make(chan uint64, 100),
done: make(chan struct{}),
resubscribeCh: make(chan struct{}),
}
return eventLog
}
func (w *windowsEventLog) Init() error {
go w.runSaveState()
w.eventOffset = w.loadState()
return w.Open()
}
func (w *windowsEventLog) SetOutput(fn func(logs.LogEvent)) {
if fn == nil {
return
}
w.outputFn = fn
w.startOnce.Do(func() { go w.run() })
}
func (w *windowsEventLog) Group() string {
return w.logGroupName
}
func (w *windowsEventLog) Stream() string {
return w.logStreamName
}
func (w *windowsEventLog) Description() string {
return fmt.Sprintf("%v%v", w.name, w.levels)
}
func (w *windowsEventLog) Destination() string {
return w.destination
}
func (w *windowsEventLog) Retention() int {
return w.retention
}
func (w *windowsEventLog) Class() string {
return w.logGroupClass
}
func (w *windowsEventLog) Stop() {
close(w.done)
}
func (w *windowsEventLog) Entity() *cloudwatchlogs.Entity {
return nil
}
func (w *windowsEventLog) run() {
ticker := time.NewTicker(collectionInterval)
defer ticker.Stop()
retryCount := 0
var shouldResubscribe bool
for {
select {
case <-w.resubscribeCh:
shouldResubscribe = true
case <-ticker.C:
if shouldResubscribe {
w.eventOffset = w.loadState()
if err := w.resubscribe(); err != nil {
log.Printf("E! [wineventlog] Unable to re-subscribe: %v", err)
retryCount++
if retryCount >= subscribeMaxRetries {
log.Printf("D! [wineventlog] Max subscribe retries reached: %d", subscribeMaxRetries)
shouldResubscribe = false
retryCount = 0
}
} else {
log.Printf("D! [wineventlog] Re-subscribed to %s", w.name)
shouldResubscribe = false
}
}
records := w.read()
for _, record := range records {
value, err := record.Value()
if err != nil {
log.Printf("E! [wineventlog] Error happened when collecting windows events: %v", err)
continue
}
recordNumber, _ := strconv.ParseUint(record.System.EventRecordID, 10, 64)
evt := &LogEvent{
msg: value,
t: record.System.TimeCreated.SystemTime,
offset: recordNumber,
src: w,
}
w.outputFn(evt)
}
case <-w.done:
return
}
}
}
// Open subscription for events. Instead of failing the subscription if the eventlog name has not been registered,
// log the error.
func (w *windowsEventLog) Open() error {
err := w.open()
if werr, ok := err.(*wevtAPIError); ok && werr.api == apiEvtSubscribe {
log.Printf("W! [wineventlog] %v", err)
return nil
}
return err
}
func (w *windowsEventLog) open() error {
bookmark, err := CreateBookmark(w.name, w.eventOffset)
if err != nil {
return err
}
defer EvtClose(bookmark)
// Using a pull subscription to receive events. See:
// https://msdn.microsoft.com/en-us/library/windows/desktop/aa385771(v=vs.85).aspx#pull
signalEvent, err := windows.CreateEvent(nil, 0, 0, nil)
if err != nil {
return nil
}
channelPath, err := syscall.UTF16PtrFromString(w.name)
if err != nil {
return err
}
query, err := CreateQuery(w.name, w.levels)
if err != nil {
return err
}
eventHandle, err := EvtSubscribe(0, uintptr(signalEvent), channelPath, query, bookmark, 0, 0, EvtSubscribeStartAfterBookmark)
if err != nil {
return &wevtAPIError{api: apiEvtSubscribe, name: w.name, err: err}
}
w.eventHandle = eventHandle
return nil
}
func (w *windowsEventLog) Close() error {
return EvtClose(w.eventHandle)
}
// resubscribe closes the event subscription based on the event handle and resets the handle to the
// same state as an EvtSubscribe failure (0) before attempting to open another event subscription.
func (w *windowsEventLog) resubscribe() error {
if w.eventHandle != 0 {
if err := w.Close(); err != nil {
return &wevtAPIError{api: apiEvtClose, name: w.name, err: err}
}
}
w.eventHandle = EvtHandle(0)
return w.open()
}
func (w *windowsEventLog) LogGroupName() string {
return w.logGroupName
}
func (w *windowsEventLog) LogStreamName() string {
return w.logStreamName
}
func (w *windowsEventLog) EventOffset() uint64 {
return w.eventOffset
}
func (w *windowsEventLog) SetEventOffset(eventOffset uint64) {
w.eventOffset = eventOffset
}
func (w *windowsEventLog) Done(offset uint64) {
w.offsetCh <- offset
}
func (w *windowsEventLog) ResubscribeCh() chan struct{} {
return w.resubscribeCh
}
func (w *windowsEventLog) runSaveState() {
t := time.NewTicker(saveStateInterval)
defer w.Stop()
var offset, lastSavedOffset uint64
for {
select {
case o := <-w.offsetCh:
if o > offset {
offset = o
}
case <-t.C:
if offset == lastSavedOffset {
continue
}
err := w.saveState(offset)
if err != nil {
log.Printf("E! [wineventlog] Error happened when saving file state %s to file state folder %s: %v", w.logGroupName, w.stateFilePath, err)
continue
}
lastSavedOffset = offset
case <-w.done:
err := w.saveState(offset)
if err != nil {
log.Printf("E! [wineventlog] Error happened during final file state saving of logfile %s to file state folder %s, duplicate log maybe sent at next start: %v", w.logGroupName, w.stateFilePath, err)
}
break
}
}
}
func (w *windowsEventLog) saveState(offset uint64) error {
if w.stateFilePath == "" || offset == 0 {
return nil
}
content := []byte(strconv.FormatUint(offset, 10) + "\n" + w.logGroupName)
return os.WriteFile(w.stateFilePath, content, 0644)
}
func (w *windowsEventLog) read() []*windowsEventLogRecord {
maxToRead := w.maxToRead
var eventHandles []EvtHandle
defer func() {
for _, h := range eventHandles {
EvtClose(h)
}
}()
var numRead uint32
for {
eventHandles = make([]EvtHandle, maxToRead)
err := EvtNext(w.eventHandle, uint32(len(eventHandles)),
&eventHandles[0], 0, 0, &numRead)
// Handle special case when events size is too large - retry with smaller size
if err == RPC_S_INVALID_BOUND {
if maxToRead == 1 {
log.Printf("E! [wineventlog] Out of bounds error due to large events size. Will skip the event as we cannot process it. Details: %v\n", err)
return nil
}
log.Printf("W! [wineventlog] Out of bounds error due to large events size. Retrying with half of the read batch size (%d). Details: %v\n", maxToRead/2, err)
maxToRead /= 2
for _, h := range eventHandles {
EvtClose(h)
}
continue
}
break
}
// Decode the events into objects
return w.getRecords(eventHandles[:numRead])
}
type LogEvent struct {
msg string
t time.Time
offset uint64
src *windowsEventLog
}
func (le LogEvent) Message() string {
return le.msg
}
func (le LogEvent) Time() time.Time {
return le.t
}
func (le LogEvent) Done() {
le.src.Done(le.offset)
}
// getRecords attempts to render and format each of the given EvtHandles.
// If one handle has an error, continue on because something is better than nothing.
func (w *windowsEventLog) getRecords(handles []EvtHandle) (records []*windowsEventLogRecord) {
for _, evtHandle := range handles {
r, err := w.getRecord(evtHandle)
if err == nil {
records = append(records, r)
} else {
log.Printf("I! [wineventlog] %v", err)
}
}
return records
}
// getRecord attemps to render and format the message for the given EvtHandle.
func (w *windowsEventLog) getRecord(evtHandle EvtHandle) (*windowsEventLogRecord, error) {
// Notes on the process:
// - We first call RenderEventXML to get the publisher details. This piece of information is then used
// for rendering the event and getting a readable XML format that contains the log message.
// - We can later do more research on comparing other methods to get the publisher details such as EvtCreateRenderContext
// Windows event message supports 31839 characters. https://msdn.microsoft.com/EN-US/library/windows/desktop/aa363679.aspx
bufferSize := 1 << 17
renderBuf := make([]byte, bufferSize)
outputBuf, err := RenderEventXML(evtHandle, renderBuf)
if err != nil {
return nil, fmt.Errorf("RenderEventXML() err %v", err)
}
newRecord := newEventLogRecord(w)
//we need the "System.TimeCreated.SystemTime"
xml.Unmarshal(outputBuf, newRecord)
publisher, _ := syscall.UTF16PtrFromString(newRecord.System.Provider.Name)
publisherMetadataEvtHandle, err := EvtOpenPublisherMetadata(0, publisher, nil, 0, 0)
if err != nil {
return nil, fmt.Errorf("EvtOpenPublisherMetadata() publisher %v, err %v", newRecord.System.Provider.Name, err)
}
var bufferUsed uint32
err = EvtFormatMessage(publisherMetadataEvtHandle, evtHandle, 0, 0, 0, EvtFormatMessageXml, uint32(bufferSize), &renderBuf[0], &bufferUsed)
EvtClose(publisherMetadataEvtHandle)
if err != nil && bufferUsed == 0 {
return nil, fmt.Errorf("EvtFormatMessage() publisher %v, err %v", newRecord.System.Provider.Name, err)
}
descriptionBytes, err := UTF16ToUTF8BytesForWindowsEventBuffer(renderBuf, bufferUsed)
if err != nil {
return nil, fmt.Errorf("utf16ToUTF8Bytes() err %v", err)
}
// The insertion strings could be in either EventData or UserData
// Notes on the insertion strings:
// - The EvtFormatMessage has the valueCount and values parameters, yet it does not work when we tried passing
// EventData/UserData into those parameters. We can later do more research on making EvtFormatMessage with
// valueCount and values parameters works and compare if there is any benefit.
dataValues := newRecord.EventData.Data
// The UserData section is used if EventData is empty
if len(dataValues) == 0 {
dataValues = newRecord.UserData.Data
}
switch w.renderFormat {
case FormatXml, FormatDefault:
//XML format
newRecord.XmlFormatContent = insertPlaceholderValues(string(descriptionBytes), dataValues)
case FormatPlainText:
//old SSM agent Windows format
var recordMessage eventMessage
err = xml.Unmarshal(descriptionBytes, &recordMessage)
if err != nil {
return nil, fmt.Errorf("Unmarshal() err %v", err)
}
newRecord.System.Description = insertPlaceholderValues(recordMessage.Message, dataValues)
default:
return nil, fmt.Errorf("renderFormat is not recognized, %s", w.renderFormat)
}
return newRecord, nil
}
func (w *windowsEventLog) loadState() uint64 {
if _, err := os.Stat(w.stateFilePath); err != nil {
log.Printf("I! [wineventlog] The state file for %s does not exist: %v", w.stateFilePath, err)
return 0
}
byteArray, err := os.ReadFile(w.stateFilePath)
if err != nil {
log.Printf("W! [wineventlog] Issue encountered when reading offset from file %s: %v", w.stateFilePath, err)
return 0
}
offset, err := strconv.ParseUint(strings.Split(string(byteArray), "\n")[0], 10, 64)
if err != nil {
log.Printf("W! [wineventlog] Issue encountered when parsing offset value %v: %v", byteArray, err)
return 0
}
log.Printf("D! [wineventlog] Reading from offset %v in %s", offset, w.stateFilePath)
return offset
}