module/apmzap/core.go (126 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package apmzap // import "go.elastic.co/apm/module/apmzap/v2"
import (
"context"
"time"
"go.uber.org/zap/zapcore"
"go.elastic.co/apm/v2"
"go.elastic.co/apm/v2/stacktrace"
)
const (
// DefaultFatalFlushTimeout is the default value for Hook.FatalFlushTimeout.
DefaultFatalFlushTimeout = 5 * time.Second
)
func init() {
stacktrace.RegisterLibraryPackage("go.uber.org/zap")
}
// Core is an implementation of zapcore.Core, reporting log records as
// errors to the APM Server. If TraceContext is used to add trace IDs
// to the log records, the errors reported will be associated with them.
type Core struct {
// Tracer is the apm.Tracer to use for reporting errors.
// If Tracer is nil, then apm.DefaultTracer() will be used.
Tracer *apm.Tracer
// FatalFlushTimeout is the amount of time to wait while
// flushing a fatal log message to the APM Server before
// the process is exited. If this is 0, then
// DefaultFatalFlushTimeout will be used. If the timeout
// is a negative value, then no flushing will be performed.
FatalFlushTimeout time.Duration
}
func (c *Core) tracer() *apm.Tracer {
tracer := c.Tracer
if tracer == nil {
tracer = apm.DefaultTracer()
}
return tracer
}
// WrapCore returns zapcore.NewTee(core, c).
// WrapCore is suitable for passing to zap.WrapCore.
func (c *Core) WrapCore(core zapcore.Core) zapcore.Core {
return zapcore.NewTee(core, c)
}
// Sync is a no-op.
func (*Core) Sync() error {
return nil
}
// Enabled returns true if level is >= zapcore.ErrorLevel.
func (*Core) Enabled(level zapcore.Level) bool {
return level >= zapcore.ErrorLevel
}
// With returns a new zapcore.Core that decorates c with fields.
func (c *Core) With(fields []zapcore.Field) zapcore.Core {
out := &contextCore{core: c}
out.traceContext.fields(fields)
return out
}
// Check checks if the entry should be logged, and adds c to checked if so.
func (c *Core) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if entry.Level < zapcore.ErrorLevel || !c.tracer().Recording() {
return checked
}
return checked.AddCore(entry, c)
}
// Write reports entry and fields as an error using c.tracer.
func (c *Core) Write(entry zapcore.Entry, fields []zapcore.Field) error {
core := contextCore{core: c}
return core.Write(entry, fields)
}
type contextCore struct {
core *Core
traceContext traceContext
}
func (c *contextCore) Sync() error {
return nil
}
func (c *contextCore) Enabled(level zapcore.Level) bool {
return level >= zapcore.ErrorLevel
}
func (c *contextCore) With(fields []zapcore.Field) zapcore.Core {
newCore := &contextCore{
core: c.core,
traceContext: c.traceContext,
}
newCore.traceContext.fields(fields)
return newCore
}
func (c *contextCore) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if entry.Level < zapcore.ErrorLevel || !c.core.tracer().Recording() {
return checked
}
return checked.AddCore(entry, c)
}
func (c *contextCore) Write(entry zapcore.Entry, fields []zapcore.Field) error {
traceContext := c.traceContext
traceContext.fields(fields)
tracer := c.core.tracer()
errlog := tracer.NewErrorLog(apm.ErrorLogRecord{
Message: entry.Message,
Level: entry.Level.String(),
LoggerName: entry.LoggerName,
Error: traceContext.err,
})
errlog.Handled = true
errlog.Timestamp = entry.Time
errlog.SetStacktrace(1)
errlog.TraceID = traceContext.traceID
errlog.TransactionID = traceContext.transactionID
if traceContext.spanID.Validate() == nil {
errlog.ParentID = traceContext.spanID
} else {
errlog.ParentID = traceContext.transactionID
}
errlog.Send()
if entry.Level == zapcore.FatalLevel {
// Zap will exit the process following a fatal log message, so we flush the tracer.
flushTimeout := c.core.FatalFlushTimeout
if flushTimeout == 0 {
flushTimeout = DefaultFatalFlushTimeout
}
if flushTimeout >= 0 {
ctx, cancel := context.WithTimeout(context.Background(), flushTimeout)
defer cancel()
tracer.Flush(ctx.Done())
}
}
return nil
}
type traceContext struct {
err error
traceID apm.TraceID
transactionID, spanID apm.SpanID
}
func (c *traceContext) fields(fields []zapcore.Field) {
for _, field := range fields {
switch field.Key {
case "error":
c.err, _ = field.Interface.(error)
case FieldKeyTraceID:
c.traceID, _ = field.Interface.(apm.TraceID)
case FieldKeyTransactionID:
c.transactionID, _ = field.Interface.(apm.SpanID)
case FieldKeySpanID:
c.spanID, _ = field.Interface.(apm.SpanID)
}
}
}