func()

in loader/stream_loader.go [346:378]


// executeGetAndSend is a worker loop that sends one task after another until
// the data reader signals end of stream or a send fails.
//
// For every task it builds a fresh URL with s.createUrl, creates an io.Pipe,
// starts s.readData in a goroutine to feed the pipe's write end, and passes
// the read end (wrapped in NopCloser) to s.send. taskIndex starts at 1 and is
// incremented after each successful send that did not hit end of stream.
//
// maxBytesPerTask and workerIndex are forwarded to readData through
// ReadOption. maxRowsPerTask is accepted but not used in this function.
//
// If s.send returns an error, s.handleSendError is called, the error is
// logged, and the worker returns without incrementing s.report.TotalWorkers.
// The counter is only incremented when the loop ends because readData set the
// end-of-stream flag.
//
// s.wg.Done is always called on exit, and "execute worker exit" is logged.
func (s *StreamLoad) executeGetAndSend(maxRowsPerTask int, maxBytesPerTask int, workerIndex int) {
	defer s.wg.Done()
	defer log.Info("execute worker exit")

	taskIndex := 1

	for {
		url := s.createUrl()

		// The zero value of atomic.Bool is false; readData is handed a
		// pointer to it and the flag is checked once send has returned.
		var isEOS atomic.Bool
		pr, pw := io.Pipe()

		go s.readData(&isEOS, pw,
			&ReadOption{
				maxBytesPerTask: maxBytesPerTask,
				workerIndex:     workerIndex,
				taskIndex:       taskIndex,
			})

		resp, err := s.send(url, NopCloser(pr), workerIndex, taskIndex)
		if err != nil {
			s.handleSendError(workerIndex, taskIndex)
			log.Errorf("Send error, resp: %v error message: %v", resp, err)
			// The reader is handed to send behind NopCloser, so nothing in
			// this function closes it. Close it here so that a readData
			// goroutine blocked in pw.Write gets an error instead of
			// blocking forever once this worker stops reading.
			// NOTE(review): assumes readData returns when its pipe writes
			// fail — confirm against readData.
			// CloseWithError is documented to always return nil.
			_ = pr.CloseWithError(err)
			return
		}
		log.Debugf("send success resp: %v", resp)

		if isEOS.Load() {
			break
		}
		taskIndex++
	}

	// Only reached when the loop ended on end of stream, not on a send error.
	atomic.AddUint64(&s.report.TotalWorkers, 1)
}