in loader/stream_loader.go [183:243]
// readData drains the worker queue selected by readOption.workerIndex and
// streams its chunks into rawWriter, through an lz4 compressor when
// s.Compress is set.
//
// It stops when one of these happens:
//   - The byte budget readOption.maxBytesPerTask has been consumed. The chunk
//     that crosses the budget is still written in full.
//   - The queue is closed. In that case isEOS is set to true.
//   - A write fails. In that case s.handleSendError is called for the task.
//
// rawWriter is always closed on return so the reading half of the pipe sees
// end of stream. Every chunk taken from the queue is returned to s.pool once
// it has been written. When s.CheckUTF8 is set, a chunk that is not valid
// UTF-8 is replaced by the output of s.toValidUTF8 before writing. That
// replacement buffer is also returned to s.pool.
func (s *StreamLoad) readData(isEOS *atomic.Bool, rawWriter *io.PipeWriter, readOption *ReadOption) {
	defer rawWriter.Close()

	var writer io.Writer = rawWriter
	if s.Compress {
		cWriter := lz4.NewWriter(rawWriter)
		// Defers run LIFO, so this Close runs before rawWriter.Close. Any data
		// the compressor still holds reaches the pipe while the pipe is open.
		defer cWriter.Close()
		writer = cWriter
	}

	for remaining := readOption.maxBytesPerTask; remaining > 0; {
		data, ok := <-s.queues[readOption.workerIndex]
		if !ok {
			isEOS.Store(true)
			return
		}
		remaining -= len(data)

		payload := data
		// repaired is true when payload is a separate buffer from
		// s.toValidUTF8 that must also go back to the pool.
		repaired := false
		if s.CheckUTF8 && !utf8.Valid(data) {
			payload = s.toValidUTF8(data)
			repaired = true
			log.Error("The byte slice is not valid UTF-8 encoding")
		}

		_, err := writer.Write(payload)

		// io.Writer implementations must not retain the slice, so both buffers
		// can be recycled as soon as Write returns. This happens on the error
		// path too. The recycling is done per iteration rather than with a
		// deferred call, which would hold every repaired buffer until the whole
		// task finishes.
		if repaired {
			s.pool.Put(payload[:0])
		}
		s.pool.Put(data[:0])

		if err != nil {
			s.handleSendError(readOption.workerIndex, readOption.taskIndex)
			log.Errorf("send error: %v", err)
			return
		}
	}
}