func()

in vermeer/apps/worker/load_graph_bl.go [954:1127]


func (lb *LoadGraphBl) RecvEdge(taskId int32, worker string, count int32, end bool, endNum int32, data []byte) {
	defer func() {
		if r := recover(); r != nil {
			lb.SetStatusError(taskId, fmt.Sprintf("RecvEdge panic recover panic:%v, stack message: %s", r,
				common.GetCurrentGoroutineStack()))
			logrus.Errorf("RecvEdge panic recover taskID:%v, panic:%v, stack message: %s", taskId, r,
				common.GetCurrentGoroutineStack())
		}
	}()
	loadTask := LoadGraphMgr.GetLoadTask(taskId)
	if !lb.CheckAction(loadTask) {
		return
	}
	//defer lb.CheckAction(loadTask)
	graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
	loadTask.LoadWg.Wait()
	loadTask.RecvWg.Add(1)
	logrus.Debugf("recv edge count: %d, end: %v,endNum:%v, worker: %s", count, end, endNum, worker)

	e := structure.IntEdge{}
	prop := structure.PropertyValue{}
	prop.Init(graph.Data.InEdgesPropertySchema)

	//graph.Locker.Lock()
	if graph.UseOutEdges || graph.UseOutDegree {
		//load both inEdges and outEdges
		edgeCount := 0
		rangeStart := graph.Data.VertIDStart
		rangeEnd := graph.Data.VertIDStart + graph.Data.VertexCount
		for i := 0; i < len(data); {
			n, err := e.Unmarshal(data[i:])
			if err != nil {
				lb.SetStatusError(taskId, fmt.Sprintf("load graph read edge error: %s", err))
				logrus.Errorf("load graph read edge error: %s", err)
				break
			}
			i += n
			if rangeStart <= e.Target && e.Target < rangeEnd {
				edgeCount += 1
				inIdx := e.Target - graph.Data.VertIDStart
				graph.Data.Edges.AppendInEdge(inIdx, serialize.SUint32(e.Source))
				//graph.Data.EdgeLocker[inIdx].Lock()
				//graph.Data.InEdges[inIdx] = append(graph.Data.InEdges[inIdx], serialize.SUint32(e.Source))
				//graph.Data.EdgeLocker[inIdx].UnLock()

				if graph.UseProperty {
					n, err = prop.Unmarshal(data[i:])
					if err != nil {
						lb.SetStatusError(taskId, fmt.Sprintf("load graph read edge Property error: %s", err))
						logrus.Errorf("load graph read edge Property error: %s", err)
						break
					}
					graph.Data.Edges.EdgeLockFunc(inIdx, func() {
						graph.Data.InEdgesProperty.AppendProp(prop, inIdx, graph.Data.InEdgesPropertySchema)
					})
					//graph.Data.EdgeLocker[inIdx].Lock()
					//graph.Data.InEdgesProperty.AppendProp(prop, inIdx, graph.Data.InEdgesPropertySchema)
					//graph.Data.EdgeLocker[inIdx].UnLock()
					i += n
				}
			}
			if rangeStart <= e.Source && e.Source < rangeEnd {
				if graph.UseOutDegree {
					graph.Data.Edges.AddOutDegree(e.Source, 1)
					//atomic.AddUint32((*uint32)(&graph.Data.OutDegree[e.Source]), 1)
				}
				if graph.UseOutEdges {
					outIdx := e.Source - graph.Data.VertIDStart
					graph.Data.Edges.AppendOutEdge(outIdx, serialize.SUint32(e.Target))
				}
				//graph.Data.EdgeLocker[outIdx].Lock()
				////if graph.UseUndirected {
				////	graph.Data.BothEdges[outIdx] = append(graph.Data.BothEdges[outIdx], serialize.SUint32(e.Target))
				//if graph.UseOutEdges {
				//	graph.Data.OutEdges[outIdx] = append(graph.Data.OutEdges[outIdx], serialize.SUint32(e.Target))
				//}
				//graph.Data.EdgeLocker[outIdx].UnLock()

			}
		}
		atomic.AddInt64(&graph.EdgeCount, int64(edgeCount))
	} else {
		//load inEdges only
		atomic.AddInt64(&graph.EdgeCount, int64(count))
		for i := 0; i < len(data); {
			n, err := e.Unmarshal(data[i:])
			if err != nil {
				lb.SetStatusError(taskId, fmt.Sprintf("load graph read edge error: %s", err))
				logrus.Errorf("load graph read edge error: %s", err)
				break
			}
			i += n
			eIdx := e.Target - graph.Data.VertIDStart
			if eIdx > graph.Data.VertexCount {
				logrus.Warnf("edge out of range, source:%v ,target:%v", e.Source, e.Target)
			}
			graph.Data.Edges.AppendInEdge(eIdx, serialize.SUint32(e.Source))
			//graph.Data.EdgeLocker[eIdx].Lock()
			//graph.Data.InEdges[eIdx] = append(graph.Data.InEdges[eIdx], serialize.SUint32(e.Source))
			//graph.Data.EdgeLocker[eIdx].UnLock()
			if graph.UseProperty {
				n, err = prop.Unmarshal(data[i:])
				if err != nil {
					lb.SetStatusError(taskId, fmt.Sprintf("load graph read edge Property error: %s", err))
					logrus.Errorf("load graph read edge Property error: %s", err)
					break
				}
				i += n
				graph.Data.Edges.EdgeLockFunc(eIdx, func() {
					graph.Data.InEdgesProperty.AppendProp(prop, eIdx, graph.Data.InEdgesPropertySchema)
				})
				//graph.Data.EdgeLocker[eIdx].Lock()
				//graph.Data.InEdgesProperty.AppendProp(prop, eIdx, graph.Data.InEdgesPropertySchema)
				//graph.Data.EdgeLocker[eIdx].UnLock()
			}
		}
	}
	//graph.Locker.Unlock()
	loadTask.RecvWg.Done()
	atomic.AddInt32(loadTask.RecvCount[worker], 1)

	if end {
		// wait for all messages are processed
		loadTask.RecvWg.Wait()
		for i := 0; i < 100; i++ {
			if atomic.LoadInt32(loadTask.RecvCount[worker]) >= endNum {
				break
			}
			logrus.Warnf("There are still buffer left to be processed. From worker:%v", worker)
			logrus.Debugf("recv count:%v ,end num:%v ", *loadTask.RecvCount[worker], endNum)
			time.Sleep(100 * time.Millisecond)
		}

		tarStatus := structure.TaskStateLoaded
		if graph.UseOutDegree {
			tarStatus = structure.TaskStateLoadEdgeOK
		}
		var allWorkerComplete bool
		loadTask.Locker.Lock()
		loadTask.Task.SetWorkerState(worker, tarStatus)
		allWorkerComplete = loadTask.Task.CheckTaskState(tarStatus)
		loadTask.Locker.Unlock()
		if allWorkerComplete {
			graph.OptimizeMemory()
			ctx := context.Background()
			req2 := pb.WorkerEdgeCountReq{
				TaskId:     taskId,
				WorkerName: ServiceWorker.WorkerName,
				Count:      graph.EdgeCount,
			}
			graph.Data.EdgeCount = graph.EdgeCount
			_, err := ServiceWorker.MasterClient.WorkEdgeCount(ctx, &req2)
			if err != nil {
				logrus.Errorf("WorkEdgeCount error: %s", err)
				return
			}

			loadTask.Task.SetState(tarStatus)
			req := pb.LoadTaskStatusReq{
				WorkerName: ServiceWorker.WorkerName,
				TaskId:     taskId,
				State:      string(tarStatus),
			}
			ctx = context.Background()
			_, err = ServiceWorker.MasterClient.LoadTaskStatus(ctx, &req)
			if err != nil {
				logrus.Errorf("RecvEdge send load task status error: %s", err)
			}
			for s := range loadTask.RecvCount {
				*loadTask.RecvCount[s] = 0
			}
		}
	}
}