func()

in loader/stream_loader.go [398:439]


func (s *StreamLoad) Wait(loadInfo *LoadInfo, retryCount int, retryInfo *map[int]int, startTime time.Time) {
	s.wg.Wait()

	if len(s.report.FailedWorkers) == 0 {
		s.showLoadResult(true, startTime)
		loadInfo.NeedRetry = false
		return
	}

	// parse header
	var headerStrings []string
	for key, value := range loadInfo.Headers {
		headerStrings = append(headerStrings, fmt.Sprintf("%s:%s", key, value))
	}
	headers := strings.Join(headerStrings, "?")

	if retryCount < loadInfo.RetryTimes-1 {
		log.Infof("\nAuto retrying, retry count %d waiting time %ds...\n", retryCount+1, loadInfo.RetryInterval)
		*retryInfo = make(map[int]int)
		for workerIndex, taskIndex := range s.report.FailedWorkers {
			(*retryInfo)[workerIndex] = taskIndex
		}
		s.report.FailedWorkers = make(map[int]int)
		s.loadResp.LoadFiles = []string{}
		return
	}

	//parse failed worker
	var FailedWorkersStrings []string
	for key, value := range s.report.FailedWorkers {
		FailedWorkersStrings = append(FailedWorkersStrings, fmt.Sprintf("%d,%d", key, value))
	}
	failed := strings.Join(FailedWorkersStrings, ";")

	// retry command
	command := fmt.Sprintf("./doris_streamloader --source_file %s  --url=\"%s\" --header=\"%s\" --db=\"%s\" --table=\"%s\" -u %s -p \"%s\" --compress=%t --timeout=%d --workers=%d --disk_throughput=%d --streamload_throughput=%d --batch=%d --batch_byte=%d --max_byte_per_task=%d --check_utf8=%t --report_duration=%d --auto_retry=\"%s\" --auto_retry_times=%d --auto_retry_interval=%d",
		loadInfo.SourceFilePaths, loadInfo.Url, headers, loadInfo.DbName, loadInfo.TableName, loadInfo.UserName, loadInfo.Password, loadInfo.Compress, loadInfo.Timeout, loadInfo.Workers, loadInfo.DiskThroughput, loadInfo.StreamLoadThroughput, loadInfo.BatchRows, loadInfo.BatchBytes, loadInfo.MaxBytesPerTask, loadInfo.CheckUTF8, loadInfo.ReportDuration, failed, loadInfo.RetryTimes, loadInfo.RetryInterval)

	fmt.Printf("load has some error, and auto retry failed, you can retry by : \n%s\n", command)
	s.showLoadResult(false, startTime)
	loadInfo.NeedRetry = false
}