transport/transporttest/recorder.go (154 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package transporttest // import "go.elastic.co/apm/v2/transport/transporttest" import ( "compress/zlib" "context" "encoding/json" "fmt" "io" "io/ioutil" "sync" "github.com/google/go-cmp/cmp" "go.elastic.co/apm/v2" "go.elastic.co/apm/v2/model" ) // NewRecorderTracer returns a new apm.Tracer and // RecorderTransport, which is set as the tracer's transport. // // DEPRECATED. Use apmtest.NewRecordingTracer instead. func NewRecorderTracer() (*apm.Tracer, *RecorderTransport) { var transport RecorderTransport tracer, err := apm.NewTracerOptions(apm.TracerOptions{ ServiceName: "transporttest", Transport: &transport, }) if err != nil { panic(err) } return tracer, &transport } // RecorderTransport implements transport.Transport, recording the // streams sent. The streams can be retrieved using the Payloads // method. type RecorderTransport struct { mu sync.Mutex metadata *metadata payloads Payloads } // ResetPayloads clears out any recorded payloads. func (r *RecorderTransport) ResetPayloads() { r.mu.Lock() defer r.mu.Unlock() r.payloads = Payloads{} } // SendStream records the stream such that it can later be obtained via Payloads. func (r *RecorderTransport) SendStream(ctx context.Context, stream io.Reader) error { return r.record(ctx, stream) } // SendProfile records the stream such that it can later be obtained via Payloads. func (r *RecorderTransport) SendProfile(ctx context.Context, metadata io.Reader, profiles ...io.Reader) error { return r.recordProto(ctx, metadata, profiles) } // Metadata returns the metadata recorded by the transport. If metadata is yet to // be received, this method will panic. // // TODO(axw) introduce an exported type which contains all metadata, and return // that. Although we don't guarantee stability for this package this has a high // probability of breaking existing external tests, so let's do that in v2. func (r *RecorderTransport) Metadata() (_ model.System, _ model.Process, _ model.Service, labels model.IfaceMap) { r.mu.Lock() defer r.mu.Unlock() return r.metadata.System, r.metadata.Process, r.metadata.Service, r.metadata.Labels } // CloudMetadata returns the cloud metadata recorded by the transport. If metadata // is yet to be received, this method will panic. // // TODO(axw) remove when Metadata returns an exported type containing all metadata. func (r *RecorderTransport) CloudMetadata() model.Cloud { r.mu.Lock() defer r.mu.Unlock() return r.metadata.Cloud } // Payloads returns the payloads recorded by SendStream. func (r *RecorderTransport) Payloads() Payloads { r.mu.Lock() defer r.mu.Unlock() return r.payloads } func (r *RecorderTransport) record(ctx context.Context, stream io.Reader) error { reader, err := zlib.NewReader(stream) if err != nil { if err == io.ErrUnexpectedEOF { if contextDone(ctx) { return ctx.Err() } // truly unexpected } panic(err) } decoder := json.NewDecoder(reader) // The first object of any request must be a metadata struct. var metadataPayload struct { Metadata metadata `json:"metadata"` } if err := decoder.Decode(&metadataPayload); err != nil { panic(err) } r.recordMetadata(&metadataPayload.Metadata) for { var payload struct { Error *model.Error `json:"error"` Metrics *model.Metrics `json:"metricset"` Span *model.Span `json:"span"` Transaction *model.Transaction `json:"transaction"` } err := decoder.Decode(&payload) if err == io.EOF || (err == io.ErrUnexpectedEOF && contextDone(ctx)) { break } else if err != nil { panic(err) } r.mu.Lock() switch { case payload.Error != nil: r.payloads.Errors = append(r.payloads.Errors, *payload.Error) case payload.Metrics != nil: r.payloads.Metrics = append(r.payloads.Metrics, *payload.Metrics) case payload.Span != nil: r.payloads.Spans = append(r.payloads.Spans, *payload.Span) case payload.Transaction != nil: r.payloads.Transactions = append(r.payloads.Transactions, *payload.Transaction) } r.mu.Unlock() } return nil } func (r *RecorderTransport) recordProto(ctx context.Context, metadataReader io.Reader, profileReaders []io.Reader) error { var metadata metadata if err := json.NewDecoder(metadataReader).Decode(&metadata); err != nil { panic(err) } r.recordMetadata(&metadata) r.mu.Lock() defer r.mu.Unlock() for _, profileReader := range profileReaders { data, err := ioutil.ReadAll(profileReader) if err != nil { panic(err) } r.payloads.Profiles = append(r.payloads.Profiles, data) } return nil } func (r *RecorderTransport) recordMetadata(m *metadata) { r.mu.Lock() defer r.mu.Unlock() if r.metadata == nil { r.metadata = m } else { // Make sure the metadata doesn't change between requests. if diff := cmp.Diff(r.metadata, m); diff != "" { panic(fmt.Errorf("metadata changed\n%s", diff)) } } } func contextDone(ctx context.Context) bool { select { case <-ctx.Done(): return true default: return false } } // Payloads holds the recorded payloads. type Payloads struct { Errors []model.Error Metrics []model.Metrics Spans []model.Span Transactions []model.Transaction Profiles [][]byte } // Len returns the number of recorded payloads. func (p *Payloads) Len() int { return len(p.Transactions) + len(p.Errors) + len(p.Metrics) } type metadata struct { System model.System `json:"system"` Process model.Process `json:"process"` Service model.Service `json:"service"` Cloud model.Cloud `json:"cloud"` Labels model.IfaceMap `json:"labels,omitempty"` }