func()

in loader/stream_loader.go [183:243]


// readData drains the queue selected by readOption.workerIndex into rawWriter
// until either the byte budget readOption.maxBytesPerTask is used up or the
// queue is closed.
//
// When s.Compress is set, the chunks are written through an lz4 writer layered
// on top of rawWriter. When s.CheckUTF8 is set, a chunk that is not valid
// UTF-8 is passed through s.toValidUTF8 and the result is written instead.
//
// isEOS is set to true when the queue is found closed. If a write fails,
// s.handleSendError is called with the worker and task index, the error is
// logged, and the function returns. rawWriter is always closed on return.
func (s *StreamLoad) readData(isEOS *atomic.Bool, rawWriter *io.PipeWriter, readOption *ReadOption) {
	// Deferred calls run last-in-first-out, so the compressing writer (if
	// any) is closed before the pipe it writes into.
	defer rawWriter.Close()

	var writer io.Writer = rawWriter
	if s.Compress {
		cWriter := lz4.NewWriter(rawWriter)
		defer cWriter.Close()
		writer = cWriter
	}

	// The budget is checked before each receive, so the chunk that crosses
	// the limit is still written in full.
	remaining := readOption.maxBytesPerTask

	for remaining > 0 {
		data, ok := <-s.queues[readOption.workerIndex]
		if !ok {
			isEOS.Store(true)
			break
		}

		// The budget is charged with the size of the original chunk, even
		// if a repaired copy ends up being written.
		remaining -= len(data)

		// TODO(Drogon): delete or valid
		payload := data
		repaired := false
		if s.CheckUTF8 && !utf8.Valid(data) {
			payload = s.toValidUTF8(data)
			repaired = true
			log.Error("The byte slice is not valid UTF-8 encoding")
		}

		_, err := writer.Write(payload)

		// Hand the repaired buffer back to the pool as soon as the write is
		// done. A defer here would only run when readData returns, keeping
		// every repaired buffer alive for the whole task. This assumes Write
		// does not retain the slice, which is the same assumption already
		// made when recycling data right after the write below.
		if repaired {
			s.pool.Put(payload[:0])
		}

		if err != nil {
			s.handleSendError(readOption.workerIndex, readOption.taskIndex)
			log.Errorf("send error: %v", err)
			return
		}

		s.pool.Put(data[:0])
	}
}