func()

in internal/mode/chunk/streamer/streamer.go [67:119]


// Stream writes a batch of records, all of the same type, to the CSV writer.
//
// An empty batch is a no-op and returns nil. The type reported by the first
// record defines the type of the batch; every other record must report the
// same type. The whole batch is validated before anything is written, so a
// mixed batch returns an error without emitting a separator, a header, or a
// partial set of rows, and without updating s.headerSent.
//
// When the batch type differs from the type of the last header written (or no
// header has been written yet), a row containing s.sectionSeparator and then
// the header of the first record are written ahead of the data rows.
//
// The writer is flushed before returning. Any error the writer recorded while
// buffering or flushing is returned to the caller.
func (s *Streamer) Stream(records []Record) error {
	if len(records) == 0 {
		return nil
	}

	// The first record defines the type for the whole batch.
	recordType := records[0].Type()

	s.mu.Lock()
	defer s.mu.Unlock()

	// Log only after taking the lock: headerSent is shared state guarded by s.mu.
	slog.Debug("streaming records", "recordType", recordType, "count", len(records), "headerSent", s.headerSent)

	// Reject mixed batches before touching the writer so that a failure never
	// leaves a half-written section behind.
	for i, record := range records {
		if got := record.Type(); got != recordType {
			return fmt.Errorf("record at index %d has type %s, expected %s - all records must be of the same type",
				i, got, recordType)
		}
	}

	// A new section starts when no header has been sent yet or the type changed.
	isSectionChange := s.headerSent == "" || s.headerSent != recordType

	if isSectionChange {
		if err := s.csvWriter.Write([]string{s.sectionSeparator}); err != nil {
			return fmt.Errorf("failed to write section separator: %w", err)
		}

		if err := s.csvWriter.Write(records[0].Header()); err != nil {
			return fmt.Errorf("failed to write header: %w", err)
		}

		s.headerSent = recordType
	}

	for i, record := range records {
		if err := s.csvWriter.Write(record.Values()); err != nil {
			return fmt.Errorf("failed to write record at index %d: %w", i, err)
		}
	}

	// Flush on the underlying writer directly: s.mu is already held here.
	// Flush itself reports nothing, so the writer's sticky error must be
	// checked afterwards or I/O failures would be silently dropped.
	s.csvWriter.Flush()
	if err := s.csvWriter.Error(); err != nil {
		return fmt.Errorf("failed to flush records: %w", err)
	}

	return nil
}