in vermeer/apps/worker/load_graph_bl.go [662:843]
func (lb *LoadGraphBl) RunLoadEdge(ctx context.Context, loadTask *graphio.LoadGraphTask) {
defer func() {
if r := recover(); r != nil {
lb.SetStatusError(loadTask.Task.ID, fmt.Sprintf("RunLoadEdge panic recover panic:%v, stack message: %s",
r, common.GetCurrentGoroutineStack()))
logrus.Errorf("RunLoadEdge panic recover taskID:%v, panic:%v, stack message: %s",
loadTask.Task.ID, r, common.GetCurrentGoroutineStack())
}
}()
if !lb.CheckAction(loadTask) {
return
}
//defer lb.CheckAction(loadTask)
workerCount := len(loadTask.Task.Workers)
sendBuffers := make([]buffer.EncodeBuffer, 0, workerCount)
peers := make([]*PeerClient, 0, workerCount)
for _, v := range loadTask.Task.Workers {
peers = append(peers, PeerMgr.GetPeer(v.Name))
buf := buffer.EncodeBuffer{}
buf.Init(BufferSize)
sendBuffers = append(sendBuffers, buf)
}
graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
edge := structure.Edge{}
intEdge := structure.IntEdge{}
property := structure.PropertyValue{}
property.Init(graph.Data.InEdgesPropertySchema)
for {
if !lb.CheckAction(loadTask) {
return
}
reqCtx := context.Background()
req := pb.FetchLoadPartReq{}
req.TaskId = loadTask.Task.ID
req.WorkerName = ServiceWorker.WorkerName
resp, err := ServiceWorker.MasterClient.FetchLoadPart(reqCtx, &req)
if err != nil {
logrus.Errorf("RunLoadEdge fetch partition error: %s", err)
break
}
if resp.PartId == 0 {
logrus.Infof("RunLoadEdge fetch part eof, worker: %d", ctx.Value("worker_id"))
break
}
loader := graphio.MakeLoader(loadTask.LoadType)
err = loader.Init(resp.Params, graph.Data.InEdgesPropertySchema)
if err != nil {
lb.SetStatusError(loadTask.Task.ID, fmt.Sprintf("graph loader init error: %s", err))
logrus.Errorf("graph loader init error: %s", err)
loader.Close()
return
}
logrus.Infof("start read part: %s", loader.Name())
for {
if !lb.CheckAction(loadTask) {
return
}
err = loader.ReadEdge(&edge, &property)
if err != nil {
if err == io.EOF {
logrus.Infof("read part eof: %s, count: %d", loader.Name(), loader.ReadCount())
break
}
lb.SetStatusError(loadTask.Task.ID, fmt.Sprintf("read edge error: %s", err))
logrus.Errorf("read name:%v edge error: %s", loader.Name(), err)
loader.Close()
return
}
var ok bool
intEdge.Source, ok = graph.Data.Vertex.GetVertexIndex(edge.Source)
if !ok {
continue
}
intEdge.Target, ok = graph.Data.Vertex.GetVertexIndex(edge.Target)
if !ok {
continue
}
workerIdx := common.HashBKDR(edge.Target) % workerCount
_ = sendBuffers[workerIdx].Marshal(&intEdge)
if graph.UseProperty {
_ = sendBuffers[workerIdx].Marshal(&property)
}
if sendBuffers[workerIdx].Full() {
atomic.AddInt32(loadTask.SendCount[peers[workerIdx].Name], 1)
count := int32(sendBuffers[workerIdx].ObjCount())
if graph.UseProperty {
count = int32(sendBuffers[workerIdx].ObjCount() / 2)
}
if peers[workerIdx].Self {
lb.RecvEdge(
loadTask.Task.ID,
peers[workerIdx].Name,
count,
false,
0,
sendBuffers[workerIdx].PayLoad())
} else {
peers[workerIdx].LoadActionHandler.LoadAction(
loadTask.Task.ID,
pb.LoadAction_LoadEdge,
count,
false,
0,
sendBuffers[workerIdx].PayLoad())
}
sendBuffers[workerIdx].Reset()
}
//If the workerId of source and target are the same, sent only once.
//If they are not the same, send an edge for both workerId's.
if graph.UseOutEdges || graph.UseOutDegree {
workerIdxOut := common.HashBKDR(edge.Source) % workerCount
if workerIdxOut == workerIdx {
continue
}
_ = sendBuffers[workerIdxOut].Marshal(&intEdge)
if sendBuffers[workerIdxOut].Full() {
atomic.AddInt32(loadTask.SendCount[peers[workerIdxOut].Name], 1)
if peers[workerIdxOut].Self {
lb.RecvEdge(
loadTask.Task.ID,
peers[workerIdxOut].Name,
int32(sendBuffers[workerIdxOut].ObjCount()),
false,
0,
sendBuffers[workerIdxOut].PayLoad())
} else {
peers[workerIdxOut].LoadActionHandler.LoadAction(
loadTask.Task.ID,
pb.LoadAction_LoadEdge,
int32(sendBuffers[workerIdxOut].ObjCount()),
false,
0,
sendBuffers[workerIdxOut].PayLoad())
}
sendBuffers[workerIdxOut].Reset()
}
}
}
loader.Close()
}
loadTask.Locker.Lock()
atomic.AddInt32(loadTask.Parallel, -1)
end := atomic.LoadInt32(loadTask.Parallel) == 0
for i := range sendBuffers {
atomic.AddInt32(loadTask.SendCount[peers[i].Name], 1)
}
loadTask.Locker.Unlock()
for i := range sendBuffers {
count := int32(sendBuffers[i].ObjCount())
if graph.UseProperty {
count = int32(sendBuffers[i].ObjCount() / 2)
}
if peers[i].Self {
lb.RecvEdge(
loadTask.Task.ID,
peers[i].Name,
count,
end,
atomic.LoadInt32(loadTask.SendCount[peers[i].Name]),
sendBuffers[i].PayLoad())
} else {
peers[i].LoadActionHandler.LoadAction(
loadTask.Task.ID,
pb.LoadAction_LoadEdge,
count,
end,
atomic.LoadInt32(loadTask.SendCount[peers[i].Name]),
sendBuffers[i].PayLoad())
}
sendBuffers[i].Reset()
}
if end {
for s := range loadTask.SendCount {
*loadTask.SendCount[s] = 0
}
}
}