x-pack/libbeat/common/cloudfoundry/events.go (495 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package cloudfoundry
import (
"encoding/binary"
"fmt"
"net/url"
"strings"
"time"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/cloudfoundry/sonde-go/events"
)
// EventType enumerates the kinds of events that can be raised from RPLClient.
type EventType uint

// Event types emitted by loggregator, documented at
// https://github.com/cloudfoundry/loggregator-api
const (
	// EventTypeHttpAccess identifies a http access event.
	EventTypeHttpAccess EventType = iota
	// EventTypeLog identifies a log event.
	EventTypeLog
	// EventTypeCounter identifies a counter event.
	EventTypeCounter
	// EventTypeValueMetric identifies a value metric event.
	EventTypeValueMetric
	// EventTypeContainerMetric identifies a container metric event.
	EventTypeContainerMetric
	// EventTypeError identifies an error event.
	EventTypeError
)

// eventTypeNames maps every declared EventType, by numeric value, to the
// name reported by EventType.String.
var eventTypeNames = [...]string{
	EventTypeHttpAccess:      "access",
	EventTypeLog:             "log",
	EventTypeCounter:         "counter",
	EventTypeValueMetric:     "value",
	EventTypeContainerMetric: "container",
	EventTypeError:           "error",
}

// String returns the short lower-case name of the event type. Values that
// are not one of the declared constants yield "unknown".
func (t EventType) String() string {
	if t < EventType(len(eventTypeNames)) {
		return eventTypeNames[t]
	}
	return "unknown"
}
// EventLogMessageType identifies the stream a log message was read from.
type EventLogMessageType uint

const (
	// EventLogMessageTypeStdout marks a message received from stdout.
	// Numbering starts at 1, so the zero value is not a valid message type.
	EventLogMessageTypeStdout EventLogMessageType = iota + 1
	// EventLogMessageTypeStderr marks a message received from stderr.
	EventLogMessageTypeStderr
)

// String returns "stdout" or "stderr" for the declared message types and
// "unknown" for every other value, including zero.
func (t EventLogMessageType) String() string {
	if t == EventLogMessageTypeStdout {
		return "stdout"
	}
	if t == EventLogMessageTypeStderr {
		return "stderr"
	}
	return "unknown"
}
// Event is the interface implemented by every event type in this package.
type Event interface {
	// String returns the name of the event type.
	fmt.Stringer

	// EventType returns the kind of event.
	EventType() EventType

	// Origin, Timestamp, Deployment, Job, Index and IP expose the values
	// read from the envelope that carried the event.
	Origin() string
	Timestamp() time.Time
	Deployment() string
	Job() string
	Index() string
	IP() string

	// Tags returns the tags attached to the envelope.
	Tags() map[string]string

	// ToFields renders the event as a nested field map.
	ToFields() mapstr.M
}
// EventWithAppID is implemented by events that additionally carry the ID of
// the application they belong to.
type EventWithAppID interface {
	Event

	// AppGuid returns the application ID of the event; it may be empty.
	AppGuid() string
}
// eventBase holds the values every event copies from its envelope.
type eventBase struct {
	origin                     string
	timestamp                  time.Time
	deployment, job, index, ip string
	tags                       map[string]string
}
// eventAppBase extends eventBase with the ID of the application an event
// belongs to.
type eventAppBase struct {
	eventBase

	// appGuid is the application ID; empty when the event carries none.
	appGuid string
}
// EventHttpAccess represents a http access event.
type EventHttpAccess struct {
	eventAppBase

	startTimestamp, stopTimestamp time.Time
	requestID, peerType           string
	method, uri                   string
	remoteAddress, userAgent      string
	statusCode                    int32
	contentLength                 int64
	instanceIndex                 int32
	forwarded                     []string
}

// EventType reports that this is a http access event.
func (*EventHttpAccess) EventType() EventType { return EventTypeHttpAccess }

// String returns the name of the event type.
func (h *EventHttpAccess) String() string { return h.EventType().String() }

// Origin returns the origin read from the envelope.
func (h *EventHttpAccess) Origin() string { return h.origin }

// Timestamp returns the envelope timestamp.
func (h *EventHttpAccess) Timestamp() time.Time { return h.timestamp }

// Deployment returns the deployment read from the envelope.
func (h *EventHttpAccess) Deployment() string { return h.deployment }

// Job returns the job read from the envelope.
func (h *EventHttpAccess) Job() string { return h.job }

// Index returns the index read from the envelope.
func (h *EventHttpAccess) Index() string { return h.index }

// IP returns the IP read from the envelope.
func (h *EventHttpAccess) IP() string { return h.ip }

// Tags returns the tags attached to the envelope.
func (h *EventHttpAccess) Tags() map[string]string { return h.tags }

// AppGuid returns the ID of the application that served the request.
func (h *EventHttpAccess) AppGuid() string { return h.appGuid }

// StartTimestamp returns the start time recorded for the request.
func (h *EventHttpAccess) StartTimestamp() time.Time { return h.startTimestamp }

// StopTimestamp returns the stop time recorded for the request.
func (h *EventHttpAccess) StopTimestamp() time.Time { return h.stopTimestamp }

// RequestID returns the ID of the request.
func (h *EventHttpAccess) RequestID() string { return h.requestID }

// PeerType returns the peer type of the request.
func (h *EventHttpAccess) PeerType() string { return h.peerType }

// Method returns the HTTP method of the request.
func (h *EventHttpAccess) Method() string { return h.method }

// URI returns the requested URI.
func (h *EventHttpAccess) URI() string { return h.uri }

// RemoteAddress returns the remote address of the request.
func (h *EventHttpAccess) RemoteAddress() string { return h.remoteAddress }

// UserAgent returns the user agent of the request.
func (h *EventHttpAccess) UserAgent() string { return h.userAgent }

// StatusCode returns the HTTP status code of the response.
func (h *EventHttpAccess) StatusCode() int32 { return h.statusCode }

// ContentLength returns the content length recorded for the request.
func (h *EventHttpAccess) ContentLength() int64 { return h.contentLength }

// InstanceIndex returns the instance index, or zero when none was reported.
func (h *EventHttpAccess) InstanceIndex() int32 { return h.instanceIndex }

// Forwarded returns the forwarded addresses recorded for the request.
func (h *EventHttpAccess) Forwarded() []string { return h.forwarded }

// ToFields renders the event as a field map: the common fields with the
// application ID, plus "http", "user_agent" and "url" sections.
func (h *EventHttpAccess) ToFields() mapstr.M {
	// NOTE(review): ECS defines the method as http.request.method; it is kept
	// under http.response here to preserve the emitted schema — confirm with
	// consumers before moving it.
	response := mapstr.M{
		"status_code": h.StatusCode(),
		"method":      h.Method(),
		"bytes":       h.ContentLength(),
	}
	update := mapstr.M{
		"http":       mapstr.M{"response": response},
		"user_agent": mapstr.M{"original": h.UserAgent()},
		"url":        urlMap(h.URI()),
	}

	fields := baseMapWithApp(h)
	fields.DeepUpdate(update)
	return fields
}
// EventLog represents a log message event.
type EventLog struct {
	eventAppBase

	message              string
	messageType          EventLogMessageType
	sourceType, sourceID string
}

// EventType reports that this is a log event.
func (*EventLog) EventType() EventType { return EventTypeLog }

// String returns the name of the event type.
func (l *EventLog) String() string { return l.EventType().String() }

// Origin returns the origin read from the envelope.
func (l *EventLog) Origin() string { return l.origin }

// Timestamp returns the envelope timestamp.
func (l *EventLog) Timestamp() time.Time { return l.timestamp }

// Deployment returns the deployment read from the envelope.
func (l *EventLog) Deployment() string { return l.deployment }

// Job returns the job read from the envelope.
func (l *EventLog) Job() string { return l.job }

// Index returns the index read from the envelope.
func (l *EventLog) Index() string { return l.index }

// IP returns the IP read from the envelope.
func (l *EventLog) IP() string { return l.ip }

// Tags returns the tags attached to the envelope.
func (l *EventLog) Tags() map[string]string { return l.tags }

// AppGuid returns the ID of the application that produced the message.
func (l *EventLog) AppGuid() string { return l.appGuid }

// Message returns the log message text.
func (l *EventLog) Message() string { return l.message }

// MessageType returns the stream the message was read from.
func (l *EventLog) MessageType() EventLogMessageType { return l.messageType }

// SourceType returns the source type of the message.
func (l *EventLog) SourceType() string { return l.sourceType }

// SourceID returns the source instance of the message.
func (l *EventLog) SourceID() string { return l.sourceID }

// ToFields renders the event as a field map: the common fields with the
// application ID, the source details under "cloudfoundry.<type>.source",
// and top-level "message" and "stream" entries.
func (l *EventLog) ToFields() mapstr.M {
	source := mapstr.M{
		"instance": l.SourceID(),
		"type":     l.SourceType(),
	}
	update := mapstr.M{
		"cloudfoundry": mapstr.M{
			l.String(): mapstr.M{"source": source},
		},
		"message": l.Message(),
		"stream":  l.MessageType().String(),
	}

	fields := baseMapWithApp(l)
	fields.DeepUpdate(update)
	return fields
}
// EventCounter represents a counter event.
type EventCounter struct {
	eventBase

	name         string
	delta, total uint64
}

// EventType reports that this is a counter event.
func (*EventCounter) EventType() EventType { return EventTypeCounter }

// String returns the name of the event type.
func (c *EventCounter) String() string { return c.EventType().String() }

// Origin returns the origin read from the envelope.
func (c *EventCounter) Origin() string { return c.origin }

// Timestamp returns the envelope timestamp.
func (c *EventCounter) Timestamp() time.Time { return c.timestamp }

// Deployment returns the deployment read from the envelope.
func (c *EventCounter) Deployment() string { return c.deployment }

// Job returns the job read from the envelope.
func (c *EventCounter) Job() string { return c.job }

// Index returns the index read from the envelope.
func (c *EventCounter) Index() string { return c.index }

// IP returns the IP read from the envelope.
func (c *EventCounter) IP() string { return c.ip }

// Tags returns the tags attached to the envelope.
func (c *EventCounter) Tags() map[string]string { return c.tags }

// Name returns the name of the counter.
func (c *EventCounter) Name() string { return c.name }

// Delta returns the delta reported by the event.
func (c *EventCounter) Delta() uint64 { return c.delta }

// Total returns the total reported by the event.
func (c *EventCounter) Total() uint64 { return c.total }

// ToFields renders the event as a field map: the common fields plus the
// counter name, delta and total under "cloudfoundry.<type>".
func (c *EventCounter) ToFields() mapstr.M {
	counter := mapstr.M{
		"name":  c.Name(),
		"delta": c.Delta(),
		"total": c.Total(),
	}

	fields := baseMap(c)
	fields.DeepUpdate(mapstr.M{
		"cloudfoundry": mapstr.M{c.String(): counter},
	})
	return fields
}
// EventValueMetric represents a value metric event.
type EventValueMetric struct {
	eventBase

	name  string  // metric name
	value float64 // reported value
	unit  string  // unit the value is expressed in
}

// EventType reports that this is a value metric event.
func (*EventValueMetric) EventType() EventType { return EventTypeValueMetric }

// String returns the name of the event type.
func (v *EventValueMetric) String() string { return v.EventType().String() }

// Origin returns the origin read from the envelope.
func (v *EventValueMetric) Origin() string { return v.origin }

// Timestamp returns the envelope timestamp.
func (v *EventValueMetric) Timestamp() time.Time { return v.timestamp }

// Deployment returns the deployment read from the envelope.
func (v *EventValueMetric) Deployment() string { return v.deployment }

// Job returns the job read from the envelope.
func (v *EventValueMetric) Job() string { return v.job }

// Index returns the index read from the envelope.
func (v *EventValueMetric) Index() string { return v.index }

// IP returns the IP read from the envelope.
func (v *EventValueMetric) IP() string { return v.ip }

// Tags returns the tags attached to the envelope.
func (v *EventValueMetric) Tags() map[string]string { return v.tags }

// Name returns the name of the metric.
func (v *EventValueMetric) Name() string { return v.name }

// Value returns the reported value.
func (v *EventValueMetric) Value() float64 { return v.value }

// Unit returns the unit of the reported value.
func (v *EventValueMetric) Unit() string { return v.unit }

// ToFields renders the event as a field map: the common fields plus the
// metric name, unit and value under "cloudfoundry.<type>".
func (v *EventValueMetric) ToFields() mapstr.M {
	metric := mapstr.M{
		"name":  v.Name(),
		"unit":  v.Unit(),
		"value": v.Value(),
	}

	fields := baseMap(v)
	fields.DeepUpdate(mapstr.M{
		"cloudfoundry": mapstr.M{v.String(): metric},
	})
	return fields
}
// EventContainerMetric represents a container metric event.
type EventContainerMetric struct {
	eventAppBase

	instanceIndex                                            int32
	cpuPercentage                                            float64
	memoryBytes, diskBytes, memoryBytesQuota, diskBytesQuota uint64
}

// EventType reports that this is a container metric event.
func (*EventContainerMetric) EventType() EventType { return EventTypeContainerMetric }

// String returns the name of the event type.
func (m *EventContainerMetric) String() string { return m.EventType().String() }

// Origin returns the origin read from the envelope.
func (m *EventContainerMetric) Origin() string { return m.origin }

// Timestamp returns the envelope timestamp.
func (m *EventContainerMetric) Timestamp() time.Time { return m.timestamp }

// Deployment returns the deployment read from the envelope.
func (m *EventContainerMetric) Deployment() string { return m.deployment }

// Job returns the job read from the envelope.
func (m *EventContainerMetric) Job() string { return m.job }

// Index returns the index read from the envelope.
func (m *EventContainerMetric) Index() string { return m.index }

// IP returns the IP read from the envelope.
func (m *EventContainerMetric) IP() string { return m.ip }

// Tags returns the tags attached to the envelope.
func (m *EventContainerMetric) Tags() map[string]string { return m.tags }

// AppGuid returns the ID of the application running in the container.
func (m *EventContainerMetric) AppGuid() string { return m.appGuid }

// InstanceIndex returns the index of the application instance.
func (m *EventContainerMetric) InstanceIndex() int32 { return m.instanceIndex }

// CPUPercentage returns the reported CPU percentage.
func (m *EventContainerMetric) CPUPercentage() float64 { return m.cpuPercentage }

// MemoryBytes returns the reported memory usage in bytes.
func (m *EventContainerMetric) MemoryBytes() uint64 { return m.memoryBytes }

// DiskBytes returns the reported disk usage in bytes.
func (m *EventContainerMetric) DiskBytes() uint64 { return m.diskBytes }

// MemoryBytesQuota returns the reported memory quota in bytes.
func (m *EventContainerMetric) MemoryBytesQuota() uint64 { return m.memoryBytesQuota }

// DiskBytesQuota returns the reported disk quota in bytes.
func (m *EventContainerMetric) DiskBytesQuota() uint64 { return m.diskBytesQuota }

// ToFields renders the event as a field map: the common fields with the
// application ID, plus the container metrics under "cloudfoundry.<type>".
// The dotted metric names are stored as literal keys of that inner map.
func (m *EventContainerMetric) ToFields() mapstr.M {
	container := mapstr.M{
		"instance_index":     m.InstanceIndex(),
		"cpu.pct":            m.CPUPercentage(),
		"memory.bytes":       m.MemoryBytes(),
		"memory.quota.bytes": m.MemoryBytesQuota(),
		"disk.bytes":         m.DiskBytes(),
		"disk.quota.bytes":   m.DiskBytesQuota(),
	}

	fields := baseMapWithApp(m)
	fields.DeepUpdate(mapstr.M{
		"cloudfoundry": mapstr.M{m.String(): container},
	})
	return fields
}
// EventError represents an error event.
type EventError struct {
	eventBase

	message string // error message
	code    int32  // error code
	source  string // source that reported the error
}

// EventType reports that this is an error event.
func (*EventError) EventType() EventType { return EventTypeError }

// String returns the name of the event type.
func (er *EventError) String() string { return er.EventType().String() }

// Origin returns the origin read from the envelope.
func (er *EventError) Origin() string { return er.origin }

// Timestamp returns the envelope timestamp.
func (er *EventError) Timestamp() time.Time { return er.timestamp }

// Deployment returns the deployment read from the envelope.
func (er *EventError) Deployment() string { return er.deployment }

// Job returns the job read from the envelope.
func (er *EventError) Job() string { return er.job }

// Index returns the index read from the envelope.
func (er *EventError) Index() string { return er.index }

// IP returns the IP read from the envelope.
func (er *EventError) IP() string { return er.ip }

// Tags returns the tags attached to the envelope.
func (er *EventError) Tags() map[string]string { return er.tags }

// Message returns the error message.
func (er *EventError) Message() string { return er.message }

// Code returns the error code.
func (er *EventError) Code() int32 { return er.code }

// Source returns the source that reported the error.
func (er *EventError) Source() string { return er.source }

// ToFields renders the event as a field map: the common fields, the error
// source under "cloudfoundry.<type>", and top-level "message" and "code".
func (er *EventError) ToFields() mapstr.M {
	update := mapstr.M{
		"cloudfoundry": mapstr.M{
			er.String(): mapstr.M{"source": er.Source()},
		},
		"message": er.Message(),
		"code":    er.Code(),
	}

	fields := baseMap(er)
	fields.DeepUpdate(update)
	return fields
}
// newEventBase copies the values shared by every event out of the envelope.
//
// Scalar fields are read through the generated getters rather than by
// dereferencing the pointer fields directly, so a field that is unset on the
// envelope yields its zero value instead of a nil-pointer panic. The
// timestamp is interpreted as nanoseconds since the Unix epoch; an unset
// timestamp therefore maps to the epoch itself.
func newEventBase(env *events.Envelope) eventBase {
	return eventBase{
		origin:     env.GetOrigin(),
		timestamp:  time.Unix(0, env.GetTimestamp()),
		deployment: env.GetDeployment(),
		job:        env.GetJob(),
		index:      env.GetIndex(),
		ip:         env.GetIp(),
		tags:       env.Tags,
	}
}
// newEventHttpAccess builds an EventHttpAccess from the HttpStartStop payload
// of the envelope. Start and stop timestamps are interpreted as nanoseconds
// since the Unix epoch, the peer type is lower-cased, and the application
// and request UUIDs are rendered through formatUUID.
//
// NOTE(review): apart from the application ID and the instance index, every
// payload field is dereferenced without a nil check — presumably they are
// always set; confirm against the protocol definition.
func newEventHttpAccess(env *events.Envelope) *EventHttpAccess {
	msg := env.GetHttpStartStop()

	access := new(EventHttpAccess)
	access.eventBase = newEventBase(env)
	access.appGuid = formatUUID(msg.ApplicationId)
	access.startTimestamp = time.Unix(0, *msg.StartTimestamp)
	access.stopTimestamp = time.Unix(0, *msg.StopTimestamp)
	access.requestID = formatUUID(msg.RequestId)
	access.peerType = strings.ToLower(msg.PeerType.String())
	access.method = msg.Method.String()
	access.uri = *msg.Uri
	access.remoteAddress = *msg.RemoteAddress
	access.userAgent = *msg.UserAgent
	access.statusCode = *msg.StatusCode
	access.contentLength = *msg.ContentLength
	access.forwarded = msg.Forwarded

	// The instance index may be missing; it then stays at zero.
	if idx := msg.InstanceIndex; idx != nil {
		access.instanceIndex = *idx
	}
	return access
}
// newEventLog builds an EventLog from the LogMessage payload of the envelope.
//
// Payload fields are read through the generated getters rather than by
// dereferencing the pointer fields directly, so a log message without an
// application ID, source type or source instance produces an event with
// empty strings instead of a nil-pointer panic.
func newEventLog(env *events.Envelope) *EventLog {
	msg := env.GetLogMessage()
	return &EventLog{
		eventAppBase: eventAppBase{
			eventBase: newEventBase(env),
			appGuid:   msg.GetAppId(),
		},
		message:     string(msg.GetMessage()),
		messageType: EventLogMessageType(msg.GetMessageType()),
		sourceType:  msg.GetSourceType(),
		sourceID:    msg.GetSourceInstance(),
	}
}
// newEventCounter builds an EventCounter from the CounterEvent payload of the
// envelope.
//
// Payload fields are read through the generated getters rather than by
// dereferencing the pointer fields directly, so a counter that omits a field
// (for example the running total) yields zero for it instead of a
// nil-pointer panic.
func newEventCounter(env *events.Envelope) *EventCounter {
	msg := env.GetCounterEvent()
	return &EventCounter{
		eventBase: newEventBase(env),
		name:      msg.GetName(),
		delta:     msg.GetDelta(),
		total:     msg.GetTotal(),
	}
}
// newEventValueMetric builds an EventValueMetric from the ValueMetric payload
// of the envelope.
//
// NOTE(review): name, value and unit are dereferenced without a nil check —
// presumably they are always set; confirm against the protocol definition.
func newEventValueMetric(env *events.Envelope) *EventValueMetric {
	msg := env.GetValueMetric()

	metric := new(EventValueMetric)
	metric.eventBase = newEventBase(env)
	metric.name = *msg.Name
	metric.value = *msg.Value
	metric.unit = *msg.Unit
	return metric
}
// newEventContainerMetric builds an EventContainerMetric from the
// ContainerMetric payload of the envelope.
//
// Payload fields are read through the generated getters rather than by
// dereferencing the pointer fields directly, so a metric that omits a field
// (for example the memory or disk quota) yields zero for it instead of a
// nil-pointer panic.
func newEventContainerMetric(env *events.Envelope) *EventContainerMetric {
	msg := env.GetContainerMetric()
	return &EventContainerMetric{
		eventAppBase: eventAppBase{
			eventBase: newEventBase(env),
			appGuid:   msg.GetApplicationId(),
		},
		instanceIndex:    msg.GetInstanceIndex(),
		cpuPercentage:    msg.GetCpuPercentage(),
		memoryBytes:      msg.GetMemoryBytes(),
		diskBytes:        msg.GetDiskBytes(),
		memoryBytesQuota: msg.GetMemoryBytesQuota(),
		diskBytesQuota:   msg.GetDiskBytesQuota(),
	}
}
// newEventError builds an EventError from the Error payload of the envelope.
//
// NOTE(review): message, code and source are dereferenced without a nil
// check — presumably they are always set; confirm against the protocol
// definition.
func newEventError(env *events.Envelope) *EventError {
	msg := env.GetError()

	evt := new(EventError)
	evt.eventBase = newEventBase(env)
	evt.message = *msg.Message
	evt.code = *msg.Code
	evt.source = *msg.Source
	return evt
}
// EnvelopeToEvent converts a loggregator envelope into the matching Event.
//
// It returns nil when the envelope cannot be converted: the envelope is nil,
// it has no event type, its event type is not one of the supported kinds, or
// the payload that belongs to its event type is missing. Callers must check
// the result for nil before using it.
func EnvelopeToEvent(env *events.Envelope) Event {
	if env == nil || env.EventType == nil {
		return nil
	}

	// Each case only converts when the payload for the declared type is
	// present; otherwise control falls out of the switch to the literal nil
	// below. Returning a literal nil (never a typed nil pointer) keeps the
	// caller's `event == nil` check meaningful.
	switch *env.EventType {
	case events.Envelope_HttpStartStop:
		if env.GetHttpStartStop() != nil {
			return newEventHttpAccess(env)
		}
	case events.Envelope_LogMessage:
		if env.GetLogMessage() != nil {
			return newEventLog(env)
		}
	case events.Envelope_CounterEvent:
		if env.GetCounterEvent() != nil {
			return newEventCounter(env)
		}
	case events.Envelope_ValueMetric:
		if env.GetValueMetric() != nil {
			return newEventValueMetric(env)
		}
	case events.Envelope_ContainerMetric:
		if env.GetContainerMetric() != nil {
			return newEventContainerMetric(env)
		}
	case events.Envelope_Error:
		if env.GetError() != nil {
			return newEventError(env)
		}
	}
	return nil
}
// envelopMap collects the envelope-level values of an event (origin,
// deployment, ip, job and index) into a field map.
func envelopMap(evt Event) mapstr.M {
	envelope := make(mapstr.M, 5)
	envelope["origin"] = evt.Origin()
	envelope["deployment"] = evt.Deployment()
	envelope["ip"] = evt.IP()
	envelope["job"] = evt.Job()
	envelope["index"] = evt.Index()
	return envelope
}
// baseMap builds the field map common to all events. Under "cloudfoundry" it
// stores the event type name, the envelope values and, when there are any,
// the tags that tagsToMeta did not recognise as metadata. The metadata
// derived from the tags is then merged into the result.
func baseMap(evt Event) mapstr.M {
	tags, meta := tagsToMeta(evt.Tags())

	cloudfoundry := mapstr.M{
		"type":     evt.String(),
		"envelope": envelopMap(evt),
	}
	if len(tags) != 0 {
		cloudfoundry["tags"] = tags
	}

	fields := mapstr.M{"cloudfoundry": cloudfoundry}
	if len(meta) == 0 {
		return fields
	}
	fields.DeepUpdate(meta)
	return fields
}
// tagsToMeta splits envelope tags into two maps. Tags that describe the
// application, space or organization are placed in meta under their
// "cloudfoundry.*" key; every other tag is kept in tags under its name with
// dots replaced by common.DeDot. Both returned maps are always non-nil.
func tagsToMeta(eventTags map[string]string) (tags mapstr.M, meta mapstr.M) {
	tags, meta = mapstr.M{}, mapstr.M{}

	for name, value := range eventTags {
		// metaKey stays empty for tags that are not well-known metadata.
		var metaKey string
		switch name {
		case "app_id":
			metaKey = "cloudfoundry.app.id"
		case "app_name":
			metaKey = "cloudfoundry.app.name"
		case "space_id":
			metaKey = "cloudfoundry.space.id"
		case "space_name":
			metaKey = "cloudfoundry.space.name"
		case "organization_id":
			metaKey = "cloudfoundry.org.id"
		case "organization_name":
			metaKey = "cloudfoundry.org.name"
		}

		if metaKey == "" {
			tags[common.DeDot(name)] = value
			continue
		}
		// The result of Put is deliberately not checked.
		meta.Put(metaKey, value)
	}
	return tags, meta
}
// baseMapWithApp returns baseMap(evt) and, when the event reports a non-empty
// application ID, stores that ID under "cloudfoundry.app.id". This replaces
// any value that baseMap derived from the tags.
func baseMapWithApp(evt EventWithAppID) mapstr.M {
	fields := baseMap(evt)
	if guid := evt.AppGuid(); guid != "" {
		// The result of Put is deliberately not checked.
		fields.Put("cloudfoundry.app.id", guid)
	}
	return fields
}
// urlMap breaks a URI into a field map. The raw URI is always stored under
// "original"; when the URI parses, its scheme, port, path and host name are
// added as "scheme", "port", "path" and "domain". The port is the string
// returned by url.URL.Port, so it is empty when the URI names no port.
func urlMap(uri string) mapstr.M {
	fields := mapstr.M{"original": uri}

	parsed, err := url.Parse(uri)
	if err != nil {
		// Unparseable URIs are reported with the original text only.
		return fields
	}

	fields["scheme"] = parsed.Scheme
	fields["port"] = parsed.Port()
	fields["path"] = parsed.Path
	fields["domain"] = parsed.Hostname()
	return fields
}
// formatUUID renders a loggregator UUID as lower-case hex in the usual
// 8-4-4-4-12 grouping. The low 64 bits fill the first eight bytes and the
// high 64 bits the last eight, each written in little-endian byte order.
// A nil UUID yields the empty string.
func formatUUID(uuid *events.UUID) string {
	if uuid == nil {
		return ""
	}

	raw := make([]byte, 16)
	binary.LittleEndian.PutUint64(raw[0:8], uuid.GetLow())
	binary.LittleEndian.PutUint64(raw[8:16], uuid.GetHigh())

	return fmt.Sprintf("%x-%x-%x-%x-%x", raw[:4], raw[4:6], raw[6:8], raw[8:10], raw[10:])
}