func()

in pkg/api/internal/streamer/file/file.go [52:102]


func (s *Streamer) Follow(ctx context.Context, offset int64, writer io.Writer) error {
	f, err := os.OpenFile(s.filename, os.O_RDONLY|os.O_CREATE, 0600)
	if err != nil {
		return fmt.Errorf("opening log file %q: %w", s.filename, err)
	}
	//nolint:errcheck
	defer f.Close()

	if _, err := f.Seek(offset, 0); err != nil {
		return fmt.Errorf("seeking log file %q: %w", s.filename, err)
	}

	scanner := bufio.NewScanner(toIOReader(func(p []byte) (int, error) {
		s.cond.L.Lock()
		defer s.cond.L.Unlock()
		for {
			if ctx.Err() != nil {
				return 0, ctx.Err()
			}
			n, err := f.Read(p)

			if err != io.EOF {
				return n, err
			}

			if s.stop {
				return n, io.EOF
			}

			s.cond.Wait()
		}
	}))

	for scanner.Scan() {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		data := scanner.Bytes()
		if len(data) == 0 {
			continue
		}
		if _, err := writer.Write(data); err != nil {
			return fmt.Errorf("streaming logs: %w", err)
		}
		if _, err := writer.Write([]byte("\n")); err != nil {
			return fmt.Errorf("streaming logs: %w", err)
		}
	}
	// scanner.Err() returns nil instead of io.EOF even if an EOF stopped scanner.Scan().
	return scanner.Err()
}