internal/mode/chunk/streamer/streamer.go (84 lines of code) (raw):

package streamer import ( "encoding/csv" "fmt" "io" "log/slog" "os" "sync" ) // RecordType identifies the type of record being streamed type RecordType string const ( DefaultSectionSeparator = "--section-start--" // Define various record types that can be streamed IndexerVersionInfoType RecordType = "indexer_version_info" IndexedChunkInfoType RecordType = "indexed_chunk_info" ) // Record is the common interface all streamable records must implement type Record interface { // Type returns the record type Type() RecordType // Header returns the CSV header fields (excluding the 'type' field) Header() []string // Values returns the CSV values (excluding the type field) Values() []string } // Streamer handles structured streaming of records to a writer type Streamer struct { sectionSeparator string writer io.Writer csvWriter *csv.Writer headerSent RecordType mu sync.Mutex // Protects concurrent access to the streamer } // New creates a new structured streamer that writes to the provided writer func New(writer io.Writer, sectionSeparator string) *Streamer { if sectionSeparator == "" { sectionSeparator = DefaultSectionSeparator } return &Streamer{ sectionSeparator: sectionSeparator, writer: writer, csvWriter: csv.NewWriter(writer), } } func NewStdout() *Streamer { return New(os.Stdout, "") } // StreamSingle is a simplified method for streaming a single record // Useful for non-streaming use cases with minimal fields // Example: "indexer_version_info,version,build_time" func (s *Streamer) StreamSingle(record Record) error { return s.Stream([]Record{record}) } func (s *Streamer) Stream(records []Record) error { if len(records) == 0 { return nil } // Get the type of the first record to check consistency recordType := records[0].Type() slog.Debug("streaming records", "recordType", recordType, "count", len(records), "headerSent", s.headerSent) s.mu.Lock() defer s.mu.Unlock() // Check if we need to write a section separator isSectionChange := s.headerSent == "" || s.headerSent != recordType if isSectionChange { if err := s.csvWriter.Write([]string{s.sectionSeparator}); err != nil { return fmt.Errorf("failed to write section separator: %w", err) } header := records[0].Header() if err := s.csvWriter.Write(header); err != nil { return fmt.Errorf("failed to write header: %w", err) } s.headerSent = recordType } // Process each record for i, record := range records { // Ensure all records are of the same type if record.Type() != recordType { return fmt.Errorf("record at index %d has type %s, expected %s - all records must be of the same type", i, record.Type(), recordType) } // Create CSV record with type and values csvRecord := record.Values() // Write the record if err := s.csvWriter.Write(csvRecord); err != nil { return fmt.Errorf("failed to write record at index %d: %w", i, err) } } // Flush the writer to ensure data is written immediately // Note: We're calling the internal flush method directly since we already hold the lock s.csvWriter.Flush() return nil } // Close flushes any buffered data and closes the underlying writer if it's a closer func (s *Streamer) Close() error { s.mu.Lock() defer s.mu.Unlock() // Flush the writer directly, not via the method since we already hold the lock s.csvWriter.Flush() if closer, ok := s.writer.(io.Closer); ok { return closer.Close() } return nil }