in vermeer/apps/worker/load_graph_bl.go [521:660]
func (lb *LoadGraphBl) RunLoadVertex(ctx context.Context, loadTask *graphio.LoadGraphTask) {
defer func() {
if r := recover(); r != nil {
lb.SetStatusError(loadTask.Task.ID, fmt.Sprintf("RunLoadVertex panic recover panic:%v, stack message: %s",
r, common.GetCurrentGoroutineStack()))
logrus.Errorf("RunLoadVertex panic recover taskID:%v, panic:%v, stack message: %s",
loadTask.Task.ID, r, common.GetCurrentGoroutineStack())
}
}()
if !lb.CheckAction(loadTask) {
return
}
//defer lb.CheckAction(loadTask)
workerCount := len(loadTask.Task.Workers)
sendBuffers := make([]buffer.EncodeBuffer, 0, workerCount)
peers := make([]*PeerClient, 0, workerCount)
graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
for _, v := range loadTask.Task.Workers {
peers = append(peers, PeerMgr.GetPeer(v.Name))
buf := buffer.EncodeBuffer{}
buf.Init(BufferSize)
sendBuffers = append(sendBuffers, buf)
}
vertex := structure.Vertex{}
property := structure.PropertyValue{}
property.Init(graph.Data.VertexPropertySchema)
for {
if !lb.CheckAction(loadTask) {
return
}
reqCtx := context.Background()
req := pb.FetchLoadPartReq{}
req.TaskId = loadTask.Task.ID
req.WorkerName = ServiceWorker.WorkerName
resp, err := ServiceWorker.MasterClient.FetchLoadPart(reqCtx, &req)
if err != nil {
logrus.Errorf("RunLoadVertex fetch partition error: %s", err)
break
}
if resp.PartId == 0 {
logrus.Infof("RunLoadVertex fetch part eof, worker: %d", ctx.Value("worker_id"))
break
}
loader := graphio.MakeLoader(loadTask.LoadType)
err = loader.Init(resp.Params, graph.Data.VertexPropertySchema)
if err != nil {
lb.SetStatusError(loadTask.Task.ID, fmt.Sprintf("graph loader init error: %s", err))
logrus.Errorf("graph loader init error: %s", err)
loader.Close()
return
}
logrus.Infof("start read part: %s", loader.Name())
for {
err = loader.ReadVertex(&vertex, &property)
if err != nil {
if err == io.EOF {
logrus.Infof("read part eof: %s, count: %d", loader.Name(), loader.ReadCount())
break
}
lb.SetStatusError(loadTask.Task.ID, fmt.Sprintf("read vertex error: %s", err))
logrus.Errorf("read vertex error: %s", err)
loader.Close()
return
}
workerIdx := common.HashBKDR(vertex.ID) % workerCount
_ = sendBuffers[workerIdx].Marshal(&vertex)
if graph.UseProperty {
_ = sendBuffers[workerIdx].Marshal(&property)
}
if sendBuffers[workerIdx].Full() {
atomic.AddInt32(loadTask.SendCount[peers[workerIdx].Name], 1)
count := int32(sendBuffers[workerIdx].ObjCount())
if graph.UseProperty {
count = int32(sendBuffers[workerIdx].ObjCount() / 2)
}
if peers[workerIdx].Self {
lb.RecvVertex(
loadTask.Task.ID,
peers[workerIdx].Name,
count,
false,
0,
sendBuffers[workerIdx].PayLoad())
} else {
peers[workerIdx].LoadActionHandler.LoadAction(
loadTask.Task.ID,
pb.LoadAction_LoadVertex,
count,
false,
0,
sendBuffers[workerIdx].PayLoad())
}
sendBuffers[workerIdx].Reset()
}
}
loader.Close()
}
loadTask.Locker.Lock()
atomic.AddInt32(loadTask.Parallel, -1)
end := atomic.LoadInt32(loadTask.Parallel) == 0
for i := range sendBuffers {
atomic.AddInt32(loadTask.SendCount[peers[i].Name], 1)
}
loadTask.Locker.Unlock()
for i := range sendBuffers {
count := int32(sendBuffers[i].ObjCount())
if graph.UseProperty {
count = int32(sendBuffers[i].ObjCount() / 2)
}
if peers[i].Self {
lb.RecvVertex(
loadTask.Task.ID,
peers[i].Name,
count,
end,
atomic.LoadInt32(loadTask.SendCount[peers[i].Name]),
sendBuffers[i].PayLoad())
} else {
peers[i].LoadActionHandler.LoadAction(
loadTask.Task.ID,
pb.LoadAction_LoadVertex,
count,
end,
atomic.LoadInt32(loadTask.SendCount[peers[i].Name]),
sendBuffers[i].PayLoad())
}
sendBuffers[i].Reset()
}
if end {
for s := range loadTask.SendCount {
*loadTask.SendCount[s] = 0
}
}
}