func()

in vermeer/apps/worker/load_graph_bl.go [849:952]


func (lb *LoadGraphBl) RecvVertex(taskId int32, worker string, count int32, end bool, endNum int32, data []byte) {
	defer func() {
		if r := recover(); r != nil {
			lb.SetStatusError(taskId, fmt.Sprintf("RecvVertex panic recover panic:%v, stack message: %s", r,
				common.GetCurrentGoroutineStack()))
			logrus.Errorf("RecvVertex panic recover taskID:%v, panic:%v, stack message: %s", taskId, r,
				common.GetCurrentGoroutineStack())
		}
	}()
	loadTask := LoadGraphMgr.GetLoadTask(taskId)
	for i := 0; i < 100 && loadTask == nil; i++ {
		//wait 100ms if loadTask not init.
		logrus.Warnf("task id:%v is not available, wait 100ms", taskId)
		time.Sleep(100 * time.Millisecond)
		loadTask = LoadGraphMgr.GetLoadTask(taskId)
	}
	if !lb.CheckAction(loadTask) {
		return
	}
	loadTask.RecvWg.Add(1)
	loadTask.LoadWg.Wait()
	//defer lb.CheckAction(loadTask)
	graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
	atomic.AddUint32(&graph.Data.VertexCount, uint32(count))
	logrus.Debugf("recv vertex count: %d, end: %v,endNum:%v, worker: %s", count, end, endNum, worker)

	vertexList := make([]structure.Vertex, count)

	properties := structure.VertexProperties{}
	properties.Init(graph.Data.VertexPropertySchema)
	prop := structure.PropertyValue{}
	prop.Init(graph.Data.VertexPropertySchema)

	c := 0
	for i := 0; i < len(data); {
		n, err := vertexList[c].Unmarshal(data[i:])
		if err != nil {
			lb.SetStatusError(taskId, fmt.Sprintf("load graph read vertex error: %s", err))
			logrus.Errorf("load graph read vertex error: %s", err)
			break
		}
		i += n
		if graph.UseProperty {
			n, err = prop.Unmarshal(data[i:])
			if err != nil {
				lb.SetStatusError(taskId, fmt.Sprintf("load graph read vertex prop error: %s", err))
				logrus.Errorf("load graph read vertex prop error: %s", err)
				break
			}
			properties.AppendProp(prop, graph.Data.VertexPropertySchema)
			i += n
		}
		c += 1
	}
	if c != int(count) {
		lb.SetStatusError(taskId, fmt.Sprintf("RecvVertex count incorrect: %d, %d", c, count))
		logrus.Errorf("RecvVertex count incorrect: %d, %d", c, count)
	}
	graph.Locker.Lock()
	graph.Data.Vertex.AppendVertices(vertexList...)
	if graph.UseProperty {
		graph.Data.VertexProperty.AppendProps(properties)
	}
	graph.Locker.Unlock()

	loadTask.RecvWg.Done()
	atomic.AddInt32(loadTask.RecvCount[worker], 1)

	if end {
		// wait for all messages are processed
		loadTask.RecvWg.Wait()
		for i := 0; i < 100; i++ {
			if atomic.LoadInt32(loadTask.RecvCount[worker]) >= endNum {
				break
			}
			logrus.Warnf("There are still buffer left to be processed. From worker:%v", worker)
			logrus.Debugf("recv count:%v ,end num:%v ", *loadTask.RecvCount[worker], endNum)
			time.Sleep(100 * time.Millisecond)
		}
		var allWorkerComplete bool
		loadTask.Locker.Lock()
		loadTask.Task.SetWorkerState(worker, structure.TaskStateLoadVertexOK)
		allWorkerComplete = loadTask.Task.CheckTaskState(structure.TaskStateLoadVertexOK)
		loadTask.Locker.Unlock()
		if allWorkerComplete {
			loadTask.Task.SetState(structure.TaskStateLoadVertexOK)
			loadTask.LoadWg.Add(1)
			lb.OnVertexLoaded(taskId)
			req := pb.LoadTaskStatusReq{
				WorkerName: ServiceWorker.WorkerName,
				TaskId:     taskId,
				State:      string(structure.TaskStateLoadVertexOK),
			}
			ctx := context.Background()
			_, err := ServiceWorker.MasterClient.LoadTaskStatus(ctx, &req)
			if err != nil {
				logrus.Errorf("RecvVertex send load task status error: %s", err)
			}
			for s := range loadTask.RecvCount {
				*loadTask.RecvCount[s] = 0
			}
		}
	}
}