in vermeer/apps/worker/load_graph_bl.go [41:143]
// StartLoadGraph prepares this worker for loading graphName in spaceName and
// then starts the vertex loaders.
//
// It creates the graph (in loading state) and the load task identified by
// taskId, records every name in workers on both of them, configures the graph
// storage and property schemas from params, and finally launches
// "load.parallel" RunLoadVertex goroutines (at least one).
//
// Options read from params: "load.parallel", "load.type",
// "load.vertex_backend", "load.use_outedge", "load.use_out_degree",
// "load.use_undirected", "load.use_property", "load.vertex_property" and
// "load.edge_property".
//
// The method returns nothing: failures (including panics, which are recovered)
// are logged and reported through lb.SetStatusError, and setup is aborted.
func (lb *LoadGraphBl) StartLoadGraph(spaceName string, taskId int32, graphName string, workers []string, params map[string]string) {
	defer func() {
		if r := recover(); r != nil {
			// Capture the stack once so the reported status and the log line agree.
			stack := common.GetCurrentGoroutineStack()
			lb.SetStatusError(taskId, fmt.Sprintf("StartLoadGraph panic recover panic:%v, stack message: %s", r, stack))
			logrus.Errorf("StartLoadGraph panic recover taskID:%v, panic:%v, stack message: %s", taskId, r, stack)
		}
	}()

	graph := GraphMgr.CreateGraph(spaceName, graphName)
	graph.SetState(structure.GraphStateLoading)
	// A failure to register the graph is only logged and loading continues.
	// NOTE(review): presumably tolerated on purpose (e.g. graph already
	// registered) — confirm.
	if err := GraphMgr.AddGraph(graph); err != nil {
		logrus.Errorf("add graph error: %s", err)
	}
	graph.Workers = make([]*structure.GraphWorker, 0, len(workers))

	task, err := TaskMgr.CreateTask(spaceName, structure.TaskTypeLoad, taskId)
	if err != nil {
		// Without a task nothing below can work; report and stop instead of
		// dereferencing an invalid task.
		logrus.Errorf("create task error: %s", err)
		lb.SetStatusError(taskId, fmt.Sprintf("create task error: %s", err))
		return
	}

	// Mirror the worker list on both the graph and the task.
	for _, wn := range workers {
		graph.Workers = append(graph.Workers, &structure.GraphWorker{
			Name:        wn,
			VertexCount: 0,
			IsSelf:      wn == ServiceWorker.WorkerName,
		})
		task.Workers = append(task.Workers, &structure.TaskWorker{Name: wn})
	}
	task.GraphName = graphName
	task.Params = params
	TaskMgr.AddTask(task)
	logrus.Infof("create load task: %d, graph: %s", taskId, graphName)

	loadTask := LoadGraphMgr.InstallTask(task)
	// Vertex receivers wait on LoadWg until the initialization below is done.
	loadTask.LoadWg.Add(1)

	ctx := context.Background()
	parallel := options.GetInt(params, "load.parallel")
	if parallel <= 0 {
		logrus.Infof("load.parallel value must be larger than 0, get: %v, set to defalut value :1", parallel)
		parallel = 1
	}
	*loadTask.Parallel = int32(parallel)
	loadTask.LoadType = options.GetString(params, "load.type")

	if _, err = graph.SetDataDir(spaceName, graphName, ServiceWorker.WorkerName); err != nil {
		logrus.Errorf("set data dir error: %s", err)
		lb.SetStatusError(taskId, fmt.Sprintf("set data dir error: %s", err))
		// The task is already marked as failed; do not wipe or allocate data
		// or start loaders for it.
		// NOTE(review): like the schema-error return below, this leaves
		// LoadWg unreleased — confirm waiters are torn down when the task
		// enters the error state.
		return
	}

	// Remove history data before allocating fresh storage.
	graph.Remove()
	graph.MallocData(structure.GraphDataBackendOption{
		VertexDataBackend: options.GetString(params, "load.vertex_backend"),
	})

	// Work out which optional structures (out edges, out degree, properties)
	// the graph has to keep.
	useOutEdges := options.GetInt(loadTask.Task.Params, "load.use_outedge") == 1
	useOutDegree := options.GetInt(loadTask.Task.Params, "load.use_out_degree") == 1
	// Disabled upstream: graph.UseUndirected = options.GetInt(params, "load.use_undirected") == 1
	// Original note: "with the undirected-graph feature, out edges are not needed".
	// NOTE(review): SetOption below receives useOutEdges, not this field, and
	// may overwrite it — confirm which value is intended to win.
	if options.GetInt(params, "load.use_undirected") == 1 {
		graph.UseOutEdges = true
	}

	useProperty := options.GetInt(loadTask.Task.Params, "load.use_property") == 1
	if loadTask.LoadType == graphio.LoadTypeHugegraph {
		// Hugegraph sources always carry properties; the schemas are fetched
		// from hugegraph instead of being taken from params.
		useProperty = true
		graph.Data.VertexPropertySchema, graph.Data.InEdgesPropertySchema, err = structure.GetSchemaFromHugegraph(params)
		if err != nil {
			lb.SetStatusError(taskId, fmt.Sprintf("load schema from hugegraph error:%v", err))
			logrus.Errorf("load schema from hugegraph error:%v", err)
			return
		}
	} else if useProperty {
		graph.Data.VertexPropertySchema.Init(options.GetMapString(params, "load.vertex_property"))
		graph.Data.InEdgesPropertySchema.Init(options.GetMapString(params, "load.edge_property"))
	}

	graph.SetOption(useOutEdges, useOutDegree, useProperty)
	if graph.UseProperty {
		graph.Data.VertexProperty.Init(graph.Data.VertexPropertySchema)
	}

	// Initialization finished: release the waiters, then start the loaders.
	loadTask.LoadWg.Done()
	for i := 0; i < parallel; i++ {
		// The plain string key is kept as is.
		// NOTE(review): presumably RunLoadVertex looks the id up under this
		// exact key, so it must change on both sides together — confirm.
		valueCtx := context.WithValue(ctx, "worker_id", i)
		go lb.RunLoadVertex(valueCtx, loadTask)
	}
}