vermeer/apps/worker/load_graph_bl.go (1,267 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF
licenses this file to You under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
*/package worker
import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
"vermeer/apps/buffer"
"vermeer/apps/common"
"vermeer/apps/graphio"
"vermeer/apps/options"
pb "vermeer/apps/protos"
"vermeer/apps/serialize"
"vermeer/apps/structure"
"github.com/sirupsen/logrus"
)
// BufferSize is the initial capacity, in bytes, of every per-peer encode
// buffer used to batch vertices or edges before they are sent (2 MiB).
const BufferSize = 2 << 20

// LoadGraphBl holds the worker-side logic for loading a graph: reading
// vertex/edge partitions, routing them to peer workers and reporting
// progress to the master. The type itself carries no fields; per-task state
// is looked up through LoadGraphMgr by task id.
type LoadGraphBl struct{}
// StartLoadGraph begins load task taskId for graph graphName in spaceName on
// this worker. It proceeds in four steps:
//   - creates and registers the graph and the task, with one GraphWorker and
//     one TaskWorker per entry in workers;
//   - installs the load task;
//   - prepares the data directory, vertex storage and property schemas from
//     params;
//   - starts "load.parallel" RunLoadVertex goroutines.
//
// LoadWg is held (Add(1)) while the graph is initialised so that incoming
// vertex batches wait until storage and schemas are ready. It is released
// exactly once: on success just before the loaders start, or on any early
// exit (error return or panic). Receivers blocked in LoadWg.Wait are
// therefore never stranded.
func (lb *LoadGraphBl) StartLoadGraph(spaceName string, taskId int32, graphName string, workers []string, params map[string]string) {
	defer func() {
		if r := recover(); r != nil {
			lb.SetStatusError(taskId, fmt.Sprintf("StartLoadGraph panic recover panic:%v, stack message: %s", r,
				common.GetCurrentGoroutineStack()))
			logrus.Errorf("StartLoadGraph panic recover taskID:%v, panic:%v, stack message: %s", taskId, r,
				common.GetCurrentGoroutineStack())
		}
	}()
	graph := GraphMgr.CreateGraph(spaceName, graphName)
	graph.SetState(structure.GraphStateLoading)
	err := GraphMgr.AddGraph(graph)
	if err != nil {
		logrus.Errorf("add graph error: %s", err)
	}
	graph.Workers = make([]*structure.GraphWorker, 0, len(workers))
	task, err := TaskMgr.CreateTask(spaceName, structure.TaskTypeLoad, taskId)
	if err != nil {
		logrus.Errorf("create task error: %s", err)
	}
	for _, wn := range workers {
		gw := structure.GraphWorker{
			Name:        wn,
			VertexCount: 0,
		}
		if wn == ServiceWorker.WorkerName {
			gw.IsSelf = true
		}
		graph.Workers = append(graph.Workers, &gw)
		tw := structure.TaskWorker{
			Name: wn,
		}
		task.Workers = append(task.Workers, &tw)
	}
	task.GraphName = graphName
	task.Params = params
	TaskMgr.AddTask(task)
	logrus.Infof("create load task: %d, graph: %s", taskId, graphName)
	loadTask := LoadGraphMgr.InstallTask(task)

	// Vertex receivers wait on LoadWg until initialisation below is done.
	loadTask.LoadWg.Add(1)
	var releaseOnce sync.Once
	releaseLoadWg := func() {
		releaseOnce.Do(func() { loadTask.LoadWg.Done() })
	}
	// Deferred after the recover handler, so it runs first on every exit path
	// (including panics) and never leaves LoadWg held.
	defer releaseLoadWg()

	ctx := context.Background()
	parallel := options.GetInt(params, "load.parallel")
	if parallel <= 0 {
		logrus.Infof("load.parallel value must be larger than 0, get: %v, set to defalut value :1", parallel)
		parallel = 1
	}
	*loadTask.Parallel = int32(parallel)
	loadTask.LoadType = options.GetString(params, "load.type")
	_, err = graph.SetDataDir(spaceName, graphName, ServiceWorker.WorkerName)
	if err != nil {
		logrus.Errorf("set data dir error: %s", err)
		lb.SetStatusError(taskId, fmt.Sprintf("set data dir error: %s", err))
		// The task is already failed; do not remove or allocate data under a
		// data directory that could not be set up.
		return
	}
	// remove history data
	graph.Remove()
	backendOption := structure.GraphDataBackendOption{
		VertexDataBackend: options.GetString(params, "load.vertex_backend"),
	}
	graph.MallocData(backendOption)

	// Decide which adjacency data the graph keeps; SetOption applies them.
	var useOutEdges, useOutDegree, useProperty bool
	useOutEdges = options.GetInt(loadTask.Task.Params, "load.use_outedge") == 1
	useOutDegree = options.GetInt(loadTask.Task.Params, "load.use_out_degree") == 1
	// Undirected loading is served through out-edges. Set the local flag
	// (not graph.UseOutEdges) so SetOption below does not overwrite it.
	if options.GetInt(params, "load.use_undirected") == 1 {
		useOutEdges = true
	}
	useProperty = options.GetInt(loadTask.Task.Params, "load.use_property") == 1
	if loadTask.LoadType == graphio.LoadTypeHugegraph {
		// HugeGraph sources always carry properties; their schema comes from HugeGraph.
		useProperty = true
		graph.Data.VertexPropertySchema, graph.Data.InEdgesPropertySchema, err = structure.GetSchemaFromHugegraph(params)
		if err != nil {
			lb.SetStatusError(taskId, fmt.Sprintf("load schema from hugegraph error:%v", err))
			logrus.Errorf("load schema from hugegraph error:%v", err)
			return
		}
	} else if useProperty {
		graph.Data.VertexPropertySchema.Init(options.GetMapString(params, "load.vertex_property"))
		graph.Data.InEdgesPropertySchema.Init(options.GetMapString(params, "load.edge_property"))
	}
	graph.SetOption(useOutEdges, useOutDegree, useProperty)
	if graph.UseProperty {
		graph.Data.VertexProperty.Init(graph.Data.VertexPropertySchema)
	}

	// Initialisation complete: let queued vertex batches proceed.
	releaseLoadWg()
	for i := 0; i < parallel; i++ {
		valueCtx := context.WithValue(ctx, "worker_id", i)
		go lb.RunLoadVertex(valueCtx, loadTask)
	}
}
// OnVertexLoaded finishes the vertex phase of load task taskId on this
// worker, after every worker has delivered its vertices. It does four things:
//   - records the local vertex count on the graph and on this worker's
//     GraphWorker entry;
//   - builds edge storage using "load.edges_per_vertex";
//   - updates the VertexCnt metric;
//   - reports the count to the master through WorkVertexCount.
//
// Failures are recorded on the task with SetStatusError.
func (lb *LoadGraphBl) OnVertexLoaded(taskId int32) {
	defer func() {
		if r := recover(); r != nil {
			lb.SetStatusError(taskId, fmt.Sprintf("OnVertexLoaded panic recover panic:%v, stack message: %s", r,
				common.GetCurrentGoroutineStack()))
			logrus.Errorf("OnVertexLoaded panic recover taskID:%v, panic:%v, stack message: %s", taskId, r,
				common.GetCurrentGoroutineStack())
		}
	}()
	loadTask := LoadGraphMgr.GetLoadTask(taskId)
	if !lb.CheckAction(loadTask) {
		return
	}
	ctx := context.Background()
	// Reuse the task fetched above instead of looking it up again.
	graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
	// RecvVertex updates Data.VertexCount with atomic adds. Snapshot it once,
	// atomically, so every consumer below sees the same value.
	vertexCount := atomic.LoadUint32(&graph.Data.VertexCount)
	graph.VertexCount = int64(vertexCount)
	graph.SetWorkerVertexCount(ServiceWorker.WorkerName, vertexCount, 0)
	graph.BuildEdge(options.GetInt(loadTask.Task.Params, "load.edges_per_vertex"))
	req := pb.WorkerVertexCountReq{
		TaskId:     taskId,
		WorkerName: ServiceWorker.WorkerName,
		Count:      vertexCount,
	}
	common.PrometheusMetrics.VertexCnt.WithLabelValues(graph.Name).Set(float64(vertexCount))
	_, err := ServiceWorker.MasterClient.WorkVertexCount(ctx, &req)
	if err != nil {
		lb.SetStatusError(taskId, fmt.Sprintf("WorkVertexCount error: %s", err))
		logrus.Errorf("WorkVertexCount error: %s", err)
		return
	}
}
// ScatterVertex sends this worker's slice of the vertex table to every peer
// so that each worker can assemble the full table (peers decode it in
// GatherVertex). It runs in these steps:
//   - fetches every worker's vertex count and start index from the master;
//   - recasts local vertex storage and releases LoadWg;
//   - marks itself done through a local GatherVertex call;
//   - encodes [VertIdStart, VertIdStart+VertexCount) in up to 10 parallel
//     parts and sends the batches to every peer;
//   - sends each peer an end message carrying the number of batches sent
//     to it.
//
// Each batch starts with an SUint32 vertex offset, followed by vertices.
// When the graph uses properties, each vertex is followed by one value per
// schema entry.
func (lb *LoadGraphBl) ScatterVertex(taskID int32) {
defer func() {
if r := recover(); r != nil {
lb.SetStatusError(taskID, fmt.Sprintf("ScatterVertex panic recover panic:%v, stack message: %s", r,
common.GetCurrentGoroutineStack()))
logrus.Errorf("ScatterVertex panic recover taskID:%v, panic:%v, stack message: %s", taskID, r,
common.GetCurrentGoroutineStack())
}
}()
loadTask := LoadGraphMgr.GetLoadTask(taskID)
if !lb.CheckAction(loadTask) {
return
}
//defer lb.CheckAction(loadTask)
graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
ctx := context.Background()
// Ask the master for every worker's vertex count and start index.
req := pb.GetGraphWorkersReq{
GraphName: loadTask.Task.GraphName,
SpaceName: loadTask.Task.SpaceName,
}
resp, err := ServiceWorker.MasterClient.GetGraphWorkers(ctx, &req)
if err != nil {
lb.SetStatusError(taskID, fmt.Sprintf("GetGraphWorkers error: %s", err))
logrus.Errorf("GetGraphWorkers error: %s", err)
return
}
for _, w := range resp.Workers {
graph.SetWorkerVertexCount(w.Name, w.VertexCount, w.VertIdStart)
}
workerCount := len(loadTask.Task.Workers)
graph.RecastVertex()
// Storage is ready: let GatherVertex calls blocked in LoadWg.Wait proceed.
loadTask.LoadWg.Done()
// NOTE(review): capacity workerCount-1 panics if the task has no workers.
peers := make([]*PeerClient, 0, workerCount-1)
for _, wn := range loadTask.Task.Workers {
if wn.Name == ServiceWorker.WorkerName {
// This worker's vertices are already local. An empty end batch with
// endNum 1 (satisfied by this call's own RecvCount increment) only
// marks this worker as LoadScatterOK.
lb.GatherVertex(
loadTask.Task.ID,
wn.Name,
true,
1,
[]byte{})
continue
}
peers = append(peers, PeerMgr.GetPeer(wn.Name))
}
// skip if only on worker
if len(peers) == 0 {
return
}
// sendBuffer := buffer.EncodeBuffer{}
// sendBuffer.Init(BufferSize)
localGw := graph.GetGraphWorker(ServiceWorker.WorkerName)
// Encoding parallelism: "load.parallel", clamped to [1, 10].
parallel := options.GetInt(loadTask.Task.Params, "load.parallel")
if parallel <= 0 {
logrus.Infof("load.parallel value must be larger than 0, get: %v, set to defalut value :1", parallel)
parallel = 1
} else if parallel > 10 {
parallel = 10
}
// One encode buffer per part, so parts never share a buffer.
sendBuffers := make([]buffer.EncodeBuffer, parallel)
for i := range sendBuffers {
sendBuffers[i] = buffer.EncodeBuffer{}
sendBuffers[i].Init(BufferSize)
}
// NOTE(review): when parallel exceeds the local vertex count, trailing parts
// get bIdx >= eIdx and still send an offset-only batch with count 0.
partCnt := int(localGw.VertexCount)/parallel + 1
wg := &sync.WaitGroup{}
for i := 0; i < parallel; i++ {
wg.Add(1)
go func(pID int) {
defer func() {
if r := recover(); r != nil {
lb.SetStatusError(taskID, fmt.Sprintf("ScatterVertex panic recover panic:%v, stack message: %s",
r, common.GetCurrentGoroutineStack()))
logrus.Errorf("ScatterVertex panic recover taskID:%v, pId:%v panic:%v, stack message: %s",
taskID, pID, r, common.GetCurrentGoroutineStack())
}
}()
defer wg.Done()
// Part pID covers global vertex indices [bIdx, eIdx), clamped to the local range.
bIdx := uint32(partCnt*pID) + localGw.VertIdStart
eIdx := bIdx + uint32(partCnt)
if eIdx > localGw.VertIdStart+localGw.VertexCount {
eIdx = localGw.VertIdStart + localGw.VertexCount
}
// Every batch begins with the global index of its first vertex.
vOffSet := serialize.SUint32(bIdx)
_ = sendBuffers[pID].Marshal(&vOffSet)
for j := bIdx; j < eIdx; j++ {
vertex := graph.Data.Vertex.GetVertex(j)
_ = sendBuffers[pID].Marshal(&vertex)
if graph.UseProperty {
for _, k := range graph.Data.VertexPropertySchema.Schema {
value := graph.Data.VertexProperty.GetValue(k.PropKey, j)
_ = sendBuffers[pID].Marshal(value)
}
}
if sendBuffers[pID].Full() {
// ObjCount includes property values; convert it to a vertex count.
count := int32(sendBuffers[pID].ObjCount())
if graph.UseProperty {
count = int32(sendBuffers[pID].ObjCount() / (len(graph.Data.VertexPropertySchema.Schema) + 1))
}
for _, peer := range peers {
// Count every batch per peer; the end message reports this total.
atomic.AddInt32(loadTask.SendCount[peer.Name], 1)
peer.LoadActionHandler.LoadAction(
loadTask.Task.ID,
pb.LoadAction_LoadScatter,
count,
false,
0,
sendBuffers[pID].PayLoad())
}
sendBuffers[pID].Reset()
// The next batch resumes at the following vertex.
vOffSet = serialize.SUint32(j + 1)
_ = sendBuffers[pID].Marshal(&vOffSet)
}
}
// Flush whatever remains of this part.
count := int32(sendBuffers[pID].ObjCount())
if graph.UseProperty {
count = int32(sendBuffers[pID].ObjCount() / (len(graph.Data.VertexPropertySchema.Schema) + 1))
}
for _, peer := range peers {
atomic.AddInt32(loadTask.SendCount[peer.Name], 1)
peer.LoadActionHandler.LoadAction(
loadTask.Task.ID,
pb.LoadAction_LoadScatter,
count,
false,
0,
sendBuffers[pID].PayLoad())
}
sendBuffers[pID].Reset()
}(i)
}
// All data batches are sent before any end message goes out.
wg.Wait()
// vOffSet := serialize.SUint32(localGw.VertIdStart)
// _ = sendBuffer.Marshal(&vOffSet)
// for i := localGw.VertIdStart; i < localGw.VertIdStart+localGw.VertexCount; i++ {
// vertex := graph.Data.Vertex.GetVertex(i)
// _ = sendBuffer.Marshal(&vertex)
// if graph.UseProperty {
// for _, k := range graph.Data.VertexPropertySchema.Schema {
// value := graph.Data.VertexProperty.GetValue(k.PropKey, i)
// _ = sendBuffer.Marshal(value)
// }
// }
// if sendBuffer.Full() {
// count := int32(sendBuffer.ObjCount())
// if graph.UseProperty {
// count = int32(sendBuffer.ObjCount() / (len(graph.Data.VertexPropertySchema.Schema) + 1))
// }
// for _, peer := range peers {
// peer.LoadActionHandler.LoadAction(
// loadTask.Task.ID,
// pb.LoadAction_LoadScatter,
// count,
// false,
// 0,
// sendBuffer.PayLoad())
// }
// sendBuffer.Reset()
// vOffSet = serialize.SUint32(i + 1)
// _ = sendBuffer.Marshal(&vOffSet)
// }
// }
// count := int32(sendBuffer.ObjCount())
// if graph.UseProperty {
// count = int32(sendBuffer.ObjCount() / (len(graph.Data.VertexPropertySchema.Schema) + 1))
// }
// End message: empty payload, end=true. endNum is the number of batches
// sent to that peer, this end message included, so the receiver can wait
// until all of them have been processed.
for _, peer := range peers {
atomic.AddInt32(loadTask.SendCount[peer.Name], 1)
peer.LoadActionHandler.LoadAction(
loadTask.Task.ID,
pb.LoadAction_LoadScatter,
0,
true,
atomic.LoadInt32(loadTask.SendCount[peer.Name]),
[]byte{})
}
// sendBuffer.Reset()
// Reset the per-peer counters for the next phase.
for s := range loadTask.SendCount {
*loadTask.SendCount[s] = 0
}
}
// GatherVertex decodes one batch of vertices that workerName scattered in
// ScatterVertex and writes them into this worker's vertex table.
//
// data layout: an SUint32 start offset followed by vertices. When the graph
// uses properties, each vertex is followed by one value per entry of the
// vertex property schema.
//
// end marks the sender's final batch, and endNum is the number of batches
// the sender reports having sent. The final batch does the following:
//   - waits for in-flight batches;
//   - marks the sender as TaskStateLoadScatterOK;
//   - once every worker is in that state, builds the total vertex table and
//     reports the state to the master.
//
// Decoding stops at the first malformed vertex or property, or at an
// unsupported property type; the task is then marked failed. Vertices
// decoded before that point are still stored.
func (lb *LoadGraphBl) GatherVertex(taskID int32, workerName string, end bool, endNum int32, data []byte) {
	defer func() {
		if r := recover(); r != nil {
			lb.SetStatusError(taskID, fmt.Sprintf("GatherVertex panic recover panic:%v, stack message: %s", r,
				common.GetCurrentGoroutineStack()))
			logrus.Errorf("GatherVertex panic recover taskID:%v, panic:%v, stack message: %s", taskID, r,
				common.GetCurrentGoroutineStack())
		}
	}()
	logrus.Debugf("gather vertex worker:%v, end:%v", workerName, end)
	loadTask := LoadGraphMgr.GetLoadTask(taskID)
	if !lb.CheckAction(loadTask) {
		return
	}
	graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
	loadTask.LoadWg.Wait()
	loadTask.RecvWg.Add(1)
	i := 0
	vOffSet := serialize.SUint32(0)
	if len(data) >= 4 {
		n, _ := vOffSet.Unmarshal(data)
		i += n
	}
	vOffSetStart := vOffSet
	vertexList := make([]structure.Vertex, 0, 1000)
decode:
	for i < len(data) {
		var vertex structure.Vertex
		n, err := vertex.Unmarshal(data[i:])
		if err != nil {
			lb.SetStatusError(taskID, fmt.Sprintf("load graph read vertex error: %s", err))
			logrus.Errorf("load graph read vertex error: %s", err)
			break
		}
		vertexList = append(vertexList, vertex)
		i += n
		if graph.UseProperty {
			for _, k := range graph.Data.VertexPropertySchema.Schema {
				var value serialize.MarshalAble
				switch k.VType {
				case structure.ValueTypeInt32:
					var sInt32 serialize.SInt32
					n, err = sInt32.Unmarshal(data[i:])
					value = &sInt32
				case structure.ValueTypeFloat32:
					var sFloat32 serialize.SFloat32
					n, err = sFloat32.Unmarshal(data[i:])
					value = &sFloat32
				case structure.ValueTypeString:
					var sString serialize.SString
					n, err = sString.Unmarshal(data[i:])
					value = &sString
				default:
					// Without this, a stale n/value from the previous decode would be reused.
					err = fmt.Errorf("unsupported vertex property type %v for key %v", k.VType, k.PropKey)
				}
				if err != nil {
					lb.SetStatusError(taskID, fmt.Sprintf("GatherVertex vertex property error: %s", err))
					logrus.Errorf("GatherVertex vertex property error: %s", err)
					// i is no longer on a record boundary, so stop decoding the
					// whole batch rather than misreading the rest of the payload.
					break decode
				}
				graph.Data.VertexProperty.SetValue(k.PropKey, uint32(vOffSet), value)
				i += n
			}
		}
		vOffSet += 1
	}
	graph.Data.Vertex.SetVertices(uint32(vOffSetStart), vertexList...)
	loadTask.RecvWg.Done()
	atomic.AddInt32(loadTask.RecvCount[workerName], 1)
	if end {
		// wait for all messages are processed
		loadTask.RecvWg.Wait()
		for i := 0; i < 100; i++ {
			if atomic.LoadInt32(loadTask.RecvCount[workerName]) >= endNum {
				break
			}
			logrus.Warnf("There are still buffer left to be processed. From worker:%v", workerName)
			logrus.Debugf("recv count:%v ,end num:%v ", *loadTask.RecvCount[workerName], endNum)
			time.Sleep(100 * time.Millisecond)
		}
		var allWorkerComplete bool
		loadTask.Locker.Lock()
		loadTask.Task.SetWorkerState(workerName, structure.TaskStateLoadScatterOK)
		allWorkerComplete = loadTask.Task.CheckTaskState(structure.TaskStateLoadScatterOK)
		loadTask.Locker.Unlock()
		if allWorkerComplete {
			logrus.Infof("Gather vertex complete, task:%v ", taskID)
			loadTask.Task.SetState(structure.TaskStateLoadScatterOK)
			loadTask.LoadWg.Add(1)
			graph.BuildTotalVertex()
			req := pb.LoadTaskStatusReq{
				WorkerName: ServiceWorker.WorkerName,
				TaskId:     taskID,
				State:      string(structure.TaskStateLoadScatterOK),
			}
			ctx := context.Background()
			_, err := ServiceWorker.MasterClient.LoadTaskStatus(ctx, &req)
			if err != nil {
				logrus.Errorf("LoadTaskStatus error: %s", err)
			}
			for s := range loadTask.RecvCount {
				*loadTask.RecvCount[s] = 0
			}
		}
	}
}
// LoadEdge moves load task taskId into the edge phase on this worker. It
// reads "load.parallel" from the task params (falling back to 1 when not
// positive) and "load.type". It then calls LoadWg.Done so edge receivers
// waiting in LoadWg.Wait can proceed, and starts one RunLoadEdge goroutine
// per unit of parallelism.
func (lb *LoadGraphBl) LoadEdge(taskId int32) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		lb.SetStatusError(taskId, fmt.Sprintf("LoadEdge panic recover panic:%v, stack message: %s", r,
			common.GetCurrentGoroutineStack()))
		logrus.Errorf("LoadEdge panic recover taskID:%v, panic:%v, stack message: %s", taskId, r,
			common.GetCurrentGoroutineStack())
	}()

	loadTask := LoadGraphMgr.GetLoadTask(taskId)
	if !lb.CheckAction(loadTask) {
		return
	}

	taskParams := loadTask.Task.Params
	parallel := options.GetInt(taskParams, "load.parallel")
	if parallel <= 0 {
		logrus.Infof("load.parallel value must be larger than 0, get: %v, set to defalut value :1", parallel)
		parallel = 1
	}
	*loadTask.Parallel = int32(parallel)
	loadTask.LoadType = options.GetString(taskParams, "load.type")
	loadTask.LoadWg.Done()
	logrus.Infof("load edge parallel: %d, type: %s", parallel, loadTask.LoadType)

	baseCtx := context.Background()
	for workerID := 0; workerID < parallel; workerID++ {
		go lb.RunLoadEdge(context.WithValue(baseCtx, "worker_id", workerID), loadTask)
	}
}
// RunLoadVertex is one of the loader goroutines started by StartLoadGraph.
// Until the master returns PartId 0, it does the following:
//   - fetches a partition from the master;
//   - reads vertices (and properties) with the loader for loadTask.LoadType;
//   - routes each vertex to worker HashBKDR(vertex.ID) % workerCount;
//   - flushes full buffers, to RecvVertex for itself and via LoadAction for
//     peers.
//
// When it finishes, it decrements loadTask.Parallel. The goroutine that
// brings it to zero sends its final batches with end=true and an endNum
// equal to the number of batches counted for each peer.
//
// ctx is only read for the "worker_id" value used in logging.
// NOTE(review): a FetchLoadPart error exits the loop as if EOF, without
// SetStatusError; confirm that partitions cannot be silently skipped.
// Early returns (CheckAction false, loader errors) skip the Parallel
// decrement and the end messages.
func (lb *LoadGraphBl) RunLoadVertex(ctx context.Context, loadTask *graphio.LoadGraphTask) {
defer func() {
if r := recover(); r != nil {
lb.SetStatusError(loadTask.Task.ID, fmt.Sprintf("RunLoadVertex panic recover panic:%v, stack message: %s",
r, common.GetCurrentGoroutineStack()))
logrus.Errorf("RunLoadVertex panic recover taskID:%v, panic:%v, stack message: %s",
loadTask.Task.ID, r, common.GetCurrentGoroutineStack())
}
}()
if !lb.CheckAction(loadTask) {
return
}
//defer lb.CheckAction(loadTask)
// One encode buffer and one peer client per task worker, in task order.
workerCount := len(loadTask.Task.Workers)
sendBuffers := make([]buffer.EncodeBuffer, 0, workerCount)
peers := make([]*PeerClient, 0, workerCount)
graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
for _, v := range loadTask.Task.Workers {
peers = append(peers, PeerMgr.GetPeer(v.Name))
buf := buffer.EncodeBuffer{}
buf.Init(BufferSize)
sendBuffers = append(sendBuffers, buf)
}
// Scratch values reused for every vertex read.
vertex := structure.Vertex{}
property := structure.PropertyValue{}
property.Init(graph.Data.VertexPropertySchema)
for {
if !lb.CheckAction(loadTask) {
return
}
reqCtx := context.Background()
req := pb.FetchLoadPartReq{}
req.TaskId = loadTask.Task.ID
req.WorkerName = ServiceWorker.WorkerName
resp, err := ServiceWorker.MasterClient.FetchLoadPart(reqCtx, &req)
if err != nil {
logrus.Errorf("RunLoadVertex fetch partition error: %s", err)
break
}
// PartId 0 means no partitions are left.
if resp.PartId == 0 {
logrus.Infof("RunLoadVertex fetch part eof, worker: %d", ctx.Value("worker_id"))
break
}
loader := graphio.MakeLoader(loadTask.LoadType)
err = loader.Init(resp.Params, graph.Data.VertexPropertySchema)
if err != nil {
lb.SetStatusError(loadTask.Task.ID, fmt.Sprintf("graph loader init error: %s", err))
logrus.Errorf("graph loader init error: %s", err)
loader.Close()
return
}
logrus.Infof("start read part: %s", loader.Name())
for {
err = loader.ReadVertex(&vertex, &property)
if err != nil {
if err == io.EOF {
logrus.Infof("read part eof: %s, count: %d", loader.Name(), loader.ReadCount())
break
}
lb.SetStatusError(loadTask.Task.ID, fmt.Sprintf("read vertex error: %s", err))
logrus.Errorf("read vertex error: %s", err)
loader.Close()
return
}
// The owning worker is chosen by hashing the vertex ID.
workerIdx := common.HashBKDR(vertex.ID) % workerCount
_ = sendBuffers[workerIdx].Marshal(&vertex)
if graph.UseProperty {
_ = sendBuffers[workerIdx].Marshal(&property)
}
if sendBuffers[workerIdx].Full() {
atomic.AddInt32(loadTask.SendCount[peers[workerIdx].Name], 1)
// With properties each vertex occupies two objects (vertex + PropertyValue).
count := int32(sendBuffers[workerIdx].ObjCount())
if graph.UseProperty {
count = int32(sendBuffers[workerIdx].ObjCount() / 2)
}
if peers[workerIdx].Self {
lb.RecvVertex(
loadTask.Task.ID,
peers[workerIdx].Name,
count,
false,
0,
sendBuffers[workerIdx].PayLoad())
} else {
peers[workerIdx].LoadActionHandler.LoadAction(
loadTask.Task.ID,
pb.LoadAction_LoadVertex,
count,
false,
0,
sendBuffers[workerIdx].PayLoad())
}
sendBuffers[workerIdx].Reset()
}
}
loader.Close()
}
// Under Locker: this goroutine retires, and the final flush batches are
// counted before `end` is decided, so the last goroutine's endNum includes
// every batch from every goroutine.
loadTask.Locker.Lock()
atomic.AddInt32(loadTask.Parallel, -1)
end := atomic.LoadInt32(loadTask.Parallel) == 0
for i := range sendBuffers {
atomic.AddInt32(loadTask.SendCount[peers[i].Name], 1)
}
loadTask.Locker.Unlock()
// Flush the remaining buffer to every worker (possibly empty), carrying end.
for i := range sendBuffers {
count := int32(sendBuffers[i].ObjCount())
if graph.UseProperty {
count = int32(sendBuffers[i].ObjCount() / 2)
}
if peers[i].Self {
lb.RecvVertex(
loadTask.Task.ID,
peers[i].Name,
count,
end,
atomic.LoadInt32(loadTask.SendCount[peers[i].Name]),
sendBuffers[i].PayLoad())
} else {
peers[i].LoadActionHandler.LoadAction(
loadTask.Task.ID,
pb.LoadAction_LoadVertex,
count,
end,
atomic.LoadInt32(loadTask.SendCount[peers[i].Name]),
sendBuffers[i].PayLoad())
}
sendBuffers[i].Reset()
}
// Only the last goroutine resets the per-peer counters for the next phase.
if end {
for s := range loadTask.SendCount {
*loadTask.SendCount[s] = 0
}
}
}
// RunLoadEdge is one of the loader goroutines started by LoadEdge. Until the
// master returns PartId 0, it does the following:
//   - fetches a partition and reads edges (and properties) with the loader
//     for loadTask.LoadType;
//   - translates endpoint IDs to vertex indices, skipping edges whose source
//     or target is unknown;
//   - sends each IntEdge to the worker owning the target
//     (HashBKDR(edge.Target) % workerCount).
//
// When out-edges or out-degree are used, the edge is also sent, without its
// property, to the source's worker if that worker differs. Full buffers go
// to RecvEdge for itself and via LoadAction for peers.
//
// The end-of-phase protocol matches RunLoadVertex: the goroutine that brings
// loadTask.Parallel to zero sends end=true with each peer's batch count.
// ctx is only read for the "worker_id" value used in logging.
// NOTE(review): a FetchLoadPart error exits the loop as if EOF, without
// SetStatusError; confirm that partitions cannot be silently skipped.
func (lb *LoadGraphBl) RunLoadEdge(ctx context.Context, loadTask *graphio.LoadGraphTask) {
defer func() {
if r := recover(); r != nil {
lb.SetStatusError(loadTask.Task.ID, fmt.Sprintf("RunLoadEdge panic recover panic:%v, stack message: %s",
r, common.GetCurrentGoroutineStack()))
logrus.Errorf("RunLoadEdge panic recover taskID:%v, panic:%v, stack message: %s",
loadTask.Task.ID, r, common.GetCurrentGoroutineStack())
}
}()
if !lb.CheckAction(loadTask) {
return
}
//defer lb.CheckAction(loadTask)
// One encode buffer and one peer client per task worker, in task order.
workerCount := len(loadTask.Task.Workers)
sendBuffers := make([]buffer.EncodeBuffer, 0, workerCount)
peers := make([]*PeerClient, 0, workerCount)
for _, v := range loadTask.Task.Workers {
peers = append(peers, PeerMgr.GetPeer(v.Name))
buf := buffer.EncodeBuffer{}
buf.Init(BufferSize)
sendBuffers = append(sendBuffers, buf)
}
graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
// Scratch values reused for every edge read.
edge := structure.Edge{}
intEdge := structure.IntEdge{}
property := structure.PropertyValue{}
property.Init(graph.Data.InEdgesPropertySchema)
for {
if !lb.CheckAction(loadTask) {
return
}
reqCtx := context.Background()
req := pb.FetchLoadPartReq{}
req.TaskId = loadTask.Task.ID
req.WorkerName = ServiceWorker.WorkerName
resp, err := ServiceWorker.MasterClient.FetchLoadPart(reqCtx, &req)
if err != nil {
logrus.Errorf("RunLoadEdge fetch partition error: %s", err)
break
}
// PartId 0 means no partitions are left.
if resp.PartId == 0 {
logrus.Infof("RunLoadEdge fetch part eof, worker: %d", ctx.Value("worker_id"))
break
}
loader := graphio.MakeLoader(loadTask.LoadType)
err = loader.Init(resp.Params, graph.Data.InEdgesPropertySchema)
if err != nil {
lb.SetStatusError(loadTask.Task.ID, fmt.Sprintf("graph loader init error: %s", err))
logrus.Errorf("graph loader init error: %s", err)
loader.Close()
return
}
logrus.Infof("start read part: %s", loader.Name())
for {
if !lb.CheckAction(loadTask) {
return
}
err = loader.ReadEdge(&edge, &property)
if err != nil {
if err == io.EOF {
logrus.Infof("read part eof: %s, count: %d", loader.Name(), loader.ReadCount())
break
}
lb.SetStatusError(loadTask.Task.ID, fmt.Sprintf("read edge error: %s", err))
logrus.Errorf("read name:%v edge error: %s", loader.Name(), err)
loader.Close()
return
}
// Edges whose endpoints are not in the vertex table are dropped.
var ok bool
intEdge.Source, ok = graph.Data.Vertex.GetVertexIndex(edge.Source)
if !ok {
continue
}
intEdge.Target, ok = graph.Data.Vertex.GetVertexIndex(edge.Target)
if !ok {
continue
}
// In-edges live on the worker that owns the target vertex.
workerIdx := common.HashBKDR(edge.Target) % workerCount
_ = sendBuffers[workerIdx].Marshal(&intEdge)
if graph.UseProperty {
_ = sendBuffers[workerIdx].Marshal(&property)
}
if sendBuffers[workerIdx].Full() {
atomic.AddInt32(loadTask.SendCount[peers[workerIdx].Name], 1)
count := int32(sendBuffers[workerIdx].ObjCount())
if graph.UseProperty {
count = int32(sendBuffers[workerIdx].ObjCount() / 2)
}
if peers[workerIdx].Self {
lb.RecvEdge(
loadTask.Task.ID,
peers[workerIdx].Name,
count,
false,
0,
sendBuffers[workerIdx].PayLoad())
} else {
peers[workerIdx].LoadActionHandler.LoadAction(
loadTask.Task.ID,
pb.LoadAction_LoadEdge,
count,
false,
0,
sendBuffers[workerIdx].PayLoad())
}
sendBuffers[workerIdx].Reset()
}
//If the workerId of source and target are the same, sent only once.
//If they are not the same, send an edge for both workerId's.
// The copy for the source's worker carries no property. That buffer may
// also hold property-carrying in-edges, so ObjCount below is only an
// approximate count.
// NOTE(review): RecvEdge's out-edge path uses count only for logging.
if graph.UseOutEdges || graph.UseOutDegree {
workerIdxOut := common.HashBKDR(edge.Source) % workerCount
if workerIdxOut == workerIdx {
continue
}
_ = sendBuffers[workerIdxOut].Marshal(&intEdge)
if sendBuffers[workerIdxOut].Full() {
atomic.AddInt32(loadTask.SendCount[peers[workerIdxOut].Name], 1)
if peers[workerIdxOut].Self {
lb.RecvEdge(
loadTask.Task.ID,
peers[workerIdxOut].Name,
int32(sendBuffers[workerIdxOut].ObjCount()),
false,
0,
sendBuffers[workerIdxOut].PayLoad())
} else {
peers[workerIdxOut].LoadActionHandler.LoadAction(
loadTask.Task.ID,
pb.LoadAction_LoadEdge,
int32(sendBuffers[workerIdxOut].ObjCount()),
false,
0,
sendBuffers[workerIdxOut].PayLoad())
}
sendBuffers[workerIdxOut].Reset()
}
}
}
loader.Close()
}
// Under Locker: this goroutine retires, and its final flush batches are
// counted before `end` is decided.
loadTask.Locker.Lock()
atomic.AddInt32(loadTask.Parallel, -1)
end := atomic.LoadInt32(loadTask.Parallel) == 0
for i := range sendBuffers {
atomic.AddInt32(loadTask.SendCount[peers[i].Name], 1)
}
loadTask.Locker.Unlock()
// Flush the remaining buffer to every worker (possibly empty), carrying end.
for i := range sendBuffers {
count := int32(sendBuffers[i].ObjCount())
if graph.UseProperty {
count = int32(sendBuffers[i].ObjCount() / 2)
}
if peers[i].Self {
lb.RecvEdge(
loadTask.Task.ID,
peers[i].Name,
count,
end,
atomic.LoadInt32(loadTask.SendCount[peers[i].Name]),
sendBuffers[i].PayLoad())
} else {
peers[i].LoadActionHandler.LoadAction(
loadTask.Task.ID,
pb.LoadAction_LoadEdge,
count,
end,
atomic.LoadInt32(loadTask.SendCount[peers[i].Name]),
sendBuffers[i].PayLoad())
}
sendBuffers[i].Reset()
}
// Only the last goroutine resets the per-peer counters for the next phase.
if end {
for s := range loadTask.SendCount {
*loadTask.SendCount[s] = 0
}
}
}
// LoadPartCompleted reports that partition partId of load task taskId has
// been completed, through ServiceWorker.LoadGraphHandler.LoadComplete.
// Reporting is best-effort: a failure is logged and not propagated, but it
// is no longer silently discarded.
func (lb *LoadGraphBl) LoadPartCompleted(taskId int32, partId int32) {
	if err := ServiceWorker.LoadGraphHandler.LoadComplete(taskId, partId); err != nil {
		logrus.Warnf("LoadPartCompleted notify error, task:%v, part:%v, err:%v", taskId, partId, err)
	}
}
// RecvVertex decodes one batch of count vertices sent by worker (from
// RunLoadVertex) and appends them to this worker's vertex storage. When the
// graph uses properties, each vertex is followed by one PropertyValue.
//
// If the load task is not registered yet, it waits up to 100 x 100ms.
// Decoding stops at the first malformed record. A mismatch between the
// decoded number of vertices and count marks the task failed; this includes
// a payload holding more records than count, which previously indexed past
// the preallocated slice and panicked after RecvWg.Add(1), leaking RecvWg.
//
// end marks the sender's final batch and endNum the number of batches it
// reports having sent. The final batch does the following:
//   - waits for in-flight batches;
//   - marks the sender as TaskStateLoadVertexOK;
//   - once every worker is in that state, runs OnVertexLoaded and reports
//     the state to the master.
func (lb *LoadGraphBl) RecvVertex(taskId int32, worker string, count int32, end bool, endNum int32, data []byte) {
	defer func() {
		if r := recover(); r != nil {
			lb.SetStatusError(taskId, fmt.Sprintf("RecvVertex panic recover panic:%v, stack message: %s", r,
				common.GetCurrentGoroutineStack()))
			logrus.Errorf("RecvVertex panic recover taskID:%v, panic:%v, stack message: %s", taskId, r,
				common.GetCurrentGoroutineStack())
		}
	}()
	loadTask := LoadGraphMgr.GetLoadTask(taskId)
	for i := 0; i < 100 && loadTask == nil; i++ {
		//wait 100ms if loadTask not init.
		logrus.Warnf("task id:%v is not available, wait 100ms", taskId)
		time.Sleep(100 * time.Millisecond)
		loadTask = LoadGraphMgr.GetLoadTask(taskId)
	}
	if !lb.CheckAction(loadTask) {
		return
	}
	loadTask.RecvWg.Add(1)
	// Wait until StartLoadGraph has finished initialising storage and schemas.
	loadTask.LoadWg.Wait()
	graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
	atomic.AddUint32(&graph.Data.VertexCount, uint32(count))
	logrus.Debugf("recv vertex count: %d, end: %v,endNum:%v, worker: %s", count, end, endNum, worker)
	vertexList := make([]structure.Vertex, count)
	properties := structure.VertexProperties{}
	properties.Init(graph.Data.VertexPropertySchema)
	prop := structure.PropertyValue{}
	prop.Init(graph.Data.VertexPropertySchema)
	c := 0
	for i := 0; i < len(data); {
		if c >= len(vertexList) {
			// More records than announced: stop rather than index out of range.
			lb.SetStatusError(taskId, fmt.Sprintf("RecvVertex count incorrect: payload exceeds %d vertices", count))
			logrus.Errorf("RecvVertex count incorrect: payload exceeds %d vertices", count)
			break
		}
		n, err := vertexList[c].Unmarshal(data[i:])
		if err != nil {
			lb.SetStatusError(taskId, fmt.Sprintf("load graph read vertex error: %s", err))
			logrus.Errorf("load graph read vertex error: %s", err)
			break
		}
		i += n
		if graph.UseProperty {
			n, err = prop.Unmarshal(data[i:])
			if err != nil {
				lb.SetStatusError(taskId, fmt.Sprintf("load graph read vertex prop error: %s", err))
				logrus.Errorf("load graph read vertex prop error: %s", err)
				break
			}
			properties.AppendProp(prop, graph.Data.VertexPropertySchema)
			i += n
		}
		c += 1
	}
	if c != int(count) {
		lb.SetStatusError(taskId, fmt.Sprintf("RecvVertex count incorrect: %d, %d", c, count))
		logrus.Errorf("RecvVertex count incorrect: %d, %d", c, count)
	}
	graph.Locker.Lock()
	graph.Data.Vertex.AppendVertices(vertexList...)
	if graph.UseProperty {
		graph.Data.VertexProperty.AppendProps(properties)
	}
	graph.Locker.Unlock()
	loadTask.RecvWg.Done()
	atomic.AddInt32(loadTask.RecvCount[worker], 1)
	if end {
		// wait for all messages are processed
		loadTask.RecvWg.Wait()
		for i := 0; i < 100; i++ {
			if atomic.LoadInt32(loadTask.RecvCount[worker]) >= endNum {
				break
			}
			logrus.Warnf("There are still buffer left to be processed. From worker:%v", worker)
			logrus.Debugf("recv count:%v ,end num:%v ", *loadTask.RecvCount[worker], endNum)
			time.Sleep(100 * time.Millisecond)
		}
		var allWorkerComplete bool
		loadTask.Locker.Lock()
		loadTask.Task.SetWorkerState(worker, structure.TaskStateLoadVertexOK)
		allWorkerComplete = loadTask.Task.CheckTaskState(structure.TaskStateLoadVertexOK)
		loadTask.Locker.Unlock()
		if allWorkerComplete {
			loadTask.Task.SetState(structure.TaskStateLoadVertexOK)
			loadTask.LoadWg.Add(1)
			lb.OnVertexLoaded(taskId)
			req := pb.LoadTaskStatusReq{
				WorkerName: ServiceWorker.WorkerName,
				TaskId:     taskId,
				State:      string(structure.TaskStateLoadVertexOK),
			}
			ctx := context.Background()
			_, err := ServiceWorker.MasterClient.LoadTaskStatus(ctx, &req)
			if err != nil {
				logrus.Errorf("RecvVertex send load task status error: %s", err)
			}
			for s := range loadTask.RecvCount {
				*loadTask.RecvCount[s] = 0
			}
		}
	}
}
// RecvEdge decodes one batch of edges sent by worker (from RunLoadEdge) and
// stores them on this worker. Each record is an IntEdge, optionally followed
// by a PropertyValue.
//
// When the graph uses out-edges or out-degree, each record is checked
// against this worker's vertex range [VertIDStart, VertIDStart+VertexCount):
//   - an in-range target is stored as an in-edge, and its property is
//     consumed when properties are used;
//   - an in-range source adds out-degree and/or an out-edge.
//
// Only in-range in-edges are counted. Otherwise every record is stored as an
// in-edge of its target, and the edge count grows by count.
//
// end marks the sender's final batch and endNum the number of batches it
// reports having sent. The final batch does the following:
//   - waits for in-flight batches;
//   - marks the sender done (TaskStateLoadEdgeOK when out-degree is used,
//     otherwise TaskStateLoaded);
//   - once every worker is done, calls graph.OptimizeMemory and reports the
//     edge count and task state to the master.
//
// A WorkEdgeCount failure now marks the task failed instead of only being
// logged.
func (lb *LoadGraphBl) RecvEdge(taskId int32, worker string, count int32, end bool, endNum int32, data []byte) {
	defer func() {
		if r := recover(); r != nil {
			lb.SetStatusError(taskId, fmt.Sprintf("RecvEdge panic recover panic:%v, stack message: %s", r,
				common.GetCurrentGoroutineStack()))
			logrus.Errorf("RecvEdge panic recover taskID:%v, panic:%v, stack message: %s", taskId, r,
				common.GetCurrentGoroutineStack())
		}
	}()
	loadTask := LoadGraphMgr.GetLoadTask(taskId)
	if !lb.CheckAction(loadTask) {
		return
	}
	graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
	loadTask.LoadWg.Wait()
	loadTask.RecvWg.Add(1)
	logrus.Debugf("recv edge count: %d, end: %v,endNum:%v, worker: %s", count, end, endNum, worker)
	e := structure.IntEdge{}
	prop := structure.PropertyValue{}
	prop.Init(graph.Data.InEdgesPropertySchema)
	if graph.UseOutEdges || graph.UseOutDegree {
		//load both inEdges and outEdges
		edgeCount := 0
		rangeStart := graph.Data.VertIDStart
		rangeEnd := graph.Data.VertIDStart + graph.Data.VertexCount
		for i := 0; i < len(data); {
			n, err := e.Unmarshal(data[i:])
			if err != nil {
				lb.SetStatusError(taskId, fmt.Sprintf("load graph read edge error: %s", err))
				logrus.Errorf("load graph read edge error: %s", err)
				break
			}
			i += n
			if rangeStart <= e.Target && e.Target < rangeEnd {
				edgeCount += 1
				inIdx := e.Target - graph.Data.VertIDStart
				graph.Data.Edges.AppendInEdge(inIdx, serialize.SUint32(e.Source))
				// Only in-edge records carry a property (see RunLoadEdge).
				if graph.UseProperty {
					n, err = prop.Unmarshal(data[i:])
					if err != nil {
						lb.SetStatusError(taskId, fmt.Sprintf("load graph read edge Property error: %s", err))
						logrus.Errorf("load graph read edge Property error: %s", err)
						break
					}
					graph.Data.Edges.EdgeLockFunc(inIdx, func() {
						graph.Data.InEdgesProperty.AppendProp(prop, inIdx, graph.Data.InEdgesPropertySchema)
					})
					i += n
				}
			}
			if rangeStart <= e.Source && e.Source < rangeEnd {
				if graph.UseOutDegree {
					// Out-degree is indexed by the global source index.
					graph.Data.Edges.AddOutDegree(e.Source, 1)
				}
				if graph.UseOutEdges {
					outIdx := e.Source - graph.Data.VertIDStart
					graph.Data.Edges.AppendOutEdge(outIdx, serialize.SUint32(e.Target))
				}
			}
		}
		atomic.AddInt64(&graph.EdgeCount, int64(edgeCount))
	} else {
		//load inEdges only
		atomic.AddInt64(&graph.EdgeCount, int64(count))
		for i := 0; i < len(data); {
			n, err := e.Unmarshal(data[i:])
			if err != nil {
				lb.SetStatusError(taskId, fmt.Sprintf("load graph read edge error: %s", err))
				logrus.Errorf("load graph read edge error: %s", err)
				break
			}
			i += n
			eIdx := e.Target - graph.Data.VertIDStart
			// Valid local indices are 0..VertexCount-1, so VertexCount itself is
			// already out of range. A target below VertIDStart wraps around and
			// is caught here too.
			if eIdx >= graph.Data.VertexCount {
				logrus.Warnf("edge out of range, source:%v ,target:%v", e.Source, e.Target)
			}
			graph.Data.Edges.AppendInEdge(eIdx, serialize.SUint32(e.Source))
			if graph.UseProperty {
				n, err = prop.Unmarshal(data[i:])
				if err != nil {
					lb.SetStatusError(taskId, fmt.Sprintf("load graph read edge Property error: %s", err))
					logrus.Errorf("load graph read edge Property error: %s", err)
					break
				}
				i += n
				graph.Data.Edges.EdgeLockFunc(eIdx, func() {
					graph.Data.InEdgesProperty.AppendProp(prop, eIdx, graph.Data.InEdgesPropertySchema)
				})
			}
		}
	}
	loadTask.RecvWg.Done()
	atomic.AddInt32(loadTask.RecvCount[worker], 1)
	if end {
		// wait for all messages are processed
		loadTask.RecvWg.Wait()
		for i := 0; i < 100; i++ {
			if atomic.LoadInt32(loadTask.RecvCount[worker]) >= endNum {
				break
			}
			logrus.Warnf("There are still buffer left to be processed. From worker:%v", worker)
			logrus.Debugf("recv count:%v ,end num:%v ", *loadTask.RecvCount[worker], endNum)
			time.Sleep(100 * time.Millisecond)
		}
		// With out-degree an extra phase follows, so stop at LoadEdgeOK.
		tarStatus := structure.TaskStateLoaded
		if graph.UseOutDegree {
			tarStatus = structure.TaskStateLoadEdgeOK
		}
		var allWorkerComplete bool
		loadTask.Locker.Lock()
		loadTask.Task.SetWorkerState(worker, tarStatus)
		allWorkerComplete = loadTask.Task.CheckTaskState(tarStatus)
		loadTask.Locker.Unlock()
		if allWorkerComplete {
			graph.OptimizeMemory()
			ctx := context.Background()
			req2 := pb.WorkerEdgeCountReq{
				TaskId:     taskId,
				WorkerName: ServiceWorker.WorkerName,
				Count:      graph.EdgeCount,
			}
			graph.Data.EdgeCount = graph.EdgeCount
			_, err := ServiceWorker.MasterClient.WorkEdgeCount(ctx, &req2)
			if err != nil {
				// Match OnVertexLoaded's handling of WorkVertexCount: fail the task
				// instead of leaving it stuck with only a log line.
				lb.SetStatusError(taskId, fmt.Sprintf("WorkEdgeCount error: %s", err))
				logrus.Errorf("WorkEdgeCount error: %s", err)
				return
			}
			loadTask.Task.SetState(tarStatus)
			req := pb.LoadTaskStatusReq{
				WorkerName: ServiceWorker.WorkerName,
				TaskId:     taskId,
				State:      string(tarStatus),
			}
			ctx = context.Background()
			_, err = ServiceWorker.MasterClient.LoadTaskStatus(ctx, &req)
			if err != nil {
				logrus.Errorf("RecvEdge send load task status error: %s", err)
			}
			for s := range loadTask.RecvCount {
				*loadTask.RecvCount[s] = 0
			}
		}
	}
}
// ScatterOutDegree broadcasts the out-degree of every vertex in this worker's
// local range [VertIDStart, VertIDStart+VertexCount) to all peer workers of
// the load task, using pb.LoadAction_LoadOutDegree messages.
//
// Each message payload starts with the absolute id of its first vertex,
// followed by the out-degrees of consecutive vertices. A new message is started
// whenever the send buffer reports Full, and the last message is sent with
// end=true. This worker's own share is completed by calling GatherOutDegree
// directly with an empty, final payload. A missing graph or a panic is
// reported via SetStatusError.
func (lb *LoadGraphBl) ScatterOutDegree(taskId int32) {
	defer func() {
		if r := recover(); r != nil {
			lb.SetStatusError(taskId, fmt.Sprintf("ScatterOutDegree panic recover panic:%v, stack message: %s", r,
				common.GetCurrentGoroutineStack()))
			logrus.Errorf("ScatterOutDegree panic recover taskID:%v, panic:%v, stack message: %s", taskId, r,
				common.GetCurrentGoroutineStack())
		}
	}()
	logrus.Infof("ScatterOutDegree start: %d", taskId)
	loadTask := LoadGraphMgr.GetLoadTask(taskId)
	if !lb.CheckAction(loadTask) {
		return
	}
	graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
	if graph == nil {
		lb.SetStatusError(taskId, fmt.Sprintf("ScatterOutDegree graph not found: %s", loadTask.Task.GraphName))
		return
	}

	// Capacity is len(Workers), not len(Workers)-1: the latter is negative for a
	// task with no workers, and make panics on a negative capacity.
	peers := make([]*PeerClient, 0, len(loadTask.Task.Workers))
	for _, wn := range loadTask.Task.Workers {
		if wn.Name == ServiceWorker.WorkerName {
			lb.GatherOutDegree(taskId, wn.Name, true, []byte{})
			continue
		}
		peers = append(peers, PeerMgr.GetPeer(wn.Name))
	}
	if len(peers) == 0 {
		return
	}

	sendBuffer := buffer.EncodeBuffer{}
	sendBuffer.Init(BufferSize)
	// broadcast sends the current buffer contents to every peer, then clears the buffer.
	broadcast := func(end bool) {
		for _, peer := range peers {
			peer.LoadActionHandler.LoadAction(
				loadTask.Task.ID,
				pb.LoadAction_LoadOutDegree,
				int32(sendBuffer.ObjCount()),
				end,
				0,
				sendBuffer.PayLoad())
		}
		sendBuffer.Reset()
	}

	vertStart := graph.Data.VertIDStart
	vertEnd := vertStart + graph.Data.VertexCount
	// Marshal errors are ignored here.
	// NOTE(review): confirm EncodeBuffer.Marshal cannot fail after Init.
	vOffSet := serialize.SUint32(vertStart)
	_ = sendBuffer.Marshal(&vOffSet)
	for i := vertStart; i < vertEnd; i++ {
		outDegree := graph.Data.Edges.GetOutDegree(i)
		_ = sendBuffer.Marshal(&outDegree)
		if sendBuffer.Full() {
			broadcast(false)
			// The next batch starts with the vertex after i.
			vOffSet = serialize.SUint32(i + 1)
			_ = sendBuffer.Marshal(&vOffSet)
		}
	}
	broadcast(true)
}
// GatherOutDegree applies one batch of out-degrees sent by workerName. The
// batch comes either from a peer's ScatterOutDegree or from this worker itself,
// with an empty payload.
//
// data holds the absolute id of the first vertex, presumably a 4-byte encoding
// given the length guard (TODO confirm). It is followed by consecutive
// serialize.SUint32 out-degrees, which are stored with Edges.SetOutDegree. When
// end is true, the call waits for all in-flight batches (RecvWg) and marks
// workerName as loaded. Once every worker is loaded, it sets the task state and
// reports TaskStateLoaded to the master. Decode errors and panics are reported
// via SetStatusError.
func (lb *LoadGraphBl) GatherOutDegree(taskId int32, workerName string, end bool, data []byte) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		lb.SetStatusError(taskId, fmt.Sprintf("GatherOutDegree panic recover panic:%v, stack message: %s", r,
			common.GetCurrentGoroutineStack()))
		logrus.Errorf("GatherOutDegree panic recover taskID:%v, panic:%v, stack message: %s", taskId, r,
			common.GetCurrentGoroutineStack())
	}()
	logrus.Debugf("gather out degree worker:%v, end; %v", workerName, end)
	task := LoadGraphMgr.GetLoadTask(taskId)
	if !lb.CheckAction(task) {
		return
	}
	g := GraphMgr.GetGraphByName(task.Task.SpaceName, task.Task.GraphName)

	task.RecvWg.Add(1)
	var (
		cursor   int
		vertexID serialize.SUint32
		degree   serialize.SUint32
	)
	if len(data) >= 4 {
		consumed, _ := vertexID.Unmarshal(data)
		cursor += consumed
	}
	for cursor < len(data) {
		consumed, err := degree.Unmarshal(data[cursor:])
		if err != nil {
			lb.SetStatusError(taskId, fmt.Sprintf("load graph gather out degree error: %s", err))
			logrus.Errorf("load graph gather out degree error: %s", err)
			break
		}
		g.Data.Edges.SetOutDegree(uint32(vertexID), degree)
		cursor += consumed
		vertexID++
	}
	task.RecvWg.Done()

	if !end {
		return
	}
	// Make sure every batch received so far has been applied before declaring this worker done.
	task.RecvWg.Wait()

	task.Locker.Lock()
	task.Task.SetWorkerState(workerName, structure.TaskStateLoaded)
	everyWorkerLoaded := task.Task.CheckTaskState(structure.TaskStateLoaded)
	task.Locker.Unlock()
	if !everyWorkerLoaded {
		return
	}

	task.Task.SetState(structure.TaskStateLoaded)
	statusReq := pb.LoadTaskStatusReq{
		WorkerName: ServiceWorker.WorkerName,
		TaskId:     taskId,
		State:      string(structure.TaskStateLoaded),
	}
	if _, err := ServiceWorker.MasterClient.LoadTaskStatus(context.Background(), &statusReq); err != nil {
		logrus.Errorf("LoadTaskStatus error: %s", err)
	}
}
// LoadComplete finalizes load task taskID on this worker. It marks the task and
// its graph as loaded, publishes the load metrics (graph/vertex/edge counts and
// the running-task counter), and releases the task via endTask. A panic, for
// example from a missing task or graph, is recovered and reported through
// SetStatusError.
func (lb *LoadGraphBl) LoadComplete(taskID int32) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		lb.SetStatusError(taskID, fmt.Sprintf("LoadComplete panic recover panic:%v, stack message: %s", r,
			common.GetCurrentGoroutineStack()))
		logrus.Errorf("LoadComplete panic recover taskID:%v, panic:%v, stack message: %s", taskID, r,
			common.GetCurrentGoroutineStack())
	}()
	lt := LoadGraphMgr.GetLoadTask(taskID)
	lt.Task.State = structure.TaskStateLoaded

	g := GraphMgr.GetGraphByName(lt.Task.SpaceName, lt.Task.GraphName)
	g.SetState(structure.GraphStateLoaded)
	logrus.Infof("graph load complete task: %d, graph: %s", taskID, lt.Task.GraphName)

	// Publish metrics while the task is still registered; endTask frees it below.
	common.PrometheusMetrics.GraphLoadedCnt.WithLabelValues().Inc()
	common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(lt.Task.Type).Dec()
	common.PrometheusMetrics.VertexCnt.WithLabelValues(g.Name).Set(float64(g.Data.VertexCount))
	common.PrometheusMetrics.EdgeCnt.WithLabelValues(g.Name).Set(float64(g.EdgeCount))

	lb.endTask(taskID)
}
// GetStatusError marks load task taskID and its graph as failed on this worker.
// Despite its name, it returns nothing. It does not notify the master. The
// task's TaskRunningCnt metric is decremented, and the task is released via
// endTask one minute later.
//
// If no load task is registered under taskID, there is nothing to mark, so the
// call only logs a warning.
func (lb *LoadGraphBl) GetStatusError(taskID int32) {
	defer func() {
		if r := recover(); r != nil {
			logrus.Errorf("GetStatusError panic recover taskID:%v, panic:%v, stack message: %s", taskID, r,
				common.GetCurrentGoroutineStack())
		}
	}()
	loadTask := LoadGraphMgr.GetLoadTask(taskID)
	if loadTask == nil {
		// Everything below needs the task (graph lookup, metric label), so a
		// missing task previously always ended in a nil-pointer panic.
		logrus.Warnf("GetStatusError: load task not found, taskID:%v", taskID)
		return
	}
	loadTask.Task.State = structure.TaskStateError
	graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, loadTask.Task.GraphName)
	if graph != nil {
		graph.SetState(structure.GraphStateError)
	}
	time.AfterFunc(1*time.Minute, func() { lb.endTask(taskID) })
	common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(loadTask.Task.Type).Dec()
}
// SetStatusError marks load task taskId, and the graph it loads, as failed,
// then reports TaskStateError with message to the master.
//
// This function is the error path for every recover handler in this file, so
// it must not itself fail on missing state. Whatever local state exists (task,
// graph) is updated, and the master is notified even if the task or graph is
// not registered locally. Any remaining panic is recovered and logged.
func (lb *LoadGraphBl) SetStatusError(taskId int32, message string) {
	defer func() {
		if r := recover(); r != nil {
			logrus.Errorf("SetStatusError panic recover taskID:%v, panic:%v, stack message: %s", taskId, r,
				common.GetCurrentGoroutineStack())
		}
	}()
	graphName := ""
	if loadTask := LoadGraphMgr.GetLoadTask(taskId); loadTask != nil {
		loadTask.Task.State = structure.TaskStateError
		graphName = loadTask.Task.GraphName
		if graph := GraphMgr.GetGraphByName(loadTask.Task.SpaceName, graphName); graph != nil {
			graph.SetState(structure.GraphStateError)
		}
	} else {
		logrus.Warnf("SetStatusError: load task not found, taskId:%d", taskId)
	}
	logrus.Errorf("graph load task error: taskId:%d, graph: %s", taskId, graphName)
	req := pb.LoadTaskStatusReq{
		WorkerName: ServiceWorker.WorkerName,
		TaskId:     taskId,
		State:      string(structure.TaskStateError),
		ErrorMsg:   message,
	}
	_, err := ServiceWorker.MasterClient.LoadTaskStatus(context.Background(), &req)
	if err != nil {
		logrus.Errorf("LoadTaskStatus error: %s", err)
	}
}
// CheckAction inspects the control action currently requested for loadTask and
// reports whether the caller should keep processing it.
//
//   - nil loadTask, ActionDoNoting, ActionResumeTask: continue (true).
//   - ActionCancelTask: the task is cancelled via cancelAction (false).
//   - ActionPauseTask: blocks in pauseAction until the task is resumed (true)
//     or cancelled (false).
//   - anything else: logged as unknown, then treated as continue (true).
func (lb *LoadGraphBl) CheckAction(loadTask *graphio.LoadGraphTask) (isContinue bool) {
	if loadTask == nil {
		return true
	}
	// Read Action once, atomically. The switch and the log below must see the
	// same value, and a second, plain read would race with writers.
	switch action := atomic.LoadInt32(&loadTask.Task.Action); action {
	case structure.ActionDoNoting, structure.ActionResumeTask:
		// pauseAction returns on ActionResumeTask without changing it, and
		// nothing in this file resets it. So it is a normal "continue" state,
		// not an unknown action.
	case structure.ActionCancelTask:
		lb.cancelAction(loadTask)
		return false
	case structure.ActionPauseTask:
		return lb.pauseAction(loadTask)
	default:
		logrus.Errorf("unknown action %d", action)
	}
	return true
}
// cancelAction moves loadTask to TaskStateCanceled and decrements the
// TaskRunningCnt metric for its type. The task itself is released via endTask
// one minute later.
func (lb *LoadGraphBl) cancelAction(loadTask *graphio.LoadGraphTask) {
	release := func() { lb.endTask(loadTask.Task.ID) }

	loadTask.Task.SetState(structure.TaskStateCanceled)
	common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(loadTask.Task.Type).Dec()
	time.AfterFunc(time.Minute, release)
}
// pauseAction blocks while loadTask is paused. It polls the task's Action every
// 10 seconds. It returns true once the task is resumed. If the task is
// cancelled instead, it cancels via cancelAction and returns false.
func (lb *LoadGraphBl) pauseAction(loadTask *graphio.LoadGraphTask) bool {
	info := loadTask.Task
	for {
		action := atomic.LoadInt32(&info.Action)
		if action == structure.ActionCancelTask {
			lb.cancelAction(loadTask)
			return false
		}
		if action == structure.ActionResumeTask {
			return true
		}
		time.Sleep(10 * time.Second)
	}
}
// endTask frees the memory held by load task taskID and removes it from
// LoadGraphMgr. It does nothing if no such task is registered.
func (lb *LoadGraphBl) endTask(taskID int32) {
	t := LoadGraphMgr.GetLoadTask(taskID)
	if t == nil {
		return
	}
	t.FreeMemory()
	LoadGraphMgr.DeleteTask(t.Task.ID)
}
// EdgesBl holds the worker-side logic for listing the edges of a single vertex
// in a loaded graph (see its GetEdges method). It carries no state.
type EdgesBl struct {
}
// GetEdges returns the neighbours of vertex vertId in graph graphName (space
// spaceName) as original vertex ids. The vertex must lie in this worker's range
// [VertIDStart, VertIDStart+VertexCount).
//
// direction selects the result:
//   - "in": in-neighbours (first result). When the graph was loaded with
//     properties (UseProperty), each in-edge's properties are returned in the
//     same order (third result).
//   - "out": out-neighbours (second result). This is an error if the graph
//     keeps no out-edges.
//   - "both": in- and out-neighbours, without properties. Any error comes from
//     the out-edge lookup.
//
// Any other direction is an error. Edge properties are only looked up for
// "in", so property errors cannot fail other directions.
func (eb *EdgesBl) GetEdges(spaceName string, graphName string, vertId string, direction string) ([]string, []string, []*pb.EdgeProperty, error) {
	graph := GraphMgr.GetGraphByName(spaceName, graphName)
	if graph == nil {
		return []string{}, []string{}, []*pb.EdgeProperty{}, fmt.Errorf("graph %s not found", graphName)
	}
	shortId, ok := graph.Data.Vertex.GetVertexIndex(vertId)
	if !ok {
		return []string{}, []string{}, []*pb.EdgeProperty{}, fmt.Errorf("vertexId %s not found", vertId)
	}
	if shortId < graph.Data.VertIDStart ||
		shortId >= graph.Data.VertIDStart+graph.Data.VertexCount {
		return []string{}, []string{}, []*pb.EdgeProperty{}, fmt.Errorf("vertexId %s not in range", vertId)
	}
	// Per-vertex edge lists are indexed relative to the start of this worker's range.
	localIdx := shortId - graph.Data.VertIDStart

	getInEdges := func() []string {
		inEdges := graph.Data.Edges.GetInEdges(localIdx)
		edges := make([]string, 0, len(inEdges))
		for _, s := range inEdges {
			edges = append(edges, graph.Data.Vertex.GetVertex(uint32(s)).ID)
		}
		return edges
	}
	getOutEdges := func() ([]string, error) {
		if !graph.UseOutEdges {
			return []string{}, fmt.Errorf("outEdges is nil")
		}
		outEdges := graph.Data.Edges.GetOutEdges(localIdx)
		edges := make([]string, 0, len(outEdges))
		for _, s := range outEdges {
			edges = append(edges, graph.Data.Vertex.GetVertex(uint32(s)).ID)
		}
		return edges, nil
	}
	// getInEdgeProperties returns one EdgeProperty per in-edge, in the same order as inEdges.
	getInEdgeProperties := func(inEdges []string) ([]*pb.EdgeProperty, error) {
		edgeProperties := make([]*pb.EdgeProperty, 0, len(inEdges))
		for i, edge := range inEdges {
			properties := &pb.EdgeProperty{
				Edge:     edge,
				Property: make(map[string]string),
			}
			for _, ps := range graph.Data.InEdgesPropertySchema.Schema {
				prop, err := graph.Data.InEdgesProperty.GetValue(ps.PropKey, localIdx, uint32(i))
				if err != nil {
					return nil, err
				}
				properties.Property[ps.PropKey] = prop.ToString()
			}
			edgeProperties = append(edgeProperties, properties)
		}
		return edgeProperties, nil
	}

	switch direction {
	case "in":
		inEdges := getInEdges()
		if !graph.UseProperty {
			return inEdges, []string{}, nil, nil
		}
		edgeProperties, err := getInEdgeProperties(inEdges)
		if err != nil {
			return []string{}, []string{}, []*pb.EdgeProperty{}, err
		}
		return inEdges, []string{}, edgeProperties, nil
	case "out":
		outEdges, err := getOutEdges()
		return []string{}, outEdges, []*pb.EdgeProperty{}, err
	case "both":
		inEdges := getInEdges()
		outEdges, err := getOutEdges()
		return inEdges, outEdges, []*pb.EdgeProperty{}, err
	default:
		return []string{}, []string{}, []*pb.EdgeProperty{}, fmt.Errorf("direction %s not supported", direction)
	}
}
// DegreeBl holds the worker-side logic for computing a single vertex's degree
// in a loaded graph (see its GetDegree method). It carries no state.
type DegreeBl struct {
}
// GetDegree returns the degree of vertex vertId in graph graphName (space
// spaceName) for direction "in", "out" or "both". It returns 0 if the graph or
// vertex is unknown, or if direction is anything else.
//
// The in-degree is the length of the vertex's in-edge list. That list is only
// stored for vertices in this worker's range [VertIDStart,
// VertIDStart+VertexCount), indexed relative to VertIDStart, so vertices
// outside the range contribute no in-degree. The out-degree is read from the
// out-degree table, which is indexed by absolute vertex id. It is only counted
// when the graph tracks out-degrees (UseOutDegree).
func (db *DegreeBl) GetDegree(spaceName string, graphName string, vertId string, direction string) uint32 {
	graph := GraphMgr.GetGraphByName(spaceName, graphName)
	if graph == nil {
		return 0
	}
	shortId, ok := graph.Data.Vertex.GetVertexIndex(vertId)
	if !ok {
		return 0
	}
	degree := uint32(0)
	if direction == "in" || direction == "both" {
		vertStart := graph.Data.VertIDStart
		if shortId >= vertStart && shortId < vertStart+graph.Data.VertexCount {
			// In-edge lists are indexed by local offset, not by the absolute shortId.
			degree += uint32(len(graph.Data.Edges.GetInEdges(shortId - vertStart)))
		}
	}
	if graph.UseOutDegree && (direction == "out" || direction == "both") {
		degree += uint32(graph.Data.Edges.GetOutDegree(shortId))
	}
	return degree
}