module/apmzerolog/writer.go (171 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package apmzerolog // import "go.elastic.co/apm/module/apmzerolog/v2" import ( "bytes" "context" "encoding/hex" "encoding/json" "io" "strconv" "time" "github.com/pkg/errors" "github.com/rs/zerolog" "github.com/rs/zerolog/pkgerrors" "go.elastic.co/apm/v2" "go.elastic.co/apm/v2/stacktrace" ) const ( // DefaultFatalFlushTimeout is the default value for Writer.FatalFlushTimeout. DefaultFatalFlushTimeout = 5 * time.Second // StackSourceLineName is the key for the line number of a stack frame. StackSourceLineName = "line" // StackSourceFunctionName is the key for the function name of a stack frame. StackSourceFunctionName = "func" ) func init() { stacktrace.RegisterLibraryPackage("github.com/rs/zerolog") } // Writer is an implementation of zerolog.LevelWriter, reporting log records as // errors to the APM Server. If TraceContext is used to add trace IDs to the log // records, the errors reported will be associated with them. // // Because we only have access to the serialised form of the log record, we must // rely on enough information being encoded into the events. For error stack traces, // you must use zerolog's Stack() method, and set zerolog.ErrorStackMarshaler // either to github.com/rs/zerolog/pkgerrors.MarshalStack, or to the function // apmzerolog.MarshalErrorStack in this package. The pkgerrors.MarshalStack // implementation omits some information, whereas apmzerolog is designed to // convey the complete file location and fully qualified function name. type Writer struct { // Tracer is the apm.Tracer to use for reporting errors. // If Tracer is nil, then apm.DefaultTracer() will be used. Tracer *apm.Tracer // FatalFlushTimeout is the amount of time to wait while // flushing a fatal log message to the APM Server before // the process is exited. If this is 0, then // DefaultFatalFlushTimeout will be used. If the timeout // is a negative value, then no flushing will be performed. FatalFlushTimeout time.Duration // MinLevel holds the minimum level of logs to send to // Elastic APM as errors. // // MinLevel must be greater than or equal to zerolog.ErrorLevel. // If it is less than this, zerolog.ErrorLevel will be used as // the minimum instead. MinLevel zerolog.Level } func (w *Writer) tracer() *apm.Tracer { tracer := w.Tracer if tracer == nil { tracer = apm.DefaultTracer() } return tracer } func (w *Writer) minLevel() zerolog.Level { minLevel := w.MinLevel if minLevel < zerolog.ErrorLevel { minLevel = zerolog.ErrorLevel } return minLevel } // Write is a no-op. func (*Writer) Write(p []byte) (int, error) { return len(p), nil } // WriteLevel decodes the JSON-encoded log record in p, and reports it as an error using w.Tracer. func (w *Writer) WriteLevel(level zerolog.Level, p []byte) (int, error) { if level < w.minLevel() || level >= zerolog.NoLevel { return len(p), nil } tracer := w.tracer() if !tracer.Recording() { return len(p), nil } var logRecord logRecord if err := logRecord.decode(bytes.NewReader(p)); err != nil { return 0, err } errlog := tracer.NewErrorLog(apm.ErrorLogRecord{ Level: level.String(), Message: logRecord.message, Error: logRecord.err, }) if !logRecord.timestamp.IsZero() { errlog.Timestamp = logRecord.timestamp } errlog.Handled = true errlog.SetStacktrace(1) errlog.TraceID = logRecord.traceID errlog.TransactionID = logRecord.transactionID if logRecord.spanID.Validate() == nil { errlog.ParentID = logRecord.spanID } else { errlog.ParentID = logRecord.transactionID } errlog.Send() if level == zerolog.FatalLevel { // Zap will exit the process following a fatal log message, so we flush the tracer. flushTimeout := w.FatalFlushTimeout if flushTimeout == 0 { flushTimeout = DefaultFatalFlushTimeout } if flushTimeout >= 0 { ctx, cancel := context.WithTimeout(context.Background(), flushTimeout) defer cancel() tracer.Flush(ctx.Done()) } } return len(p), nil } type logRecord struct { message string timestamp time.Time err error traceID apm.TraceID transactionID, spanID apm.SpanID } func (l *logRecord) decode(r io.Reader) (result error) { m := make(map[string]interface{}) d := json.NewDecoder(r) d.UseNumber() if err := d.Decode(&m); err != nil { return err } l.message, _ = m[zerolog.MessageFieldName].(string) if strval, ok := m[zerolog.TimestampFieldName].(string); ok { if t, err := time.Parse(zerolog.TimeFieldFormat, strval); err == nil { l.timestamp = t.UTC() } } if errmsg, ok := m[zerolog.ErrorFieldName].(string); ok { err := &jsonError{message: errmsg} if stack, ok := m[zerolog.ErrorStackFieldName].([]interface{}); ok { frames := make([]stacktrace.Frame, 0, len(stack)) for i := range stack { in, ok := stack[i].(map[string]interface{}) if !ok { continue } var frame stacktrace.Frame frame.File, _ = in[pkgerrors.StackSourceFileName].(string) frame.Function, _ = in[StackSourceFunctionName].(string) if strval, ok := in[StackSourceLineName].(string); ok { if line, err := strconv.Atoi(strval); err == nil { frame.Line = line } } frames = append(frames, frame) } err.stack = frames } l.err = err } if strval, ok := m[SpanIDFieldName].(string); ok { if err := decodeHex(l.spanID[:], strval); err != nil { return errors.Wrap(err, "invalid span.id") } } if strval, ok := m[TraceIDFieldName].(string); ok { if err := decodeHex(l.traceID[:], strval); err != nil { return errors.Wrap(err, "invalid trace.id") } } if strval, ok := m[TransactionIDFieldName].(string); ok { if err := decodeHex(l.transactionID[:], strval); err != nil { return errors.Wrap(err, "invalid transaction.id") } } return nil } func decodeHex(out []byte, in string) error { if n := hex.EncodedLen(len(out)); n != len(in) { return errors.Errorf( "invalid value length (expected %d bytes, got %d)", n, len(in), ) } _, err := hex.Decode(out, []byte(in)) return err } type jsonError struct { message string stack []stacktrace.Frame } func (e *jsonError) Type() string { return "error" } func (e *jsonError) Error() string { return e.message } func (e *jsonError) StackTrace() []stacktrace.Frame { return e.stack }