in internal/mode/chunk/streamer/streamer.go [67:119]
// Stream writes a batch of same-typed records to the CSV writer and flushes it.
//
// An empty batch is a no-op. When the batch's record type differs from the
// type of the section written last (or no section has been written yet), a
// section separator row followed by the header row of the first record is
// written before the data rows.
//
// The whole batch is validated before anything is written: if any record's
// type differs from the type of the first record, an error is returned, no
// output is produced and the current section state is left untouched. Errors
// reported by the CSV writer's Write are returned wrapped with context.
func (s *Streamer) Stream(records []Record) error {
	if len(records) == 0 {
		return nil
	}

	// The first record defines the type every record in the batch must share.
	recordType := records[0].Type()

	// Validate up front so that a mixed batch cannot leave a separator, a
	// header and a partial run of rows in the output before the error is
	// returned. This only inspects the arguments, so it needs no lock.
	for i, record := range records {
		if got := record.Type(); got != recordType {
			return fmt.Errorf("record at index %d has type %s, expected %s - all records must be of the same type",
				i, got, recordType)
		}
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	// headerSent is guarded by s.mu, so it is only read (and logged) once the
	// lock is held.
	slog.Debug("streaming records", "recordType", recordType, "count", len(records), "headerSent", s.headerSent)

	// A new section starts when nothing has been written yet or when the
	// record type differs from the one of the current section.
	isSectionChange := s.headerSent == "" || s.headerSent != recordType
	if isSectionChange {
		if err := s.csvWriter.Write([]string{s.sectionSeparator}); err != nil {
			return fmt.Errorf("failed to write section separator: %w", err)
		}
		if err := s.csvWriter.Write(records[0].Header()); err != nil {
			return fmt.Errorf("failed to write header: %w", err)
		}
		s.headerSent = recordType
	}

	for i, record := range records {
		if err := s.csvWriter.Write(record.Values()); err != nil {
			return fmt.Errorf("failed to write record at index %d: %w", i, err)
		}
	}

	// Flush so the batch reaches the underlying destination immediately. The
	// CSV writer is flushed directly because s.mu is already held here.
	// NOTE(review): Flush returns nothing; if csvWriter is an encoding/csv
	// Writer, flush failures are only visible through its Error method —
	// consider checking it here.
	s.csvWriter.Flush()
	return nil
}