logger.go (240 lines of code) (raw):
// Copyright 2018 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package daisy
import (
"bufio"
"bytes"
"context"
"fmt"
"path"
"regexp"
"sync"
"time"
"cloud.google.com/go/logging"
"cloud.google.com/go/storage"
)
// Logger is a helper that encapsulates the logging logic for Daisy.
type Logger interface {
WriteLogEntry(e *LogEntry)
// AppendSerialPortLogs appends a portion of serial port logs for a GCE instance.
AppendSerialPortLogs(w *Workflow, instance string, logs string)
// WriteSerialPortLogsToCloudLogging writes all of the collected logs for instance to cloud logging.
WriteSerialPortLogsToCloudLogging(w *Workflow, instance string)
// ReadSerialPortLogs returns all collected serial port logs, with one entry per instance.
ReadSerialPortLogs() []string
Flush()
}
type cloudLogWriter interface {
Log(e logging.Entry)
Flush() error
}
// daisyLog wraps the different logging mechanisms that can be used.
type daisyLog struct {
gcsLogWriter *syncedWriter
cloudLogger cloudLogWriter
stdoutLogging bool
logCleanupRegex *regexp.Regexp
// A map of instance name to its serial logs.
serialLogs map[string]*bytes.Buffer
}
// createLogger builds a Logger.
func (w *Workflow) createLogger(ctx context.Context) {
l := newDaisyLogger(!w.stdoutLoggingDisabled)
if !w.gcsLoggingDisabled {
gcsLogger := NewGCSLogger(ctx, w.StorageClient, w.bucket, path.Join(w.logsPath, "daisy.log"))
l.gcsLogWriter = &syncedWriter{buf: bufio.NewWriter(gcsLogger)}
periodicFlush(func() { l.gcsLogWriter.Flush() })
}
if !w.cloudLoggingDisabled && w.CloudLoggingClient != nil {
// Verify we can communicate with the log service.
if err := w.CloudLoggingClient.Ping(ctx); err != nil {
l.WriteLogEntry(&LogEntry{
LocalTimestamp: time.Now(),
WorkflowName: getAbsoluteName(w),
Message: fmt.Sprintf("Unable to send logs to the Cloud Logging service, not sending logs: %v", err),
})
w.CloudLoggingClient = nil
} else {
cloudLogName := fmt.Sprintf("daisy-%s-%s", w.Name, w.id)
l.cloudLogger = w.CloudLoggingClient.Logger(cloudLogName)
periodicFlush(func() { l.cloudLogger.Flush() })
}
}
w.Logger = l
w.addCleanupHook(func() DError {
w.Logger.Flush()
return nil
})
}
func newDaisyLogger(stdOutLoggingEnabled bool) *daisyLog {
return &daisyLog{
stdoutLogging: stdOutLoggingEnabled,
serialLogs: map[string]*bytes.Buffer{},
}
}
// LogStepInfo logs information for the workflow step.
func (w *Workflow) LogStepInfo(stepName, stepType, format string, a ...interface{}) {
entry := &LogEntry{
LocalTimestamp: time.Now(),
WorkflowName: getAbsoluteName(w),
StepName: stepName,
StepType: stepType,
Message: fmt.Sprintf(format, a...),
Type: "Daisy",
}
w.logEntry(entry)
}
// LogWorkflowInfo logs information for the workflow.
func (w *Workflow) LogWorkflowInfo(format string, a ...interface{}) {
entry := &LogEntry{
LocalTimestamp: time.Now(),
WorkflowName: getAbsoluteName(w),
Message: fmt.Sprintf(format, a...),
}
w.logEntry(entry)
}
func (w *Workflow) logEntry(e *LogEntry) {
// Execute all log process hooks
rw := w
for rw != nil {
if rw.logProcessHook != nil {
e.Message = rw.logProcessHook(e.Message)
}
rw = rw.parent
}
w.Logger.WriteLogEntry(e)
}
// AppendSerialPortLogs collects a segment of serial port logs for an instance.
func (l *daisyLog) AppendSerialPortLogs(w *Workflow, instance string, logs string) {
// Only collect serial port logs if the user has opted-in to cloud logging.
if l.cloudLogger == nil {
return
}
if _, hasBuffer := l.serialLogs[instance]; !hasBuffer {
l.serialLogs[instance] = &bytes.Buffer{}
}
l.serialLogs[instance].WriteString(logs)
}
// WriteSerialPortLogsToCloudLogging writes the serial port logs for an instance to cloud logging.
func (l *daisyLog) WriteSerialPortLogsToCloudLogging(w *Workflow, instance string) {
if l.cloudLogger == nil {
return
}
if _, hasBuffer := l.serialLogs[instance]; !hasBuffer {
return
}
logs := l.serialLogs[instance].Bytes()
writeLog := func(data []byte) {
entry := &LogEntry{
LocalTimestamp: time.Now(),
WorkflowName: getAbsoluteName(w),
Message: fmt.Sprintf("Serial port output for instance %q", instance),
SerialPort1: string(data),
Type: "Daisy",
}
l.cloudLogger.Log(logging.Entry{Timestamp: entry.LocalTimestamp, Payload: entry})
}
// Write the output to cloud logging only after instance has stopped.
// Type assertion check is needed for tests not to panic.
// Split if output is too long for log entry (100K max, we leave a 2K buffer).
if len(logs) <= 98*1024 {
writeLog(logs)
return
}
bs := bytes.SplitAfter(logs, []byte("\n"))
var data []byte
for _, b := range bs {
if len(data)+len(b) > 98*1024 {
writeLog(data)
data = b
} else {
data = append(data, b...)
}
}
writeLog(data)
}
func (l *daisyLog) ReadSerialPortLogs() []string {
allLogs := make([]string, 0, len(l.serialLogs))
for instance, log := range l.serialLogs {
allLogs = append(allLogs, fmt.Sprintf("Serial logs for instance: %s\n%s", instance, log.Bytes()))
}
return allLogs
}
// Flush flushes all loggers.
func (l *daisyLog) Flush() {
if l.gcsLogWriter != nil {
l.gcsLogWriter.Flush()
}
if l.cloudLogger != nil {
l.cloudLogger.Flush()
}
}
// LogEntry encapsulates a single log entry.
type LogEntry struct {
LocalTimestamp time.Time `json:"localTimestamp"`
WorkflowName string `json:"workflow"`
StepName string `json:"stepName,omitempty"`
StepType string `json:"stepType,omitempty"`
SerialPort1 string `json:"serialPort1,omitempty"`
Message string `json:"message"`
Type string `json:"type"`
}
func (l *daisyLog) WriteLogEntry(e *LogEntry) {
if l.cloudLogger != nil {
l.cloudLogger.Log(logging.Entry{Timestamp: e.LocalTimestamp, Payload: e})
}
if l.gcsLogWriter != nil {
l.gcsLogWriter.Write([]byte(e.String()))
}
if l.stdoutLogging {
fmt.Print(e)
}
}
type syncedWriter struct {
buf *bufio.Writer
mx sync.Mutex
}
func (l *syncedWriter) Write(b []byte) (int, error) {
l.mx.Lock()
defer l.mx.Unlock()
return l.buf.Write(b)
}
func (l *syncedWriter) Flush() error {
l.mx.Lock()
defer l.mx.Unlock()
return l.buf.Flush()
}
// GCSLogger is a logger that writes to a GCS object.
type GCSLogger struct {
client *storage.Client
bucket, object string
buf *bytes.Buffer
ctx context.Context
}
// NewGCSLogger creates a new GCSLogger.
func NewGCSLogger(ctx context.Context, client *storage.Client, bucket, object string) *GCSLogger {
return &GCSLogger{client: client, bucket: bucket, object: object, ctx: ctx}
}
func (l *GCSLogger) Write(b []byte) (int, error) {
if l.buf == nil {
l.buf = new(bytes.Buffer)
}
l.buf.Write(b)
wc := l.client.Bucket(l.bucket).Object(l.object).NewWriter(l.ctx)
wc.ContentType = "text/plain"
if _, err := wc.Write(l.buf.Bytes()); err != nil {
return 0, err
}
if err := wc.Close(); err != nil {
return 0, err
}
return len(b), nil
}
func periodicFlush(f func()) {
go func() {
for {
time.Sleep(5 * time.Second)
f()
}
}()
}
func getAbsoluteName(w *Workflow) string {
name := w.Name
for parent := w.parent; parent != nil; parent = parent.parent {
name = parent.Name + "." + name
}
return name
}
func (e *LogEntry) String() string {
var prefix string
if e.StepName != "" {
prefix = fmt.Sprintf("%s.%s", e.WorkflowName, e.StepName)
} else {
prefix = e.WorkflowName
}
var msg string
if e.StepType != "" {
msg = fmt.Sprintf("%s: %s", e.StepType, e.Message)
} else {
msg = e.Message
}
timestamp := e.LocalTimestamp.Format(time.RFC3339)
return fmt.Sprintf("[%s]: %s %s\n", prefix, timestamp, msg)
}