internal/util/env_log.go (81 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. // package util import ( "bufio" "context" "io" "os" "path/filepath" "sync" ) type ResourceLogFollower struct { Ctx context.Context cancelFunc context.CancelFunc basePath string followLock *sync.RWMutex following map[string]bool } func NewResourceLogFollower(ctx context.Context, basePath string) *ResourceLogFollower { childCtx, cancelFunc := context.WithCancel(ctx) return &ResourceLogFollower{ Ctx: childCtx, cancelFunc: cancelFunc, basePath: basePath, followLock: &sync.RWMutex{}, following: make(map[string]bool), } } func (l *ResourceLogFollower) BuildLogWriter(path string) (*os.File, error) { logFile := l.buildLogFilename(path) if err := os.MkdirAll(filepath.Dir(logFile), os.ModePerm); err != nil { return nil, err } if _, err := os.Stat(logFile); os.IsExist(err) { if err := os.Remove(logFile); err != nil { return nil, err } } return os.Create(logFile) } func (l *ResourceLogFollower) ConsumeLog(logWriter *os.File, stream io.ReadCloser) <-chan struct{} { if l.IsFollowed(logWriter.Name()) { return nil } finished := make(chan struct{}, 1) go func() { defer func() { stream.Close() close(finished) }() r := bufio.NewReader(stream) for { bytes, err := r.ReadBytes('\n') if err != nil { if err != io.EOF { return } return } l.writeFollowed(logWriter) if _, err := logWriter.Write(bytes); err != nil { return } } }() return finished } func (l *ResourceLogFollower) IsFollowed(path string) bool { l.followLock.RLock() defer l.followLock.RUnlock() return l.following[l.buildLogFilename(path)] } func (l *ResourceLogFollower) Close() { l.cancelFunc() } func (l *ResourceLogFollower) buildLogFilename(path string) string { return filepath.Join(l.basePath, path) } func (l *ResourceLogFollower) writeFollowed(writer *os.File) { l.followLock.Lock() defer l.followLock.Unlock() l.following[writer.Name()] = true }