reassembler.go (196 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package libaudit
import (
"errors"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/elastic/go-libaudit/v2/auparse"
)
var errReassemblerClosed = errors.New("reassembler closed")
// Stream is implemented by the user of the Reassembler to handle reassembled
// audit data.
type Stream interface {
// ReassemblyComplete notifies that a complete group of events has been
// received and provides those events.
ReassemblyComplete(msgs []*auparse.AuditMessage)
// EventsLost notifies that some events were lost. This is based on gaps
// in the sequence numbers of received messages. Lost events can be caused
// by a slow receiver or because the kernel is configured to rate limit
// events.
EventsLost(count int)
}
// Type - Reassembler
// Reassembler combines related messages in to an event based on their timestamp
// and sequence number. It handles messages that may be have been received out
// of order or are interleaved. Reassembler is concurrency-safe.
//
// The Reassembler uses callbacks (see Stream interface) to notify the user of
// completed messages. Callbacks for reassembled events will occur in order of
// sequence number unless a late message is received that falls outside of the
// sequences held in memory.
type Reassembler struct {
closed int32 // closed is set to 1 when after the Reassembler is closed.
// cache contains the in-flight event messages. Eviction occurs when an
// event is completed via an EOE message, the cache reaches max size
// (lowest sequence is evicted first), or an event expires base on time.
list *eventList
// stream is the callback interface used for delivering completed events.
stream Stream
}
// NewReassembler returns a new Reassembler. maxInFlight controls the maximum
// number of events (based on timestamp + sequence) that are buffered. timeout
// controls how long the Reassembler waits for an EOE message (end-of-event)
// before evicting the event. And stream receives the callbacks for completed
// events and lost events.
func NewReassembler(maxInFlight int, timeout time.Duration, stream Stream) (*Reassembler, error) {
if stream == nil {
return nil, errors.New("stream cannot be nil")
}
return &Reassembler{
list: newEventList(maxInFlight, timeout),
stream: stream,
}, nil
}
// PushMessage pushes a new AuditMessage message into the Reassembler. Callbacks
// may be triggered as a result.
func (r *Reassembler) PushMessage(msg *auparse.AuditMessage) {
if msg == nil {
return
}
r.list.Put(msg)
evicted, lost := r.list.CleanUp()
r.callback(evicted, lost)
}
// Push pushes a new audit message into the Reassembler. This is a convenience
// function that handles calling auparse.Parse() to extract the message's
// timestamp and sequence number. It copies the rawData contents. If parsing
// fails then an error will be returned. See PushMessage.
func (r *Reassembler) Push(typ auparse.AuditMessageType, rawData []byte) error {
msg, err := auparse.Parse(typ, string(rawData))
if err != nil {
return err
}
r.PushMessage(msg)
return nil
}
// Maintain performs maintenance on the cached message. It can be called
// periodically to evict timed-out events. It returns a non-nil error if
// the Reassembler has been closed.
func (r *Reassembler) Maintain() error {
if atomic.LoadInt32(&r.closed) == 1 {
return errReassemblerClosed
}
evicted, lost := r.list.CleanUp()
r.callback(evicted, lost)
return nil
}
// Close flushes any cached events and closes the Reassembler.
func (r *Reassembler) Close() error {
if atomic.CompareAndSwapInt32(&r.closed, 0, 1) {
evicted, lost := r.list.Clear()
r.callback(evicted, lost)
return nil
}
return errReassemblerClosed
}
func (r *Reassembler) callback(events []*event, lost int) {
for _, e := range events {
r.stream.ReassemblyComplete(e.msgs)
}
if lost > 0 {
r.stream.EventsLost(lost)
}
}
type sequenceNum uint32
// Type - sequenceNumSlice
// maxSortRange defines the maximum range that sequence number can differ
// before being considered to have rolled over. When two values differ by more
// than this constant, the larger values is treated as being less.
const maxSortRange = 1<<24 - 1
type sequenceNumSlice []sequenceNum
func (p sequenceNumSlice) Len() int { return len(p) }
func (p sequenceNumSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p sequenceNumSlice) Sort() { sort.Sort(p) }
func (p sequenceNumSlice) Less(i, j int) bool {
// Handle sequence number rollover.
diff := abs(int64(p[i]) - int64(p[j]))
if diff > maxSortRange {
return p[i] > p[j]
}
return p[i] < p[j]
}
func abs(x int64) int64 {
if x < 0 {
return -x
}
return x
}
// Type - event
type event struct {
expireTime time.Time
msgs []*auparse.AuditMessage
complete bool
}
func (e *event) Add(msg *auparse.AuditMessage) {
e.msgs = append(e.msgs, msg)
// These messages all signal the completion of an event.
if msg.RecordType == auparse.AUDIT_PROCTITLE ||
msg.RecordType <= auparse.AUDIT_LAST_DAEMON ||
msg.RecordType >= auparse.AUDIT_ANOM_LOGIN_FAILURES {
e.complete = true
}
}
func (e *event) IsExpired() bool {
return time.Now().After(e.expireTime)
}
// Type - eventList
type eventList struct {
sync.Mutex
seqs sequenceNumSlice
events map[sequenceNum]*event
lastSeq sequenceNum
maxSize int
timeout time.Duration
}
func newEventList(maxSize int, timeout time.Duration) *eventList {
return &eventList{
seqs: make([]sequenceNum, 0, maxSize+1),
events: make(map[sequenceNum]*event, maxSize+1),
maxSize: maxSize,
timeout: timeout,
}
}
// remove the first event (lowest sequence) in the list.
func (l *eventList) remove() {
if len(l.seqs) > 0 {
seq := l.seqs[0]
l.seqs = l.seqs[1:]
delete(l.events, seq)
}
}
// Clear removes all events from the list and returns the events and the number
// of list events.
func (l *eventList) Clear() ([]*event, int) {
l.Lock()
defer l.Unlock()
var lost int
var seq sequenceNum
var evicted []*event
for {
size := len(l.seqs)
if size == 0 {
break
}
// Get event.
seq = l.seqs[0]
event := l.events[seq]
if l.lastSeq > 0 {
lost += int(seq - l.lastSeq - 1)
}
l.lastSeq = seq
evicted = append(evicted, event)
l.remove()
}
return evicted, lost
}
// Put a new message in the list.
func (l *eventList) Put(msg *auparse.AuditMessage) {
l.Lock()
defer l.Unlock()
seq := sequenceNum(msg.Sequence)
e, found := l.events[seq]
// Mark as complete, but do not append.
if msg.RecordType == auparse.AUDIT_EOE {
if found {
e.complete = true
}
return
}
if !found {
l.seqs = append(l.seqs, seq)
l.seqs.Sort()
e = &event{
expireTime: time.Now().Add(l.timeout),
msgs: make([]*auparse.AuditMessage, 0, 4),
}
l.events[seq] = e
}
e.Add(msg)
}
func (l *eventList) CleanUp() ([]*event, int) {
l.Lock()
defer l.Unlock()
var lost int
var seq sequenceNum
var evicted []*event
for {
size := len(l.seqs)
if size == 0 {
break
}
// Get event.
seq = l.seqs[0]
event := l.events[seq]
if event.complete || size > l.maxSize || event.IsExpired() {
if l.lastSeq > 0 {
lost += int(seq - l.lastSeq - 1)
}
l.lastSeq = seq
evicted = append(evicted, event)
l.remove()
continue
}
break
}
return evicted, lost
}