func()

in internal/master/second_job_update_instance_status_handler.go [288:362]


// setHistory records the outcome of one finished loop of the second-level job.
//
// It does three things, in order:
//  1. bumps today's progress counter (success or failure, plus running/total
//     when the task master has not been killed);
//  2. rolls today's counter over to "yesterday" when the day of month has
//     changed since the stored today-begin time;
//  3. builds a per-task / per-worker progress snapshot for the loop and
//     enqueues it into the recent progress history.
//
// serialNum identifies the loop, loopStartTime is passed through as the
// history start time (compared against time.Now().UnixMilli(), so it is
// presumably epoch milliseconds — confirm with callers), and status is the
// final status of the loop. Anything other than InstanceStatusSucceed is
// counted as a failure.
//
// No history entry is enqueued when the stored today-begin time cannot be
// parsed or when no progress counters could be collected.
func (h *secondJobUpdateInstanceStatusHandler) setHistory(serialNum int64, loopStartTime int64, status processor.InstanceStatus) {
	succeeded := status == processor.InstanceStatusSucceed

	todayCounter := h.secondProgressDetail.GetTodayProgressCounter()
	if succeeded {
		todayCounter.IncrementOneSuccess()
	} else {
		todayCounter.IncrementOneFailed()
	}
	if !h.taskMaster.IsKilled() {
		todayCounter.IncrementRunning()
		todayCounter.IncrementOneTotal()
	}

	// Reset today's progress counter once the day has changed.
	rawTodayBeginTime := h.secondProgressDetail.GetTodayBeginTime()
	todayBeginTime, err := time.Parse(constants.TimeFormat, rawTodayBeginTime)
	if err != nil {
		// Log the unparsed value and the parse error; the parsed time is the
		// zero value here and carries no information.
		logger.Errorf("setHistory failed, getTodayBeginTime from secondProgressDetail failed, todayBeginTime=%s, err=%v", rawTodayBeginTime, err)
		return
	}
	// Use a single instant for both the rollover check and the new begin time.
	now := time.Now()
	if now.Day() != todayBeginTime.Day() {
		h.secondProgressDetail.SetYesterdayProgressCounter(h.secondProgressDetail.GetTodayProgressCounter())
		h.secondProgressDetail.SetTodayBeginTime(now.Format(constants.TimeFormat))
		h.secondProgressDetail.SetTodayProgressCounter(common.NewTaskProgressCounter(h.secondProgressDetail.GetTodayBeginTime()))
	}

	taskProgressMap := make(map[string]*common.TaskProgressCounter)
	switch master := h.taskMaster.(type) {
	case taskmaster.MapTaskMaster:
		// The case matches on the interface, but the progress map is read from
		// the concrete *MapTaskMaster. Check the assertion so that another
		// implementation of the interface cannot panic here.
		mapMaster, ok := h.taskMaster.(*MapTaskMaster)
		if !ok {
			logger.Errorf("setHistory failed, unsupported map task master type=%T", h.taskMaster)
			return
		}
		mapMaster.GetTaskProgressMap().Range(func(key, value any) bool {
			taskProgressMap[key.(string)] = value.(*common.TaskProgressCounter)
			return true
		})
	case *BroadcastTaskMaster:
		workerProgressCounterMap := master.GetWorkerProgressMap()
		if utils.SyncMapLen(workerProgressCounterMap) == 0 {
			return
		}

		// Convert each worker counter into a task counter keyed by worker address.
		workerProgressCounterMap.Range(func(key, value any) bool {
			workerCounter := value.(*common.WorkerProgressCounter)
			workerAddr := workerCounter.GetWorkerAddr()

			// Build the counter through the constructor, as the other branches
			// do, so it is named after its map key.
			taskCounter := common.NewTaskProgressCounter(workerAddr)
			taskCounter.IncrementSuccess(workerCounter.GetSuccess())
			taskCounter.IncrementFailed(workerCounter.GetFailed())
			taskCounter.IncrementTotal(workerCounter.GetTotal())
			taskProgressMap[workerAddr] = taskCounter
			return true
		})
	case *StandaloneTaskMaster:
		// A standalone loop contributes exactly one execution on the currently
		// selected worker.
		ipAndPort := master.GetCurrentSelection()
		counter := common.NewTaskProgressCounter(ipAndPort)
		counter.IncrementOneTotal()
		if succeeded {
			counter.IncrementOneSuccess()
		} else {
			counter.IncrementOneFailed()
		}
		taskProgressMap[ipAndPort] = counter
	}

	// Nothing was collected (including unknown task master types): skip the entry.
	if len(taskProgressMap) == 0 {
		return
	}

	history := common.NewProgressHistory()
	history.SetSerialNum(serialNum)
	history.SetStartTime(loopStartTime)
	history.SetEndTime(time.Now().UnixMilli())
	history.SetCostTime(history.EndTime() - history.StartTime())
	history.SetTaskProgressMap(taskProgressMap)
	history.SetSuccess(succeeded)
	h.recentProgressHistory.Enqueue(history)
}