internal/mode/chunk/streamer/streamer.go (84 lines of code) (raw):
package streamer
import (
"encoding/csv"
"fmt"
"io"
"log/slog"
"os"
"sync"
)
// RecordType identifies the type of record being streamed.
// Streamer.Stream compares it with the type of the last section it wrote to
// decide whether a new section (separator row plus header row) must be started.
type RecordType string
const (
// DefaultSectionSeparator is the separator row content that New falls back to
// when it is called with an empty sectionSeparator.
DefaultSectionSeparator = "--section-start--"
// Define various record types that can be streamed
IndexerVersionInfoType RecordType = "indexer_version_info"
IndexedChunkInfoType RecordType = "indexed_chunk_info"
)
// Record is the common interface all streamable records must implement.
// Streamer.Stream writes Header() when a section starts and Values() as one
// CSV row per record; both are written as returned, without an added type column.
type Record interface {
// Type returns the record type. Stream uses it to detect section changes and
// to check that every record in a batch has the same type.
Type() RecordType
// Header returns the CSV header fields (excluding the 'type' field)
Header() []string
// Values returns the CSV values (excluding the type field)
Values() []string
}
// Streamer handles structured streaming of records to a writer.
// Output is CSV grouped into sections: each section starts with a separator
// row followed by a header row, then one row per record.
type Streamer struct {
// sectionSeparator is the single-field row written at the start of each section.
sectionSeparator string
// writer is the destination given to New; Close closes it when it implements io.Closer.
writer io.Writer
// csvWriter encodes rows onto writer.
csvWriter *csv.Writer
// headerSent is the record type of the most recently started section;
// it is empty until the first section has been written.
headerSent RecordType
mu sync.Mutex // Protects concurrent access to the streamer
}
// New builds a Streamer that emits CSV rows to writer.
//
// sectionSeparator is the content of the row written at the start of every
// section; passing an empty string selects DefaultSectionSeparator.
func New(writer io.Writer, sectionSeparator string) *Streamer {
	s := &Streamer{
		writer:           writer,
		csvWriter:        csv.NewWriter(writer),
		sectionSeparator: sectionSeparator,
	}
	// Apply the default only after construction so the literal above stays a
	// plain field-by-field mapping of the arguments.
	if s.sectionSeparator == "" {
		s.sectionSeparator = DefaultSectionSeparator
	}
	return s
}
// NewStdout returns a Streamer that writes to standard output using the
// default section separator.
func NewStdout() *Streamer {
	// An empty separator tells New to fall back to DefaultSectionSeparator.
	const useDefaultSeparator = ""
	return New(os.Stdout, useDefaultSeparator)
}
// StreamSingle streams exactly one record.
// It is a convenience wrapper for callers that have a single value to emit:
// the record is wrapped in a one-element batch and handed to Stream, whose
// result is returned unchanged.
func (s *Streamer) StreamSingle(record Record) error {
	batch := []Record{record}
	return s.Stream(batch)
}
// Stream writes a batch of records of a single type as CSV rows and flushes
// them to the underlying writer before returning.
//
// When the batch's type differs from the type of the section written last (or
// nothing has been written yet), a one-field separator row and the header of
// the first record are written ahead of the values. Consecutive batches of the
// same type therefore share a single header.
//
// An empty batch is a no-op. A batch that mixes record types is rejected
// before anything is written, so a validation failure never leaves a stray
// header or a partial batch in the output. Errors reported by the underlying
// writer, including those that only surface on flush, are returned.
//
// Stream is safe for concurrent use: all streamer state is accessed while
// holding the mutex, so each batch is written without interleaving with other
// Stream or Close calls.
func (s *Streamer) Stream(records []Record) error {
	if len(records) == 0 {
		return nil
	}

	// Validate the whole batch up front. Doing this inside the write loop would
	// leave the separator, header and leading rows buffered (and headerSent
	// updated) for a batch that is ultimately rejected.
	recordType := records[0].Type()
	for i, record := range records[1:] {
		if t := record.Type(); t != recordType {
			return fmt.Errorf("record at index %d has type %s, expected %s - all records must be of the same type",
				i+1, t, recordType)
		}
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	// headerSent is shared state, so it is only read once the lock is held.
	slog.Debug("streaming records", "recordType", recordType, "count", len(records), "headerSent", s.headerSent)

	// A new section starts on the very first write and whenever the type changes.
	isSectionChange := s.headerSent == "" || s.headerSent != recordType
	if isSectionChange {
		if err := s.csvWriter.Write([]string{s.sectionSeparator}); err != nil {
			return fmt.Errorf("failed to write section separator: %w", err)
		}
		if err := s.csvWriter.Write(records[0].Header()); err != nil {
			return fmt.Errorf("failed to write header: %w", err)
		}
		s.headerSent = recordType
	}

	// One CSV row per record; the row holds the record's values only.
	for i, record := range records {
		if err := s.csvWriter.Write(record.Values()); err != nil {
			return fmt.Errorf("failed to write record at index %d: %w", i, err)
		}
	}

	// Flush so consumers see the batch immediately. csv.Writer.Flush does not
	// return an error; failures must be fetched with Error.
	s.csvWriter.Flush()
	if err := s.csvWriter.Error(); err != nil {
		return fmt.Errorf("failed to flush records: %w", err)
	}
	return nil
}
// Close flushes any buffered rows and then closes the underlying writer if it
// implements io.Closer.
//
// The writer is closed even when the flush fails. If only the close fails, its
// error is returned unchanged; if the flush fails, the returned error wraps the
// flush error (and the close error as well, when both fail).
//
// Note that a Streamer created with NewStdout wraps os.Stdout, which is an
// io.Closer, so calling Close on it closes standard output.
func (s *Streamer) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// csv.Writer.Flush has no return value; a failed flush is only visible
	// through Error, so it must be queried explicitly.
	s.csvWriter.Flush()
	flushErr := s.csvWriter.Error()

	var closeErr error
	if closer, ok := s.writer.(io.Closer); ok {
		closeErr = closer.Close()
	}

	switch {
	case flushErr != nil && closeErr != nil:
		return fmt.Errorf("failed to flush buffered records: %w; failed to close writer: %w", flushErr, closeErr)
	case flushErr != nil:
		return fmt.Errorf("failed to flush buffered records: %w", flushErr)
	default:
		return closeErr
	}
}