func()

in vermeer/apps/worker/load_graph_bl.go [41:143]


// StartLoadGraph prepares this worker for a graph load task and starts the
// vertex-loading goroutines.
//
// It creates a graph in the loading state and registers it with GraphMgr,
// creates and registers the task with TaskMgr, records every name in workers
// on both the graph and the task, configures the graph's data directory,
// vertex backend and property schemas from params, and finally starts
// "load.parallel" goroutines running RunLoadVertex.
//
// Nothing is returned. Fatal failures are logged and reported through
// lb.SetStatusError, after which the function returns without starting any
// loader. A panic raised anywhere in the function is recovered and reported
// the same way.
//
// Keys read from params:
//   - "load.parallel": number of RunLoadVertex goroutines; values <= 0 become 1.
//   - "load.type": stored on the load task; graphio.LoadTypeHugegraph forces
//     properties on and pulls the schemas via structure.GetSchemaFromHugegraph.
//   - "load.vertex_backend": passed to graph.MallocData as VertexDataBackend.
//   - "load.use_outedge", "load.use_out_degree", "load.use_property",
//     "load.use_undirected": treated as enabled when equal to 1.
//   - "load.vertex_property", "load.edge_property": schema definitions used
//     when properties are enabled and the load type is not hugegraph.
func (lb *LoadGraphBl) StartLoadGraph(spaceName string, taskId int32, graphName string, workers []string, params map[string]string) {
	defer func() {
		if r := recover(); r != nil {
			// Capture the stack once so the task status and the log agree.
			stack := common.GetCurrentGoroutineStack()
			lb.SetStatusError(taskId, fmt.Sprintf("StartLoadGraph panic recover panic:%v,  stack message: %s", r,
				stack))
			logrus.Errorf("StartLoadGraph panic recover taskID:%v, panic:%v,  stack message: %s", taskId, r,
				stack)
		}
	}()

	graph := GraphMgr.CreateGraph(spaceName, graphName)
	graph.SetState(structure.GraphStateLoading)
	if err := GraphMgr.AddGraph(graph); err != nil {
		// Logged only: the load keeps going with the graph object created above.
		logrus.Errorf("add graph error: %s", err)
	}
	graph.Workers = make([]*structure.GraphWorker, 0, len(workers))

	task, err := TaskMgr.CreateTask(spaceName, structure.TaskTypeLoad, taskId)
	if err != nil {
		// Without a task there is nothing to attach workers or params to, so
		// report the failure explicitly instead of dereferencing the result.
		logrus.Errorf("create task error: %s", err)
		lb.SetStatusError(taskId, fmt.Sprintf("create task error: %s", err))
		return
	}

	// Mirror the worker list onto both the graph and the task.
	for _, wn := range workers {
		graph.Workers = append(graph.Workers, &structure.GraphWorker{
			Name:        wn,
			VertexCount: 0,
			IsSelf:      wn == ServiceWorker.WorkerName,
		})
		task.Workers = append(task.Workers, &structure.TaskWorker{Name: wn})
	}

	task.GraphName = graphName
	task.Params = params
	TaskMgr.AddTask(task)
	logrus.Infof("create load task: %d, graph: %s", taskId, graphName)
	loadTask := LoadGraphMgr.InstallTask(task)

	// recv vertex wait until init done
	// NOTE(review): the early returns below leave this counter held, so anything
	// waiting on LoadWg stays blocked after a failed init — confirm that the
	// error status is enough for those waiters to be torn down elsewhere.
	loadTask.LoadWg.Add(1)

	ctx := context.Background()
	parallel := options.GetInt(params, "load.parallel")
	if parallel <= 0 {
		logrus.Infof("load.parallel value must be larger than 0, get: %v, set to defalut value :1", parallel)
		parallel = 1
	}
	*loadTask.Parallel = int32(parallel)
	loadTask.LoadType = options.GetString(params, "load.type")

	if _, err = graph.SetDataDir(spaceName, graphName, ServiceWorker.WorkerName); err != nil {
		logrus.Errorf("set data dir error: %s", err)
		lb.SetStatusError(taskId, fmt.Sprintf("set data dir error: %s", err))
		// The task is already marked as failed; do not remove data, allocate
		// storage or start loaders for it.
		return
	}
	// remove history data
	graph.Remove()

	backendOption := structure.GraphDataBackendOption{
		VertexDataBackend: options.GetString(params, "load.vertex_backend"),
	}
	graph.MallocData(backendOption)

	// Determines if outEdges are required and sets graph.UseOutEdges.
	useOutEdges := options.GetInt(loadTask.Task.Params, "load.use_outedge") == 1
	useOutDegree := options.GetInt(loadTask.Task.Params, "load.use_out_degree") == 1
	useProperty := options.GetInt(loadTask.Task.Params, "load.use_property") == 1
	//graph.UseUndirected = options.GetInt(params, "load.use_undirected") == 1
	//// With the undirected-graph feature, out edges are not needed.
	// NOTE(review): this writes graph.UseOutEdges directly while SetOption below
	// receives useOutEdges; if SetOption assigns the field, this value is
	// overwritten — verify against SetOption.
	if options.GetInt(params, "load.use_undirected") == 1 {
		graph.UseOutEdges = true
	}

	if loadTask.LoadType == graphio.LoadTypeHugegraph {
		// Hugegraph loads always carry properties; the schemas come from the
		// remote side rather than from params.
		useProperty = true
		graph.Data.VertexPropertySchema, graph.Data.InEdgesPropertySchema, err = structure.GetSchemaFromHugegraph(params)
		if err != nil {
			lb.SetStatusError(taskId, fmt.Sprintf("load schema from hugegraph error:%v", err))
			logrus.Errorf("load schema from hugegraph error:%v", err)
			return
		}
	} else if useProperty {
		graph.Data.VertexPropertySchema.Init(options.GetMapString(params, "load.vertex_property"))
		graph.Data.InEdgesPropertySchema.Init(options.GetMapString(params, "load.edge_property"))
	}

	graph.SetOption(useOutEdges, useOutDegree, useProperty)
	if graph.UseProperty {
		graph.Data.VertexProperty.Init(graph.Data.VertexPropertySchema)
	}

	// Initialisation is complete: release LoadWg, then start the loaders.
	loadTask.LoadWg.Done()
	for i := 0; i < parallel; i++ {
		// The key stays a plain string; presumably RunLoadVertex looks it up
		// with the same literal, so changing its type here would break it.
		valueCtx := context.WithValue(ctx, "worker_id", i)
		go lb.RunLoadVertex(valueCtx, loadTask)
	}
}