in main.go [383:414]
func main() {
initFlags()
retryCount := 0
for {
// create queue by worker size
var queues []chan []byte
// create file reader
fileSize := int64(0)
reader := file.NewFileReader(sourceFilePaths, batchRows, batchBytes, fileBufferSize, &queues, &bufferPool, &fileSize)
calculateAndCheckWorkers(reader, fileSize)
createQueues(&queues)
reporter := report.NewReporter(reportDuration, fileSize, uint64(workers))
streamLoad := createStreamLoad(reporter, queues)
if retryCount == 0 {
startTime = time.Now()
}
maxRowsPerTask = math.MinInt32
streamLoad.Load(workers, maxRowsPerTask, maxBytesPerTask, &retryInfo)
reporter.Report()
defer reporter.CloseWait()
reader.Read(reporter, workers, maxBytesPerTask, &retryInfo, loadResp, retryCount, lineDelimiter)
reader.Close()
streamLoad.Wait(loadInfo, retryCount, &retryInfo, startTime)
if !loadInfo.NeedRetry || retryCount >= loadInfo.RetryTimes-1 {
break
}
time.Sleep(time.Duration(loadInfo.RetryInterval) * time.Second)
retryCount++
}
}