pkg/api/internal/streamer/file/file.go (83 lines of code) (raw):
// package file implements the Streamer interface backed by a file buffer.
package file
import (
"bufio"
"context"
"fmt"
"io"
"os"
"sync"
)
type Streamer struct {
io.WriteCloser
cond *sync.Cond
filename string
stop bool
}
func New(filename string) (*Streamer, error) {
f, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return nil, fmt.Errorf("creating file %q: %w", filename, err)
}
return &Streamer{filename: filename, WriteCloser: f, cond: sync.NewCond(&sync.Mutex{})}, nil
}
func (s *Streamer) Write(p []byte) (int, error) {
s.cond.L.Lock()
defer s.cond.L.Unlock()
defer s.cond.Broadcast()
return s.WriteCloser.Write(p)
}
func (s *Streamer) Stop() {
s.cond.L.Lock()
defer s.cond.L.Unlock()
s.stop = true
s.cond.Broadcast()
}
func (s *Streamer) Close() error {
s.Stop()
return s.WriteCloser.Close()
}
// toIOReader can be used to "cast" a func([]byte)(int, error) to an io.Reader.
type toIOReader func([]byte) (int, error)
func (r toIOReader) Read(p []byte) (int, error) { return r(p) }
func (s *Streamer) Follow(ctx context.Context, offset int64, writer io.Writer) error {
f, err := os.OpenFile(s.filename, os.O_RDONLY|os.O_CREATE, 0600)
if err != nil {
return fmt.Errorf("opening log file %q: %w", s.filename, err)
}
//nolint:errcheck
defer f.Close()
if _, err := f.Seek(offset, 0); err != nil {
return fmt.Errorf("seeking log file %q: %w", s.filename, err)
}
scanner := bufio.NewScanner(toIOReader(func(p []byte) (int, error) {
s.cond.L.Lock()
defer s.cond.L.Unlock()
for {
if ctx.Err() != nil {
return 0, ctx.Err()
}
n, err := f.Read(p)
if err != io.EOF {
return n, err
}
if s.stop {
return n, io.EOF
}
s.cond.Wait()
}
}))
for scanner.Scan() {
if ctx.Err() != nil {
return ctx.Err()
}
data := scanner.Bytes()
if len(data) == 0 {
continue
}
if _, err := writer.Write(data); err != nil {
return fmt.Errorf("streaming logs: %w", err)
}
if _, err := writer.Write([]byte("\n")); err != nil {
return fmt.Errorf("streaming logs: %w", err)
}
}
// scanner.Err() returns nil instead of io.EOF even if an EOF stopped scanner.Scan().
return scanner.Err()
}