in loader/stream_loader.go [346:378]
func (s *StreamLoad) executeGetAndSend(maxRowsPerTask int, maxBytesPerTask int, workerIndex int) {
defer s.wg.Done()
defer log.Info("execute worker exit")
taskIndex := 1
for {
url := s.createUrl()
var isEOS atomic.Bool
isEOS.Store(false)
pr, pw := io.Pipe()
go s.readData(&isEOS, pw,
&ReadOption{
maxBytesPerTask: maxBytesPerTask,
workerIndex: workerIndex,
taskIndex: taskIndex,
})
if resp, err := s.send(url, NopCloser(pr), workerIndex, taskIndex); err != nil {
s.handleSendError(workerIndex, taskIndex)
log.Errorf("Send error, resp: %v error message: %v", resp, err)
return
} else {
log.Debugf("send success resp: %v", resp)
}
if isEOS.Load() {
break
}
taskIndex++
}
atomic.AddUint64(&s.report.TotalWorkers, 1)
}