func main()

in main.go [383:414]


func main() {
	initFlags()
	retryCount := 0
	for {
		// create queue by worker size
		var queues []chan []byte
		// create file reader
		fileSize := int64(0)
		reader := file.NewFileReader(sourceFilePaths, batchRows, batchBytes, fileBufferSize, &queues, &bufferPool, &fileSize)
		calculateAndCheckWorkers(reader, fileSize)
		createQueues(&queues)
		reporter := report.NewReporter(reportDuration, fileSize, uint64(workers))
		streamLoad := createStreamLoad(reporter, queues)
		if retryCount == 0 {
			startTime = time.Now()
		}

		maxRowsPerTask = math.MinInt32
		streamLoad.Load(workers, maxRowsPerTask, maxBytesPerTask, &retryInfo)
		reporter.Report()
		defer reporter.CloseWait()
		reader.Read(reporter, workers, maxBytesPerTask, &retryInfo, loadResp, retryCount, lineDelimiter)
		reader.Close()

		streamLoad.Wait(loadInfo, retryCount, &retryInfo, startTime)
		if !loadInfo.NeedRetry || retryCount >= loadInfo.RetryTimes-1 {
			break
		}
		time.Sleep(time.Duration(loadInfo.RetryInterval) * time.Second)
		retryCount++
	}
}