in vermeer/apps/worker/load_graph_bl.go [179:369]
func (lb *LoadGraphBl) ScatterVertex(taskID int32) {
defer func() {
if r := recover(); r != nil {
lb.SetStatusError(taskID, fmt.Sprintf("ScatterVertex panic recover panic:%v, stack message: %s", r,
common.GetCurrentGoroutineStack()))
logrus.Errorf("ScatterVertex panic recover taskID:%v, panic:%v, stack message: %s", taskID, r,
common.GetCurrentGoroutineStack())
}
}()
loadTask := LoadGraphMgr.GetLoadTask(taskID)
if !lb.CheckAction(loadTask) {
return
}
//defer lb.CheckAction(loadTask)
graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
ctx := context.Background()
req := pb.GetGraphWorkersReq{
GraphName: loadTask.Task.GraphName,
SpaceName: loadTask.Task.SpaceName,
}
resp, err := ServiceWorker.MasterClient.GetGraphWorkers(ctx, &req)
if err != nil {
lb.SetStatusError(taskID, fmt.Sprintf("GetGraphWorkers error: %s", err))
logrus.Errorf("GetGraphWorkers error: %s", err)
return
}
for _, w := range resp.Workers {
graph.SetWorkerVertexCount(w.Name, w.VertexCount, w.VertIdStart)
}
workerCount := len(loadTask.Task.Workers)
graph.RecastVertex()
loadTask.LoadWg.Done()
peers := make([]*PeerClient, 0, workerCount-1)
for _, wn := range loadTask.Task.Workers {
if wn.Name == ServiceWorker.WorkerName {
lb.GatherVertex(
loadTask.Task.ID,
wn.Name,
true,
1,
[]byte{})
continue
}
peers = append(peers, PeerMgr.GetPeer(wn.Name))
}
// skip if only on worker
if len(peers) == 0 {
return
}
// sendBuffer := buffer.EncodeBuffer{}
// sendBuffer.Init(BufferSize)
localGw := graph.GetGraphWorker(ServiceWorker.WorkerName)
parallel := options.GetInt(loadTask.Task.Params, "load.parallel")
if parallel <= 0 {
logrus.Infof("load.parallel value must be larger than 0, get: %v, set to defalut value :1", parallel)
parallel = 1
} else if parallel > 10 {
parallel = 10
}
sendBuffers := make([]buffer.EncodeBuffer, parallel)
for i := range sendBuffers {
sendBuffers[i] = buffer.EncodeBuffer{}
sendBuffers[i].Init(BufferSize)
}
partCnt := int(localGw.VertexCount)/parallel + 1
wg := &sync.WaitGroup{}
for i := 0; i < parallel; i++ {
wg.Add(1)
go func(pID int) {
defer func() {
if r := recover(); r != nil {
lb.SetStatusError(taskID, fmt.Sprintf("ScatterVertex panic recover panic:%v, stack message: %s",
r, common.GetCurrentGoroutineStack()))
logrus.Errorf("ScatterVertex panic recover taskID:%v, pId:%v panic:%v, stack message: %s",
taskID, pID, r, common.GetCurrentGoroutineStack())
}
}()
defer wg.Done()
bIdx := uint32(partCnt*pID) + localGw.VertIdStart
eIdx := bIdx + uint32(partCnt)
if eIdx > localGw.VertIdStart+localGw.VertexCount {
eIdx = localGw.VertIdStart + localGw.VertexCount
}
vOffSet := serialize.SUint32(bIdx)
_ = sendBuffers[pID].Marshal(&vOffSet)
for j := bIdx; j < eIdx; j++ {
vertex := graph.Data.Vertex.GetVertex(j)
_ = sendBuffers[pID].Marshal(&vertex)
if graph.UseProperty {
for _, k := range graph.Data.VertexPropertySchema.Schema {
value := graph.Data.VertexProperty.GetValue(k.PropKey, j)
_ = sendBuffers[pID].Marshal(value)
}
}
if sendBuffers[pID].Full() {
count := int32(sendBuffers[pID].ObjCount())
if graph.UseProperty {
count = int32(sendBuffers[pID].ObjCount() / (len(graph.Data.VertexPropertySchema.Schema) + 1))
}
for _, peer := range peers {
atomic.AddInt32(loadTask.SendCount[peer.Name], 1)
peer.LoadActionHandler.LoadAction(
loadTask.Task.ID,
pb.LoadAction_LoadScatter,
count,
false,
0,
sendBuffers[pID].PayLoad())
}
sendBuffers[pID].Reset()
vOffSet = serialize.SUint32(j + 1)
_ = sendBuffers[pID].Marshal(&vOffSet)
}
}
count := int32(sendBuffers[pID].ObjCount())
if graph.UseProperty {
count = int32(sendBuffers[pID].ObjCount() / (len(graph.Data.VertexPropertySchema.Schema) + 1))
}
for _, peer := range peers {
atomic.AddInt32(loadTask.SendCount[peer.Name], 1)
peer.LoadActionHandler.LoadAction(
loadTask.Task.ID,
pb.LoadAction_LoadScatter,
count,
false,
0,
sendBuffers[pID].PayLoad())
}
sendBuffers[pID].Reset()
}(i)
}
wg.Wait()
// vOffSet := serialize.SUint32(localGw.VertIdStart)
// _ = sendBuffer.Marshal(&vOffSet)
// for i := localGw.VertIdStart; i < localGw.VertIdStart+localGw.VertexCount; i++ {
// vertex := graph.Data.Vertex.GetVertex(i)
// _ = sendBuffer.Marshal(&vertex)
// if graph.UseProperty {
// for _, k := range graph.Data.VertexPropertySchema.Schema {
// value := graph.Data.VertexProperty.GetValue(k.PropKey, i)
// _ = sendBuffer.Marshal(value)
// }
// }
// if sendBuffer.Full() {
// count := int32(sendBuffer.ObjCount())
// if graph.UseProperty {
// count = int32(sendBuffer.ObjCount() / (len(graph.Data.VertexPropertySchema.Schema) + 1))
// }
// for _, peer := range peers {
// peer.LoadActionHandler.LoadAction(
// loadTask.Task.ID,
// pb.LoadAction_LoadScatter,
// count,
// false,
// 0,
// sendBuffer.PayLoad())
// }
// sendBuffer.Reset()
// vOffSet = serialize.SUint32(i + 1)
// _ = sendBuffer.Marshal(&vOffSet)
// }
// }
// count := int32(sendBuffer.ObjCount())
// if graph.UseProperty {
// count = int32(sendBuffer.ObjCount() / (len(graph.Data.VertexPropertySchema.Schema) + 1))
// }
for _, peer := range peers {
atomic.AddInt32(loadTask.SendCount[peer.Name], 1)
peer.LoadActionHandler.LoadAction(
loadTask.Task.ID,
pb.LoadAction_LoadScatter,
0,
true,
atomic.LoadInt32(loadTask.SendCount[peer.Name]),
[]byte{})
}
// sendBuffer.Reset()
for s := range loadTask.SendCount {
*loadTask.SendCount[s] = 0
}
}