logp/core.go (431 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package logp import ( "errors" "flag" "fmt" "io" golog "log" "os" "path/filepath" "strings" "sync/atomic" "unsafe" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" "go.elastic.co/ecszap" "github.com/elastic/elastic-agent-libs/file" "github.com/elastic/elastic-agent-libs/paths" ) var ( _log unsafe.Pointer // Pointer to a coreLogger. Access via atomic.LoadPointer. _defaultGoLog = golog.Writer() ) func init() { storeLogger(&coreLogger{ selectors: map[string]struct{}{}, rootLogger: zap.NewNop(), globalLogger: zap.NewNop(), level: zap.NewAtomicLevel(), logger: newLogger(zap.NewNop(), ""), }) } type coreLogger struct { selectors map[string]struct{} // Set of enabled debug selectors. rootLogger *zap.Logger // Root logger without any options configured. globalLogger *zap.Logger // Logger used by legacy global functions (e.g. logp.Info). logger *Logger // Logger that is the basis for all logp.Loggers. level zap.AtomicLevel // The minimum level being printed observedLogs *observer.ObservedLogs // Contains events generated while in observation mode (a testing mode). } type closerCore struct { zapcore.Core io.Closer } func (c closerCore) With(fields []zapcore.Field) zapcore.Core { return closerCore{ Core: c.Core.With(fields), Closer: c.Closer, } } // Configure configures the logp package. func Configure(cfg Config) error { return ConfigureWithOutputs(cfg) } func createSink(defaultLoggerCfg Config, outputs ...zapcore.Core) (zapcore.Core, zap.AtomicLevel, *observer.ObservedLogs, map[string]struct{}, error) { var ( sink zapcore.Core observedLogs *observer.ObservedLogs err error level zap.AtomicLevel ) level = zap.NewAtomicLevelAt(defaultLoggerCfg.Level.ZapLevel()) // Build a single output (stderr has priority if more than one are enabled). if defaultLoggerCfg.toObserver { sink, observedLogs = observer.New(level) } else { sink, err = createLogOutput(defaultLoggerCfg, level) } if err != nil { return nil, level, nil, nil, fmt.Errorf("failed to build log output: %w", err) } // Default logger is always discard, debug level below will // possibly re-enable it. golog.SetOutput(io.Discard) // Enabled selectors when debug is enabled. selectors := make(map[string]struct{}, len(defaultLoggerCfg.Selectors)) if defaultLoggerCfg.Level.Enabled(DebugLevel) && len(defaultLoggerCfg.Selectors) > 0 { for _, sel := range defaultLoggerCfg.Selectors { selectors[strings.TrimSpace(sel)] = struct{}{} } // Default to all enabled if no selectors are specified. if len(selectors) == 0 { selectors["*"] = struct{}{} } // Re-enable the default go logger output when either stdlog // or all selector is enabled. _, stdlogEnabled := selectors["stdlog"] _, allEnabled := selectors["*"] if stdlogEnabled || allEnabled { golog.SetOutput(_defaultGoLog) } sink = selectiveWrapper(sink, selectors) } sink = newMultiCore(append(outputs, sink)...) return sink, level, observedLogs, selectors, err } // ConfigureWithOutputs configures the global logger to use an output created // from `defaultLoggerCfg` and all the outputs passed by `outputs`. // This function needs to be exported because it's used by `logp/configure` func ConfigureWithOutputs(defaultLoggerCfg Config, outputs ...zapcore.Core) error { sink, level, observedLogs, selectors, err := createSink(defaultLoggerCfg, outputs...) if err != nil { return err } root := zap.New(sink, makeOptions(defaultLoggerCfg)...) storeLogger(&coreLogger{ selectors: selectors, rootLogger: root, globalLogger: root.WithOptions(zap.AddCallerSkip(1)), logger: newLogger(root, ""), level: level, observedLogs: observedLogs, }) return nil } // ConfigureWithCore configures the global logger to use the passed in // core. It is assumed that an output has already been defined with // the core and a new one should not be created. The loggerCfg is // only used to set selectors and level. This is useful if a part of // your code uses logp but the log output is already handled. Normal // use cases should use Configure or ConfigureWithOutput. // // Deprecated: Prefer using localized loggers. Use logp.ConfigureWithCoreLocal. func ConfigureWithCore(loggerCfg Config, core zapcore.Core) error { var ( sink zapcore.Core level zap.AtomicLevel ) level = zap.NewAtomicLevelAt(loggerCfg.Level.ZapLevel()) if loggerCfg.WithFields != nil { fields := make([]zapcore.Field, 0, len(loggerCfg.WithFields)) for key, value := range loggerCfg.WithFields { fields = append(fields, zap.Any(key, value)) } core = core.With(fields) } sink = wrappedCore(core) // Enabled selectors when debug is enabled. selectors := make(map[string]struct{}, len(loggerCfg.Selectors)) if loggerCfg.Level.Enabled(DebugLevel) && len(loggerCfg.Selectors) > 0 { for _, sel := range loggerCfg.Selectors { selectors[strings.TrimSpace(sel)] = struct{}{} } // Default to all enabled if no selectors are specified. if len(selectors) == 0 { selectors["*"] = struct{}{} } sink = selectiveWrapper(sink, selectors) } root := zap.New(sink, makeOptions(loggerCfg)...) storeLogger(&coreLogger{ selectors: selectors, rootLogger: root, globalLogger: root.WithOptions(zap.AddCallerSkip(1)), logger: newLogger(root, ""), level: level, observedLogs: nil, }) return nil } // ConfigureWithCoreLocal returns a logger using passed in // core. It is assumed that an output has already been defined with // the core and a new one should not be created. The loggerCfg is // only used to set selectors and level. func ConfigureWithCoreLocal(loggerCfg Config, core zapcore.Core) (*Logger, error) { var ( sink zapcore.Core level zap.AtomicLevel ) level = zap.NewAtomicLevelAt(loggerCfg.Level.ZapLevel()) if loggerCfg.WithFields != nil { fields := make([]zapcore.Field, 0, len(loggerCfg.WithFields)) for key, value := range loggerCfg.WithFields { fields = append(fields, zap.Any(key, value)) } core = core.With(fields) } sink = wrappedCore(core) // Enabled selectors when debug is enabled. selectors := make(map[string]struct{}, len(loggerCfg.Selectors)) if loggerCfg.Level.Enabled(DebugLevel) && len(loggerCfg.Selectors) > 0 { for _, sel := range loggerCfg.Selectors { selectors[strings.TrimSpace(sel)] = struct{}{} } // Default to all enabled if no selectors are specified. if len(selectors) == 0 { selectors["*"] = struct{}{} } sink = selectiveWrapper(sink, selectors) } root := zap.New(sink, makeOptions(loggerCfg)...) // TODO: Remove this when there is no more global logger dependency storeLogger(&coreLogger{ selectors: selectors, rootLogger: root, globalLogger: root.WithOptions(zap.AddCallerSkip(1)), logger: newLogger(root, ""), level: level, observedLogs: nil, }) return newLogger(root, ""), nil } // ConfigureWithTypedOutput configures the global logger to use typed outputs. // // If a log entry matches the defined key/value, this entry is logged using the // core generated from `typedLoggerCfg`, otherwise it will be logged by all // cores in `outputs` and the one generated from `defaultLoggerCfg`. // Arguments: // - `defaultLoggerCfg` is used to create a new core that will be the default // output from the logger // - `typedLoggerCfg` is used to create a new output that will only be used // when the log entry matches `entry[logKey] = kind` // - `key` is the key the typed logger will look at // - `value` is the value compared against the `logKey` entry // - `outputs` is a list of cores that will be added together with the core // generated by `defaultLoggerCfg` as the default output for the loggger. // // If `defaultLoggerCfg.toObserver` is true, then `typedLoggerCfg` is ignored // and a single sink is used so all logs can be observed. // // Deprecated: Prefer using localized loggers. Use logp.ConfigureWithTypedOutputLocal. func ConfigureWithTypedOutput(defaultLoggerCfg, typedLoggerCfg Config, key, value string, outputs ...zapcore.Core) error { sink, level, observedLogs, selectors, err := createSink(defaultLoggerCfg, outputs...) if err != nil { return err } var typedCore zapcore.Core if defaultLoggerCfg.toObserver { typedCore = sink } else { typedCore, err = createLogOutput(typedLoggerCfg, level) } if err != nil { return fmt.Errorf("could not create typed logger output: %w", err) } sink = &typedLoggerCore{ defaultCore: sink, typedCore: typedCore, key: key, value: value, } sink = selectiveWrapper(sink, selectors) root := zap.New(sink, makeOptions(defaultLoggerCfg)...) storeLogger(&coreLogger{ selectors: selectors, rootLogger: root, globalLogger: root.WithOptions(zap.AddCallerSkip(1)), logger: newLogger(root, ""), level: level, observedLogs: observedLogs, }) return nil } // ConfigureWithTypedOutputLocal returns a logger that uses typed outputs. // // If a log entry matches the defined key/value, this entry is logged using the // core generated from `typedLoggerCfg`, otherwise it will be logged by all // cores in `outputs` and the one generated from `defaultLoggerCfg`. // Arguments: // - `defaultLoggerCfg` is used to create a new core that will be the default // output from the logger // - `typedLoggerCfg` is used to create a new output that will only be used // when the log entry matches `entry[logKey] = kind` // - `key` is the key the typed logger will look at // - `value` is the value compared against the `logKey` entry // - `outputs` is a list of cores that will be added together with the core // generated by `defaultLoggerCfg` as the default output for the loggger. func ConfigureWithTypedOutputLocal(defaultLoggerCfg, typedLoggerCfg Config, key, value string, outputs ...zapcore.Core) (*Logger, error) { sink, level, observedLogs, selectors, err := createSink(defaultLoggerCfg, outputs...) if err != nil { return nil, err } var typedCore zapcore.Core typedCore, err = createLogOutput(typedLoggerCfg, level) if err != nil { return nil, fmt.Errorf("could not create typed logger output: %w", err) } sink = &typedLoggerCore{ defaultCore: sink, typedCore: typedCore, key: key, value: value, } sink = selectiveWrapper(sink, selectors) root := zap.New(sink, makeOptions(defaultLoggerCfg)...) // TODO: Remove this when there is no more global logger dependency storeLogger(&coreLogger{ selectors: selectors, rootLogger: root, globalLogger: root.WithOptions(zap.AddCallerSkip(1)), logger: newLogger(root, ""), level: level, observedLogs: observedLogs, }) logger := newLogger(root, "") return logger, nil } func createLogOutput(cfg Config, enab zapcore.LevelEnabler) (zapcore.Core, error) { switch { case cfg.toIODiscard: return makeDiscardOutput(cfg, enab) case cfg.ToStderr: return makeStderrOutput(cfg, enab) case cfg.ToSyslog: return makeSyslogOutput(cfg, enab) case cfg.ToEventLog: return makeEventLogOutput(cfg, enab) case cfg.ToFiles: return makeFileOutput(cfg, enab) } switch cfg.environment { case SystemdEnvironment, ContainerEnvironment: return makeStderrOutput(cfg, enab) case MacOSServiceEnvironment, WindowsServiceEnvironment: return makeFileOutput(cfg, enab) default: return zapcore.NewNopCore(), nil } } // DevelopmentSetup configures the logger in development mode at debug level. // By default the output goes to stderr. // // Deprecated: Prefer using localized loggers. Use logp.NewDevelopmentLogger. func DevelopmentSetup(options ...Option) error { cfg := Config{ Level: DebugLevel, ToStderr: true, development: true, AddCaller: true, } for _, apply := range options { apply(&cfg) } return Configure(cfg) } // TestingSetup configures logging by calling DevelopmentSetup if and only if // verbose testing is enabled (as in 'go test -v'). // // Deprecated: Prefer using localized loggers. Use logptest.NewTestingLogger. func TestingSetup(options ...Option) error { // Use the flag to avoid a dependency on the testing package. f := flag.Lookup("test.v") if f != nil && f.Value.String() == "true" { return DevelopmentSetup(options...) } return nil } // ObserverLogs provides the list of logs generated during the observation // process. func ObserverLogs() *observer.ObservedLogs { return loadLogger().observedLogs } // Sync flushes any buffered log entries. Applications should take care to call // Sync before exiting. func Sync() error { return loadLogger().rootLogger.Sync() } func makeOptions(cfg Config) []zap.Option { var options []zap.Option if cfg.AddCaller { options = append(options, zap.AddCaller()) } if cfg.development { options = append(options, zap.Development()) } if cfg.Beat != "" { fields := []zap.Field{ zap.String("service.name", cfg.Beat), } options = append(options, zap.Fields(fields...)) } return options } func makeStderrOutput(cfg Config, enab zapcore.LevelEnabler) (zapcore.Core, error) { stderr := zapcore.Lock(os.Stderr) return newCore(buildEncoder(cfg), stderr, enab), nil } func makeDiscardOutput(cfg Config, enab zapcore.LevelEnabler) (zapcore.Core, error) { discard := zapcore.AddSync(io.Discard) return newCore(buildEncoder(cfg), discard, enab), nil } func makeSyslogOutput(cfg Config, enab zapcore.LevelEnabler) (zapcore.Core, error) { core, err := newSyslog(buildEncoder(cfg), enab) if err != nil { return nil, err } return wrappedCore(core), nil } func makeEventLogOutput(cfg Config, enab zapcore.LevelEnabler) (zapcore.Core, error) { core, err := newEventLog(cfg.Beat, buildEncoder(cfg), enab) // nolint: staticcheck,nolintlint // the implementation is OS-specific and some implementations always return errors if err != nil { return nil, err } return wrappedCore(core), nil } func makeFileOutput(cfg Config, enab zapcore.LevelEnabler) (zapcore.Core, error) { filename := paths.Resolve(paths.Logs, filepath.Join(cfg.Files.Path, cfg.LogFilename())) rotator, err := file.NewFileRotator(filename, file.MaxSizeBytes(cfg.Files.MaxSize), file.MaxBackups(cfg.Files.MaxBackups), file.Permissions(os.FileMode(cfg.Files.Permissions)), file.Interval(cfg.Files.Interval), file.RotateOnStartup(cfg.Files.RotateOnStartup), file.RedirectStderr(cfg.Files.RedirectStderr), ) if err != nil { return nil, fmt.Errorf("failed to create file rotator: %w", err) } // Keep the same behaviour from before we introduced the closerCore. core, err := newCore(buildEncoder(cfg), rotator, enab), nil if err != nil { return core, err } cc := closerCore{ Core: core, Closer: rotator, } return &cc, err } func newCore(enc zapcore.Encoder, ws zapcore.WriteSyncer, enab zapcore.LevelEnabler) zapcore.Core { return wrappedCore(zapcore.NewCore(enc, ws, enab)) } func wrappedCore(core zapcore.Core) zapcore.Core { wc := ecszap.WrapCore(core) if closeCore, ok := core.(io.Closer); ok { cc := closerCore{ Core: wc, Closer: closeCore, } return &cc } return wc } func globalLogger() *zap.Logger { return loadLogger().globalLogger } func loadLogger() *coreLogger { p := atomic.LoadPointer(&_log) return (*coreLogger)(p) } func storeLogger(l *coreLogger) { if old := loadLogger(); old != nil { _ = old.rootLogger.Sync() } atomic.StorePointer(&_log, unsafe.Pointer(l)) } func SetLevel(lvl zapcore.Level) { loadLogger().level.SetLevel(lvl) } func GetLevel() zapcore.Level { return loadLogger().level.Level() } // newMultiCore creates a sink that sends to multiple cores. func newMultiCore(cores ...zapcore.Core) zapcore.Core { return &multiCore{cores} } // multiCore allows multiple cores to be used for logging. type multiCore struct { cores []zapcore.Core } // Enabled returns true if the level is enabled in any one of the cores. func (m multiCore) Enabled(level zapcore.Level) bool { for _, core := range m.cores { if core.Enabled(level) { return true } } return false } // With creates a new multiCore with each core set with the given fields. func (m multiCore) With(fields []zapcore.Field) zapcore.Core { cores := make([]zapcore.Core, len(m.cores)) for i, core := range m.cores { cores[i] = core.With(fields) } return &multiCore{cores} } // Check will place each core that checks for that entry. func (m multiCore) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry { for _, core := range m.cores { checked = core.Check(entry, checked) } return checked } // Write writes the entry to each core. func (m multiCore) Write(entry zapcore.Entry, fields []zapcore.Field) error { var errs []error for _, core := range m.cores { if err := core.Write(entry, fields); err != nil { errs = append(errs, err) } } return errors.Join(errs...) } // Sync syncs each core. func (m multiCore) Sync() error { var errs []error for _, core := range m.cores { if err := core.Sync(); err != nil { errs = append(errs, err) } } return errors.Join(errs...) } // Close calls Close on any core that implements io.Closer. // All returned errors are joined by errors.Join and returned. func (m multiCore) Close() error { errs := []error{} for _, core := range m.cores { if closer, ok := core.(io.Closer); ok { closeErr := closer.Close() errs = append(errs, closeErr) } } return errors.Join(errs...) }