systemtest/apmservertest/logs.go (154 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package apmservertest import ( "os" "path/filepath" "runtime" "strings" "sync" "testing" "time" "go.uber.org/zap/zapcore" ) // LogEntries holds apm-server log entries. type LogEntries struct { closed chan struct{} commands chan func() mu sync.RWMutex logs []LogEntry added *sync.Cond } func (logs *LogEntries) init() { logs.closed = make(chan struct{}) logs.commands = make(chan func()) logs.added = sync.NewCond(logs.mu.RLocker()) go logs.loop() } func (logs *LogEntries) close() { closeCommand := func() { logs.mu.Lock() defer logs.mu.Unlock() close(logs.closed) logs.added.Broadcast() } select { case <-logs.closed: case logs.commands <- closeCommand: } } func (logs *LogEntries) loop() { defer logs.added.Broadcast() for { select { case <-logs.closed: return case cmd := <-logs.commands: cmd() } } } func (logs *LogEntries) add(e LogEntry) { logs.mu.Lock() defer logs.mu.Unlock() logs.logs = append(logs.logs, e) logs.added.Broadcast() } // All returns all log entries captured so far. func (logs *LogEntries) All() []LogEntry { logs.mu.RLock() defer logs.mu.RUnlock() return logs.logs } // wait waits until there are logs available from the index 'pos', // the abort channel is signalled, or logs is closed. func (logs *LogEntries) wait(pos int, abort <-chan struct{}) ([]LogEntry, bool) { logs.mu.RLock() defer logs.mu.RUnlock() for { entries := logs.logs[pos:] if len(entries) > 0 { return entries, true } select { case <-abort: return nil, false case <-logs.closed: return nil, false default: logs.added.Wait() } } } // Iterator returns a new LogEntryIterator for iterating over logs. The // iterator will remain alive until all existing logs have been consumed // and logs is closed. func (logs *LogEntries) Iterator() *LogEntryIterator { var once sync.Once done := make(chan struct{}) ch := make(chan LogEntry) iter := &LogEntryIterator{ ch: ch, stop: func() { once.Do(func() { close(done) }) }, } go func() { defer close(ch) var pos int for { entries, ok := logs.wait(pos, done) if !ok { return } for _, entry := range entries { select { case <-done: return case ch <- entry: } } pos += len(entries) } }() return iter } // LogEntryIterator provides a means of iterating over log entries. type LogEntryIterator struct { ch <-chan LogEntry stop func() } // C is a channel to which log entries will be sent. When the iterator // is closed or the apm-server process exits, the channel will be closed. func (iter *LogEntryIterator) C() <-chan LogEntry { return iter.ch } // Close closes the iterator, closing the channel returned by C(). func (iter *LogEntryIterator) Close() { iter.stop() } // LogEntry holds the details of an apm-server log entry. type LogEntry struct { Timestamp time.Time Level zapcore.Level Logger string File string Line int Message string Fields map[string]interface{} } type logpTimestamp time.Time func (t *logpTimestamp) UnmarshalText(text []byte) error { const ISO8601Layout = "2006-01-02T15:04:05.000Z0700" time, err := time.Parse(ISO8601Layout, string(text)) if err != nil { return err } *t = logpTimestamp(time) return nil } // createLogfile creates a log file in apm-server/systemtest/logs, // under a sub-directory named after the test, and with the given // filename. func createLogfile(tb testing.TB, filename string) *os.File { f, err := os.Create(filepath.Join(getTestLogsDir(tb), filename)) if err != nil { tb.Fatal(err) } return f } func getTestLogsDir(tb testing.TB) string { _, file, _, ok := runtime.Caller(0) if !ok { tb.Fatal("runtime.Caller(0) failed") } systemtestdir := filepath.Dir(filepath.Dir(file)) logsdir := filepath.Join(systemtestdir, "logs") name := strings.ReplaceAll(tb.Name(), "/", "_") logsdir = filepath.Join(logsdir, name) if err := os.MkdirAll(logsdir, 0755); err != nil { tb.Fatal(err) } return logsdir }