in vermeer/apps/worker/load_graph_bl.go [954:1127]
// RecvEdge decodes one batch of serialized edges sent by worker and appends
// them to the graph that load task taskId is building.
//
// Parameters:
//   - taskId: load task to look up; also used when reporting errors.
//   - worker: name of the sending worker; keys loadTask.RecvCount and is the
//     worker whose state is advanced when end is true.
//   - count:  added to graph.EdgeCount in in-edges-only mode; in the other
//     mode it is only logged and the edges whose target is local are counted
//     instead.
//   - end:    true for the final batch from worker.
//   - endNum: number of batches from worker that must have been processed
//     before the final batch is allowed to complete the worker's state.
//   - data:   back-to-back structure.IntEdge records. When graph.UseProperty
//     is set, a structure.PropertyValue record is read after each edge that
//     is stored as an in-edge.
//
// Decode errors are reported through SetStatusError and stop the decoding of
// the current batch; the batch is still counted as received. Any panic is
// recovered and reported through SetStatusError as well.
func (lb *LoadGraphBl) RecvEdge(taskId int32, worker string, count int32, end bool, endNum int32, data []byte) {
	defer func() {
		if r := recover(); r != nil {
			// Capture the stack once so the status message and the log agree.
			stack := common.GetCurrentGoroutineStack()
			lb.SetStatusError(taskId, fmt.Sprintf("RecvEdge panic recover panic:%v, stack message: %s", r, stack))
			logrus.Errorf("RecvEdge panic recover taskID:%v, panic:%v, stack message: %s", taskId, r, stack)
		}
	}()

	loadTask := LoadGraphMgr.GetLoadTask(taskId)
	if !lb.CheckAction(loadTask) {
		return
	}
	graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)

	// Edges are only applied once LoadWg has been released.
	loadTask.LoadWg.Wait()

	loadTask.RecvWg.Add(1)
	func() {
		// Deferred so that a panic while decoding (recovered above) cannot
		// leave RecvWg permanently non-zero and block every later Wait().
		defer loadTask.RecvWg.Done()

		logrus.Debugf("recv edge count: %d, end: %v,endNum:%v, worker: %s", count, end, endNum, worker)

		e := structure.IntEdge{}
		prop := structure.PropertyValue{}
		prop.Init(graph.Data.InEdgesPropertySchema)

		if graph.UseOutEdges || graph.UseOutDegree {
			// Load both in-edges and out-edges/out-degree. Only edges whose
			// endpoint falls in [rangeStart, rangeEnd) are stored locally.
			edgeCount := 0
			rangeStart := graph.Data.VertIDStart
			rangeEnd := graph.Data.VertIDStart + graph.Data.VertexCount
			for i := 0; i < len(data); {
				n, err := e.Unmarshal(data[i:])
				if err != nil {
					lb.SetStatusError(taskId, fmt.Sprintf("load graph read edge error: %s", err))
					logrus.Errorf("load graph read edge error: %s", err)
					break
				}
				i += n

				if rangeStart <= e.Target && e.Target < rangeEnd {
					edgeCount++
					inIdx := e.Target - graph.Data.VertIDStart
					graph.Data.Edges.AppendInEdge(inIdx, serialize.SUint32(e.Source))
					if graph.UseProperty {
						// NOTE(review): the property record is consumed only
						// for edges whose target is local; this presumes the
						// sender attaches one only to such edges — confirm.
						n, err = prop.Unmarshal(data[i:])
						if err != nil {
							lb.SetStatusError(taskId, fmt.Sprintf("load graph read edge Property error: %s", err))
							logrus.Errorf("load graph read edge Property error: %s", err)
							break
						}
						i += n
						graph.Data.Edges.EdgeLockFunc(inIdx, func() {
							graph.Data.InEdgesProperty.AppendProp(prop, inIdx, graph.Data.InEdgesPropertySchema)
						})
					}
				}

				if rangeStart <= e.Source && e.Source < rangeEnd {
					if graph.UseOutDegree {
						// Out-degree is keyed by the vertex id itself, not by
						// the local offset.
						graph.Data.Edges.AddOutDegree(e.Source, 1)
					}
					if graph.UseOutEdges {
						outIdx := e.Source - graph.Data.VertIDStart
						graph.Data.Edges.AppendOutEdge(outIdx, serialize.SUint32(e.Target))
					}
				}
			}
			atomic.AddInt64(&graph.EdgeCount, int64(edgeCount))
			return
		}

		// Load in-edges only: every edge in the batch is stored.
		atomic.AddInt64(&graph.EdgeCount, int64(count))
		for i := 0; i < len(data); {
			n, err := e.Unmarshal(data[i:])
			if err != nil {
				lb.SetStatusError(taskId, fmt.Sprintf("load graph read edge error: %s", err))
				logrus.Errorf("load graph read edge error: %s", err)
				break
			}
			i += n

			eIdx := e.Target - graph.Data.VertIDStart
			// Valid local offsets are [0, VertexCount), so VertexCount itself
			// is already out of range (same half-open range as above).
			if eIdx >= graph.Data.VertexCount {
				logrus.Warnf("edge out of range, source:%v ,target:%v", e.Source, e.Target)
			}
			// The edge is still appended after the warning, as before.
			graph.Data.Edges.AppendInEdge(eIdx, serialize.SUint32(e.Source))
			if graph.UseProperty {
				n, err = prop.Unmarshal(data[i:])
				if err != nil {
					lb.SetStatusError(taskId, fmt.Sprintf("load graph read edge Property error: %s", err))
					logrus.Errorf("load graph read edge Property error: %s", err)
					break
				}
				i += n
				graph.Data.Edges.EdgeLockFunc(eIdx, func() {
					graph.Data.InEdgesProperty.AppendProp(prop, eIdx, graph.Data.InEdgesPropertySchema)
				})
			}
		}
	}()

	atomic.AddInt32(loadTask.RecvCount[worker], 1)
	if !end {
		return
	}

	// Final batch from this worker: wait for all in-flight batches, then give
	// batches from this worker up to 100 * 100ms to reach endNum.
	loadTask.RecvWg.Wait()
	for i := 0; i < 100; i++ {
		if atomic.LoadInt32(loadTask.RecvCount[worker]) >= endNum {
			break
		}
		logrus.Warnf("There are still buffer left to be processed. From worker:%v", worker)
		// Read atomically: other goroutines update this counter with atomic.AddInt32.
		logrus.Debugf("recv count:%v ,end num:%v ", atomic.LoadInt32(loadTask.RecvCount[worker]), endNum)
		time.Sleep(100 * time.Millisecond)
	}

	tarStatus := structure.TaskStateLoaded
	if graph.UseOutDegree {
		tarStatus = structure.TaskStateLoadEdgeOK
	}

	loadTask.Locker.Lock()
	loadTask.Task.SetWorkerState(worker, tarStatus)
	allWorkerComplete := loadTask.Task.CheckTaskState(tarStatus)
	loadTask.Locker.Unlock()
	if !allWorkerComplete {
		return
	}

	// Every worker has reached tarStatus: finalize the graph and tell the master.
	graph.OptimizeMemory()
	edgeCountReq := pb.WorkerEdgeCountReq{
		TaskId:     taskId,
		WorkerName: ServiceWorker.WorkerName,
		Count:      graph.EdgeCount,
	}
	graph.Data.EdgeCount = graph.EdgeCount
	_, err := ServiceWorker.MasterClient.WorkEdgeCount(context.Background(), &edgeCountReq)
	if err != nil {
		// NOTE(review): returns without updating the task state or resetting
		// RecvCount — confirm this is the intended failure handling.
		logrus.Errorf("WorkEdgeCount error: %s", err)
		return
	}

	loadTask.Task.SetState(tarStatus)
	statusReq := pb.LoadTaskStatusReq{
		WorkerName: ServiceWorker.WorkerName,
		TaskId:     taskId,
		State:      string(tarStatus),
	}
	_, err = ServiceWorker.MasterClient.LoadTaskStatus(context.Background(), &statusReq)
	if err != nil {
		logrus.Errorf("RecvEdge send load task status error: %s", err)
	}

	// Reset the per-worker batch counters with atomic stores, matching the
	// atomic adds/loads used everywhere else on these counters.
	for _, recvCount := range loadTask.RecvCount {
		atomic.StoreInt32(recvCount, 0)
	}
}