systemtest/apmservertest/filter.go (85 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package apmservertest
import (
"bytes"
"compress/zlib"
"context"
"encoding/json"
"io"
"go.elastic.co/apm/v2/model"
"go.elastic.co/apm/v2/transport"
"go.elastic.co/fastjson"
)
// TODO(axw) move EventMetadata and FilteringTransport to go.elastic.co/apmtest,
// generalising filteringTransport to work with arbitrary base transports. To do
// that we would need to dynamically check for optional interfaces supported by
// the base transport, and create passthrough methods.
// EventMetadata holds event metadata.
type EventMetadata struct {
System *model.System
Process *model.Process
Service *model.Service
Labels model.IfaceMap
}
// EventMetadata holds event metadata.
type EventMetadataFilter interface {
FilterEventMetadata(*EventMetadata)
}
// FilteringTransport is a transport for the APM Go agent which modifies events
// prior to sending them to the underlying transport.
type FilteringTransport struct {
*transport.HTTPTransport
filter EventMetadataFilter
}
// NewFilteringTransport returns a new FilteringTransport that filters events
// using f, and sends them on to h.
func NewFilteringTransport(h *transport.HTTPTransport, f EventMetadataFilter) *FilteringTransport {
return &FilteringTransport{h, f}
}
// SendStream decodes metadata from reader, passes it through the filters,
// and then sends the modified stream to the underlying transport.
func (t *FilteringTransport) SendStream(ctx context.Context, stream io.Reader) error {
zr, err := zlib.NewReader(stream)
if err != nil {
return err
}
decoder := json.NewDecoder(zr)
// The first object of any request must be a metadata struct.
var metadataPayload struct {
Metadata EventMetadata `json:"metadata"`
}
if err := decoder.Decode(&metadataPayload); err != nil {
return err
}
t.filter.FilterEventMetadata(&metadataPayload.Metadata)
// Re-encode metadata.
var json fastjson.Writer
json.RawString(`{"metadata":`)
json.RawString(`{"system":`)
metadataPayload.Metadata.System.MarshalFastJSON(&json)
json.RawString(`,"process":`)
metadataPayload.Metadata.Process.MarshalFastJSON(&json)
json.RawString(`,"service":`)
metadataPayload.Metadata.Service.MarshalFastJSON(&json)
if len(metadataPayload.Metadata.Labels) > 0 {
json.RawString(`,"labels":`)
metadataPayload.Metadata.Labels.MarshalFastJSON(&json)
}
json.RawString("}}\n")
// Copy everything to a new zlib-encoded stream and send.
var buf bytes.Buffer
zw := zlib.NewWriter(&buf)
zw.Write(json.Bytes())
if _, err := io.Copy(zw, io.MultiReader(decoder.Buffered(), zr)); err != nil {
return err
}
if err := zw.Close(); err != nil {
return err
}
return t.HTTPTransport.SendStream(ctx, &buf)
}
// DefaultMetadataFilter implements EventMetadataFilter, setting some default values
// for fields that would otherwise by dynamically discovered.
type DefaultMetadataFilter struct{}
// FilterEventMetadata updates m with default values for dynamically discovered fields.
func (DefaultMetadataFilter) FilterEventMetadata(m *EventMetadata) {
m.System.Platform = "minix"
m.System.Architecture = "i386"
m.System.Container = nil
m.System.Kubernetes = nil
m.System.Hostname = "beowulf"
m.Process.Argv = nil
m.Process.Pid = 1
m.Process.Ppid = nil
m.Process.Title = "systemtest.test"
m.Service.Agent.Version = "0.0.0"
m.Service.Language.Version = "2.0"
m.Service.Runtime.Version = "2.0"
m.Service.Node = nil
m.Service.Name = "systemtest"
}
// EventMetadataFilterFunc is a function type that implements EventMetadataFilter.
type EventMetadataFilterFunc func(*EventMetadata)
// FilterEventMetadata calls f(m).
func (f EventMetadataFilterFunc) FilterEventMetadata(m *EventMetadata) {
f(m)
}