in internal/master/second_job_update_instance_status_handler.go [288:362]
func (h *secondJobUpdateInstanceStatusHandler) setHistory(serialNum int64, loopStartTime int64, status processor.InstanceStatus) {
if status == processor.InstanceStatusSucceed {
h.secondProgressDetail.GetTodayProgressCounter().IncrementOneSuccess()
} else {
h.secondProgressDetail.GetTodayProgressCounter().IncrementOneFailed()
}
if !h.taskMaster.IsKilled() {
h.secondProgressDetail.GetTodayProgressCounter().IncrementRunning()
h.secondProgressDetail.GetTodayProgressCounter().IncrementOneTotal()
}
// reset today progress counter
todayBeginTime, err := time.Parse(constants.TimeFormat, h.secondProgressDetail.GetTodayBeginTime())
if err != nil {
logger.Errorf("setHistory failed, getTodayBeginTime from secondProgressDetail failed, todayBeginTime=%s", todayBeginTime)
return
}
if time.Now().Day() != todayBeginTime.Day() {
h.secondProgressDetail.SetYesterdayProgressCounter(h.secondProgressDetail.GetTodayProgressCounter())
h.secondProgressDetail.SetTodayBeginTime(time.Now().Format(constants.TimeFormat))
h.secondProgressDetail.SetTodayProgressCounter(common.NewTaskProgressCounter(h.secondProgressDetail.GetTodayBeginTime()))
}
taskProgressMap := make(map[string]*common.TaskProgressCounter)
switch h.taskMaster.(type) {
case taskmaster.MapTaskMaster:
h.taskMaster.(*MapTaskMaster).GetTaskProgressMap().Range(func(key, value any) bool {
taskProgressMap[key.(string)] = value.(*common.TaskProgressCounter)
return true
})
case *BroadcastTaskMaster:
workerProgressCounterMap := h.taskMaster.(*BroadcastTaskMaster).GetWorkerProgressMap()
if utils.SyncMapLen(workerProgressCounterMap) == 0 {
return
}
workerProgressCounterMap.Range(func(key, value any) bool {
counter := value.(*common.WorkerProgressCounter)
newCounter := new(common.TaskProgressCounter)
newCounter.IncrementSuccess(counter.GetSuccess())
newCounter.IncrementFailed(counter.GetFailed())
newCounter.IncrementTotal(counter.GetTotal())
taskProgressMap[counter.GetWorkerAddr()] = newCounter
return true
})
case *StandaloneTaskMaster:
ipAndPort := h.taskMaster.(*StandaloneTaskMaster).GetCurrentSelection()
counter := common.NewTaskProgressCounter(ipAndPort)
counter.IncrementOneTotal()
if status == processor.InstanceStatusSucceed {
counter.IncrementOneSuccess()
} else {
counter.IncrementOneFailed()
}
taskProgressMap[ipAndPort] = counter
}
if len(taskProgressMap) == 0 {
return
}
history := common.NewProgressHistory()
history.SetSerialNum(serialNum)
history.SetStartTime(loopStartTime)
history.SetEndTime(time.Now().UnixMilli())
history.SetCostTime(history.EndTime() - history.StartTime())
history.SetTaskProgressMap(taskProgressMap)
if status == processor.InstanceStatusSucceed {
history.SetSuccess(true)
} else {
history.SetSuccess(false)
}
h.recentProgressHistory.Enqueue(history)
}