func()

in vermeer/apps/worker/setting_bl.go [461:545]


// GatherOutDegree applies one batch of out-degree values received from
// workerName to the graph of task taskID and, when end is true, waits for the
// sender's outstanding batches before marking the worker as done.
//
// The payload in data is decoded as a leading serialize.SUint32 followed by a
// run of serialize.SUint32 out-degree values. The leading value is the index
// passed to AddOutDegree for the first out-degree and is incremented by one
// for every value decoded after it. Payloads shorter than 4 bytes carry no
// values but are still counted as a received batch.
//
// When end is true the call waits until RecvCount for workerName reaches
// endNum (bounded to 100 polls of 100ms), records
// structure.TaskStateSettingOutDegreeOK for workerName, and, once
// CheckTaskState reports that state for the task, sets the task state and
// notifies the master through SettingGraph.
//
// Panics are recovered and reported through SetStatusError.
func (sb *SettingBl) GatherOutDegree(taskID int32, workerName string, end bool, endNum int32, data []byte) {
	const (
		maxRetries    = 100
		retryInterval = 100 * time.Millisecond
	)

	defer func() {
		if r := recover(); r != nil {
			stack := common.GetCurrentGoroutineStack()
			sb.SetStatusError(taskID, fmt.Sprintf("GatherOutDegree panic recover panic:%v, stack message: %s", r,
				stack))
			logrus.Errorf("GatherOutDegree panic recover taskID:%v, panic:%v, stack message: %s", taskID, r,
				stack)
		}
	}()
	logrus.Debugf("gather out degree worker:%v, end;%v, endnum:%v", workerName, end, endNum)

	// The compute task may not be registered yet when the first batch
	// arrives; poll for it for a bounded amount of time.
	computeTask := ComputeTaskMgr.GetTask(taskID)
	for retry := 0; retry < maxRetries && computeTask == nil; retry++ {
		logrus.Warnf("GatherOutDegree task id:%v is not available, wait 100ms", taskID)
		time.Sleep(retryInterval)
		computeTask = ComputeTaskMgr.GetTask(taskID)
	}

	if !sb.CheckAction(computeTask) {
		return
	}
	if computeTask == nil {
		// Report the missing task explicitly instead of relying on a nil
		// dereference being caught by the recover handler above.
		msg := fmt.Sprintf("GatherOutDegree task id:%v is not available", taskID)
		sb.SetStatusError(taskID, msg)
		logrus.Errorf("%s", msg)
		return
	}

	computeTask.StepWg.Wait()

	// Decode inside a closure so RecvWg.Done is deferred: a panic while
	// decoding or updating the graph must not leave RecvWg unbalanced, or
	// every later RecvWg.Wait would block forever.
	func() {
		computeTask.RecvWg.Add(1)
		defer computeTask.RecvWg.Done()

		if len(data) < 4 {
			return
		}

		vOffSet := serialize.SUint32(0)
		offset, err := vOffSet.Unmarshal(data)
		if err != nil {
			sb.SetStatusError(taskID, fmt.Sprintf("setting graph gather outdegree error: %v", err))
			logrus.Errorf("setting graph gather outdegree error: %v", err)
			return
		}

		var outDegree serialize.SUint32
		graph := GraphMgr.GetGraphByName(computeTask.Task.SpaceName, computeTask.Task.GraphName)
		for offset < len(data) {
			n, err := outDegree.Unmarshal(data[offset:])
			if err != nil {
				sb.SetStatusError(taskID, fmt.Sprintf("setting graph gather outdegree error: %v", err))
				logrus.Errorf("setting graph gather outdegree error: %v", err)
				return
			}
			graph.Data.Edges.AddOutDegree(uint32(vOffSet), uint32(outDegree))
			offset += n
			vOffSet++
		}
	}()

	// The batch is counted even when decoding failed, so that the sender's
	// end message is not left waiting for it.
	recvCount := computeTask.RecvCount[workerName]
	atomic.AddInt32(recvCount, 1)

	if !end {
		return
	}

	// Wait for all in-flight batches, then for the batch counter of this
	// sender to reach the number of batches it announced.
	computeTask.RecvWg.Wait()
	for retry := 0; retry < maxRetries; retry++ {
		// Read the counter atomically; other goroutines update it with
		// atomic.AddInt32.
		received := atomic.LoadInt32(recvCount)
		if received >= endNum {
			break
		}
		logrus.Warnf("There are still buffer left to be processed. From worker:%v", workerName)
		logrus.Debugf("recv count:%v ,end num:%v ", received, endNum)
		time.Sleep(retryInterval)
	}

	state := structure.TaskStateSettingOutDegreeOK
	computeTask.Locker.Lock()
	computeTask.Task.SetWorkerState(workerName, state)
	allWorkerComplete := computeTask.Task.CheckTaskState(state)
	computeTask.Locker.Unlock()
	if !allWorkerComplete {
		return
	}

	computeTask.Task.SetState(state)
	req := pb.SettingGraphReq{
		WorkerName: ServiceWorker.WorkerName,
		TaskId:     taskID,
		State:      string(state),
	}
	if _, err := ServiceWorker.MasterClient.SettingGraph(context.Background(), &req); err != nil {
		logrus.Errorf("GatherOutDegree SettingGraph error: %v", err)
	}
}