func()

in vermeer/apps/worker/load_graph_bl.go [371:489]


func (lb *LoadGraphBl) GatherVertex(taskID int32, workerName string, end bool, endNum int32, data []byte) {
	defer func() {
		if r := recover(); r != nil {
			lb.SetStatusError(taskID, fmt.Sprintf("GatherVertex panic recover panic:%v, stack message: %s", r,
				common.GetCurrentGoroutineStack()))
			logrus.Errorf("GatherVertex panic recover taskID:%v, panic:%v, stack message: %s", taskID, r,
				common.GetCurrentGoroutineStack())
		}
	}()
	logrus.Debugf("gather vertex worker:%v, end:%v", workerName, end)
	loadTask := LoadGraphMgr.GetLoadTask(taskID)
	if !lb.CheckAction(loadTask) {
		return
	}
	//defer lb.CheckAction(loadTask)
	graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
	loadTask.LoadWg.Wait()
	loadTask.RecvWg.Add(1)
	//gw := graph.GetGraphWorker(workerName)

	i := 0
	vOffSet := serialize.SUint32(0)
	if len(data) >= 4 {
		n, _ := vOffSet.Unmarshal(data)
		i += n
	}

	vOffSetStart := vOffSet
	vertexList := make([]structure.Vertex, 0, 1000)
	for i < len(data) {
		var vertex structure.Vertex
		n, err := vertex.Unmarshal(data[i:])
		if err != nil {
			lb.SetStatusError(taskID, fmt.Sprintf("load graph read vertex error: %s", err))
			logrus.Errorf("load graph read vertex error: %s", err)
			break
		}
		vertexList = append(vertexList, vertex)
		// graph.Data.Vertex.SetVertex(uint32(vOffSet), vertex)
		//n, err := graph.Data.TotalVertex[int(vOffSet)].Unmarshal(data[i:])
		//if err != nil {
		//	lb.SetStatusError(taskId, fmt.Sprintf("load graph read vertex error: %s", err))
		//	logrus.Errorf("load graph read vertex error: %s", err)
		//	break
		//}
		i += n
		if graph.UseProperty {
			var value serialize.MarshalAble
			for _, k := range graph.Data.VertexPropertySchema.Schema {
				switch k.VType {
				case structure.ValueTypeInt32:
					var sInt32 serialize.SInt32
					n, err = sInt32.Unmarshal(data[i:])
					value = &sInt32
				case structure.ValueTypeFloat32:
					var sFloat32 serialize.SFloat32
					n, err = sFloat32.Unmarshal(data[i:])
					value = &sFloat32
				case structure.ValueTypeString:
					var sString serialize.SString
					n, err = sString.Unmarshal(data[i:])
					value = &sString
				}
				if err != nil {
					lb.SetStatusError(taskID, fmt.Sprintf("GatherVertex vertex property error: %s", err))
					logrus.Errorf("GatherVertex vertex property error: %s", err)
					break
				}
				graph.Data.VertexProperty.SetValue(k.PropKey, uint32(vOffSet), value)
				i += n
			}
		}

		vOffSet += 1
	}
	graph.Data.Vertex.SetVertices(uint32(vOffSetStart), vertexList...)
	//logrus.Infof("GatherVertex offset: %d, worker: %s, end: %v", vOffSet, workerName, end)
	loadTask.RecvWg.Done()
	atomic.AddInt32(loadTask.RecvCount[workerName], 1)

	if end {
		// wait for all messages are processed
		loadTask.RecvWg.Wait()
		for i := 0; i < 100; i++ {
			if atomic.LoadInt32(loadTask.RecvCount[workerName]) >= endNum {
				break
			}
			logrus.Warnf("There are still buffer left to be processed. From worker:%v", workerName)
			logrus.Debugf("recv count:%v ,end num:%v ", *loadTask.RecvCount[workerName], endNum)
			time.Sleep(100 * time.Millisecond)
		}
		var allWorkerComplete bool
		loadTask.Locker.Lock()
		loadTask.Task.SetWorkerState(workerName, structure.TaskStateLoadScatterOK)
		allWorkerComplete = loadTask.Task.CheckTaskState(structure.TaskStateLoadScatterOK)
		loadTask.Locker.Unlock()
		if allWorkerComplete {
			logrus.Infof("Gather vertex complete, task:%v ", taskID)
			//loadTask.Task.SetWorkerState(workerName, structure.TaskStateLoadScatterOK)
			//if loadTask.Task.CheckTaskState(structure.TaskStateLoadScatterOK) {
			loadTask.Task.SetState(structure.TaskStateLoadScatterOK)
			loadTask.LoadWg.Add(1)
			graph.BuildTotalVertex()
			req := pb.LoadTaskStatusReq{
				WorkerName: ServiceWorker.WorkerName,
				TaskId:     taskID,
				State:      string(structure.TaskStateLoadScatterOK),
			}
			ctx := context.Background()
			_, err := ServiceWorker.MasterClient.LoadTaskStatus(ctx, &req)
			if err != nil {
				logrus.Errorf("LoadTaskStatus error: %s", err)
			}
			for s := range loadTask.RecvCount {
				*loadTask.RecvCount[s] = 0
			}
		}
	}
}