in vermeer/apps/worker/load_graph_bl.go [371:489]
// GatherVertex decodes one batch of serialized vertices received from
// workerName and stores them in the graph belonging to load task taskID.
//
// data is consumed as follows: when len(data) >= 4 a leading serialize.SUint32
// gives the offset of the first vertex in the batch; the rest is a sequence of
// records, each holding one structure.Vertex followed, when graph.UseProperty
// is set, by one value per entry of the vertex property schema.
//
// end marks the final batch of workerName. In that case the call waits until
// every in-flight batch has been processed and RecvCount[workerName] has
// reached endNum (polling for at most 100 * 100ms), marks the worker as
// TaskStateLoadScatterOK and, once all workers are in that state, builds the
// total vertex set and reports the state to the master.
//
// Decode errors are reported through SetStatusError and abort the batch; the
// vertices decoded before the error are still stored. Panics are recovered and
// reported the same way.
func (lb *LoadGraphBl) GatherVertex(taskID int32, workerName string, end bool, endNum int32, data []byte) {
	defer func() {
		if r := recover(); r != nil {
			// Capture the stack once so both reports show the same trace.
			stack := common.GetCurrentGoroutineStack()
			lb.SetStatusError(taskID, fmt.Sprintf("GatherVertex panic recover panic:%v, stack message: %s", r,
				stack))
			logrus.Errorf("GatherVertex panic recover taskID:%v, panic:%v, stack message: %s", taskID, r,
				stack)
		}
	}()
	logrus.Debugf("gather vertex worker:%v, end:%v", workerName, end)
	loadTask := LoadGraphMgr.GetLoadTask(taskID)
	if !lb.CheckAction(loadTask) {
		return
	}
	graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
	loadTask.LoadWg.Wait()

	// Decode inside a closure so RecvWg.Done is deferred: if decoding panics,
	// the counter is still released and a concurrent or later RecvWg.Wait
	// cannot block forever. The closure must finish before the end-of-stream
	// handling below, because that handling itself waits on RecvWg.
	func() {
		loadTask.RecvWg.Add(1)
		defer loadTask.RecvWg.Done()

		pos := 0
		vOffSet := serialize.SUint32(0)
		if len(data) >= 4 {
			n, err := vOffSet.Unmarshal(data)
			if err != nil {
				lb.SetStatusError(taskID, fmt.Sprintf("GatherVertex read vertex offset error: %s", err))
				logrus.Errorf("GatherVertex read vertex offset error: %s", err)
				return
			}
			pos += n
		}
		vOffSetStart := vOffSet
		vertexList := make([]structure.Vertex, 0, 1000)

	decode:
		for pos < len(data) {
			var vertex structure.Vertex
			n, err := vertex.Unmarshal(data[pos:])
			if err != nil {
				lb.SetStatusError(taskID, fmt.Sprintf("load graph read vertex error: %s", err))
				logrus.Errorf("load graph read vertex error: %s", err)
				break decode
			}
			vertexList = append(vertexList, vertex)
			pos += n

			if graph.UseProperty {
				for _, k := range graph.Data.VertexPropertySchema.Schema {
					// Declared per iteration so an unhandled type can never
					// reuse the value or length decoded for the previous key.
					var value serialize.MarshalAble
					switch k.VType {
					case structure.ValueTypeInt32:
						var sInt32 serialize.SInt32
						n, err = sInt32.Unmarshal(data[pos:])
						value = &sInt32
					case structure.ValueTypeFloat32:
						var sFloat32 serialize.SFloat32
						n, err = sFloat32.Unmarshal(data[pos:])
						value = &sFloat32
					case structure.ValueTypeString:
						var sString serialize.SString
						n, err = sString.Unmarshal(data[pos:])
						value = &sString
					default:
						// No decoder for this type: the encoded length is
						// unknown, so the cursor cannot be advanced safely.
						err = fmt.Errorf("unsupported vertex property type: %v", k.VType)
					}
					if err != nil {
						lb.SetStatusError(taskID, fmt.Sprintf("GatherVertex vertex property error: %s", err))
						logrus.Errorf("GatherVertex vertex property error: %s", err)
						// The cursor is no longer aligned to a record
						// boundary; abort the whole batch, not just this
						// vertex's property loop.
						break decode
					}
					graph.Data.VertexProperty.SetValue(k.PropKey, uint32(vOffSet), value)
					pos += n
				}
			}
			vOffSet++
		}
		graph.Data.Vertex.SetVertices(uint32(vOffSetStart), vertexList...)
	}()

	atomic.AddInt32(loadTask.RecvCount[workerName], 1)
	if !end {
		return
	}

	// Wait until all in-flight batches have been processed.
	loadTask.RecvWg.Wait()
	for attempt := 0; attempt < 100; attempt++ {
		received := atomic.LoadInt32(loadTask.RecvCount[workerName])
		if received >= endNum {
			break
		}
		logrus.Warnf("There are still buffer left to be processed. From worker:%v", workerName)
		logrus.Debugf("recv count:%v ,end num:%v ", received, endNum)
		time.Sleep(100 * time.Millisecond)
	}

	loadTask.Locker.Lock()
	loadTask.Task.SetWorkerState(workerName, structure.TaskStateLoadScatterOK)
	allWorkerComplete := loadTask.Task.CheckTaskState(structure.TaskStateLoadScatterOK)
	loadTask.Locker.Unlock()
	if !allWorkerComplete {
		return
	}

	logrus.Infof("Gather vertex complete, task:%v ", taskID)
	loadTask.Task.SetState(structure.TaskStateLoadScatterOK)
	loadTask.LoadWg.Add(1)
	graph.BuildTotalVertex()
	req := pb.LoadTaskStatusReq{
		WorkerName: ServiceWorker.WorkerName,
		TaskId:     taskID,
		State:      string(structure.TaskStateLoadScatterOK),
	}
	ctx := context.Background()
	if _, err := ServiceWorker.MasterClient.LoadTaskStatus(ctx, &req); err != nil {
		logrus.Errorf("LoadTaskStatus error: %s", err)
	}
	// The counters are updated atomically elsewhere in this function, so reset
	// them atomically as well.
	for _, counter := range loadTask.RecvCount {
		atomic.StoreInt32(counter, 0)
	}
}