in vermeer/apps/worker/load_graph_bl.go [849:952]
// RecvVertex decodes one batch of serialized vertices received for a load task
// and appends it to the task's graph.
//
// Parameters:
//   - taskId: id used to look up the load task; the lookup is retried every
//     100ms (up to 100 times) while the task is not registered yet.
//   - worker: key into loadTask.RecvCount and the worker whose state is updated
//     when the final message is handled.
//   - count: number of vertices the sender declares for this batch.
//   - end: true when this is the closing message of that worker.
//   - endNum: value RecvCount[worker] must reach before the closing message is
//     considered complete.
//   - data: concatenated serialized vertices; when graph.UseProperty is set each
//     vertex is immediately followed by its serialized property value.
//
// Decode problems are reported through lb.SetStatusError and the log; the
// function itself returns nothing. As before, a batch with a decode error is
// still appended and counted so that the closing message is not stalled.
//
// When end is true the call waits for all in-flight RecvVertex calls, polls
// (up to 100 x 100ms) until RecvCount[worker] reaches endNum, marks the worker
// as TaskStateLoadVertexOK and, once every worker is in that state, triggers
// OnVertexLoaded, reports the state to the master and resets all RecvCount
// counters.
func (lb *LoadGraphBl) RecvVertex(taskId int32, worker string, count int32, end bool, endNum int32, data []byte) {
	defer func() {
		if r := recover(); r != nil {
			// Capture the stack once so the status and the log show the same trace.
			stack := common.GetCurrentGoroutineStack()
			lb.SetStatusError(taskId, fmt.Sprintf("RecvVertex panic recover panic:%v, stack message: %s", r,
				stack))
			logrus.Errorf("RecvVertex panic recover taskID:%v, panic:%v, stack message: %s", taskId, r,
				stack)
		}
	}()
	loadTask := LoadGraphMgr.GetLoadTask(taskId)
	for i := 0; i < 100 && loadTask == nil; i++ {
		//wait 100ms if loadTask not init.
		logrus.Warnf("task id:%v is not available, wait 100ms", taskId)
		time.Sleep(100 * time.Millisecond)
		loadTask = LoadGraphMgr.GetLoadTask(taskId)
	}
	if !lb.CheckAction(loadTask) {
		return
	}

	loadTask.RecvWg.Add(1)
	// The Add above must be balanced exactly once on every exit path, including
	// a recovered panic; otherwise the closing message blocks forever in
	// RecvWg.Wait(). A plain `defer Done()` is not possible because this very
	// call waits on RecvWg below when end is true, so a release-once guard is
	// used instead. This defer runs before the recover handler registered above.
	recvPending := true
	releaseRecv := func() {
		if recvPending {
			recvPending = false
			loadTask.RecvWg.Done()
		}
	}
	defer releaseRecv()

	loadTask.LoadWg.Wait()
	graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
	atomic.AddUint32(&graph.Data.VertexCount, uint32(count))
	logrus.Debugf("recv vertex count: %d, end: %v,endNum:%v, worker: %s", count, end, endNum, worker)

	vertexList := make([]structure.Vertex, count)
	properties := structure.VertexProperties{}
	properties.Init(graph.Data.VertexPropertySchema)
	prop := structure.PropertyValue{}
	prop.Init(graph.Data.VertexPropertySchema)

	c := 0
	for i := 0; i < len(data); {
		if c >= len(vertexList) {
			// More serialized vertices than declared: report it instead of
			// indexing past the end of vertexList and panicking.
			lb.SetStatusError(taskId, fmt.Sprintf("RecvVertex payload exceeds declared vertex count: %d", count))
			logrus.Errorf("RecvVertex payload exceeds declared vertex count: %d", count)
			break
		}
		n, err := vertexList[c].Unmarshal(data[i:])
		if err != nil {
			lb.SetStatusError(taskId, fmt.Sprintf("load graph read vertex error: %s", err))
			logrus.Errorf("load graph read vertex error: %s", err)
			break
		}
		i += n
		if graph.UseProperty {
			n, err = prop.Unmarshal(data[i:])
			if err != nil {
				lb.SetStatusError(taskId, fmt.Sprintf("load graph read vertex prop error: %s", err))
				logrus.Errorf("load graph read vertex prop error: %s", err)
				break
			}
			properties.AppendProp(prop, graph.Data.VertexPropertySchema)
			i += n
		}
		c += 1
	}
	if c != int(count) {
		lb.SetStatusError(taskId, fmt.Sprintf("RecvVertex count incorrect: %d, %d", c, count))
		logrus.Errorf("RecvVertex count incorrect: %d, %d", c, count)
	}

	// Append under the graph lock; the closure guarantees the lock is released
	// even if one of the append calls panics.
	func() {
		graph.Locker.Lock()
		defer graph.Locker.Unlock()
		graph.Data.Vertex.AppendVertices(vertexList...)
		if graph.UseProperty {
			graph.Data.VertexProperty.AppendProps(properties)
		}
	}()

	// This message is done: release the wait group before (possibly) waiting on
	// it below, then count the message for its sender.
	releaseRecv()
	recvCount := loadTask.RecvCount[worker]
	atomic.AddInt32(recvCount, 1)
	if !end {
		return
	}

	// wait for all messages are processed
	loadTask.RecvWg.Wait()
	for i := 0; i < 100; i++ {
		if atomic.LoadInt32(recvCount) >= endNum {
			break
		}
		logrus.Warnf("There are still buffer left to be processed. From worker:%v", worker)
		// Read atomically: other RecvVertex calls update this counter concurrently.
		logrus.Debugf("recv count:%v ,end num:%v ", atomic.LoadInt32(recvCount), endNum)
		time.Sleep(100 * time.Millisecond)
	}

	var allWorkerComplete bool
	loadTask.Locker.Lock()
	loadTask.Task.SetWorkerState(worker, structure.TaskStateLoadVertexOK)
	allWorkerComplete = loadTask.Task.CheckTaskState(structure.TaskStateLoadVertexOK)
	loadTask.Locker.Unlock()
	if !allWorkerComplete {
		return
	}

	loadTask.Task.SetState(structure.TaskStateLoadVertexOK)
	loadTask.LoadWg.Add(1)
	lb.OnVertexLoaded(taskId)
	req := pb.LoadTaskStatusReq{
		WorkerName: ServiceWorker.WorkerName,
		TaskId:     taskId,
		State:      string(structure.TaskStateLoadVertexOK),
	}
	ctx := context.Background()
	_, err := ServiceWorker.MasterClient.LoadTaskStatus(ctx, &req)
	if err != nil {
		logrus.Errorf("RecvVertex send load task status error: %s", err)
	}
	// Reset the per-worker counters with atomic stores, matching the atomic
	// adds/loads used everywhere else on these values.
	for _, cnt := range loadTask.RecvCount {
		atomic.StoreInt32(cnt, 0)
	}
}