in loader/stream_loader.go [398:439]
// Wait blocks on s.wg and then acts on the contents of s.report.FailedWorkers.
//
// Three outcomes are possible:
//
//   - Nothing failed: the success summary is shown via showLoadResult and
//     loadInfo.NeedRetry is set to false.
//   - Something failed and retryCount < loadInfo.RetryTimes-1: every entry of
//     s.report.FailedWorkers is copied into a fresh map stored in *retryInfo,
//     then s.report.FailedWorkers and s.loadResp.LoadFiles are reset to empty
//     values. loadInfo.NeedRetry is left untouched on this path.
//   - Something failed and no automatic retry remains: a command line that
//     re-runs the load for the failed entries is printed to stdout, the
//     failure summary is shown and loadInfo.NeedRetry is set to false.
//
// startTime is only forwarded to showLoadResult.
func (s *StreamLoad) Wait(loadInfo *LoadInfo, retryCount int, retryInfo *map[int]int, startTime time.Time) {
	s.wg.Wait()

	failedWorkers := s.report.FailedWorkers

	switch {
	case len(failedWorkers) == 0:
		// Every worker finished cleanly.
		s.showLoadResult(true, startTime)
		loadInfo.NeedRetry = false

	case retryCount < loadInfo.RetryTimes-1:
		// Hand the failed entries over through retryInfo and start the next
		// attempt with a clean report.
		log.Infof("\nAuto retrying, retry count %d waiting time %ds...\n", retryCount+1, loadInfo.RetryInterval)

		*retryInfo = make(map[int]int, len(failedWorkers))
		for worker, task := range failedWorkers {
			(*retryInfo)[worker] = task
		}

		s.report.FailedWorkers = make(map[int]int)
		s.loadResp.LoadFiles = []string{}

	default:
		// Headers are rendered as "key:value" pairs joined by "?".
		// Iteration order over a map is unspecified, so the order of the
		// pairs may differ between runs (assuming Headers is a map).
		headerPairs := make([]string, 0, len(loadInfo.Headers))
		for name, val := range loadInfo.Headers {
			headerPairs = append(headerPairs, fmt.Sprintf("%s:%s", name, val))
		}
		headerArg := strings.Join(headerPairs, "?")

		// Failed entries are rendered as "key,value" pairs joined by ";".
		failedPairs := make([]string, 0, len(failedWorkers))
		for worker, task := range failedWorkers {
			failedPairs = append(failedPairs, fmt.Sprintf("%d,%d", worker, task))
		}
		autoRetryArg := strings.Join(failedPairs, ";")

		// NOTE(review): the printed command embeds loadInfo.Password verbatim.
		retryCommand := fmt.Sprintf("./doris_streamloader --source_file %s --url=\"%s\" --header=\"%s\" --db=\"%s\" --table=\"%s\" -u %s -p \"%s\" --compress=%t --timeout=%d --workers=%d --disk_throughput=%d --streamload_throughput=%d --batch=%d --batch_byte=%d --max_byte_per_task=%d --check_utf8=%t --report_duration=%d --auto_retry=\"%s\" --auto_retry_times=%d --auto_retry_interval=%d",
			loadInfo.SourceFilePaths,
			loadInfo.Url,
			headerArg,
			loadInfo.DbName,
			loadInfo.TableName,
			loadInfo.UserName,
			loadInfo.Password,
			loadInfo.Compress,
			loadInfo.Timeout,
			loadInfo.Workers,
			loadInfo.DiskThroughput,
			loadInfo.StreamLoadThroughput,
			loadInfo.BatchRows,
			loadInfo.BatchBytes,
			loadInfo.MaxBytesPerTask,
			loadInfo.CheckUTF8,
			loadInfo.ReportDuration,
			autoRetryArg,
			loadInfo.RetryTimes,
			loadInfo.RetryInterval,
		)
		fmt.Printf("load has some error, and auto retry failed, you can retry by : \n%s\n", retryCommand)

		s.showLoadResult(false, startTime)
		loadInfo.NeedRetry = false
	}
}