vermeer/apps/master/graphs/graph_bl.go (321 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF
licenses this file to You under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
*/
package graphs
import (
"context"
"fmt"
"sort"
"vermeer/apps/common"
"vermeer/apps/options"
pb "vermeer/apps/protos"
"vermeer/apps/structure"
"github.com/sirupsen/logrus"
)
type GraphBl struct {
Cred *structure.Credential
}
// GetGraph
func (gb *GraphBl) GetGraph(graphName string) (graph *structure.VermeerGraph, err error) {
g := graphMgr.GetGraphByName(gb.Cred.Space(), graphName)
if g == nil {
return nil, fmt.Errorf("graph %v/%v not exist", gb.Cred.Space(), graphName)
}
g.Status = g.State.Converter()
return g, nil
}
var graphStateCanDelete = map[structure.GraphState]struct{}{
structure.GraphStateCreated: {},
structure.GraphStateError: {},
structure.GraphStateLoaded: {},
structure.GraphStateOnDisk: {},
structure.GraphStateInComplete: {},
}
// DeleteGraph Sending a delete request to the workers, while removing the graph data file.
func (gb *GraphBl) DeleteGraph(graphName string) error {
graph, err := gb.GetGraph(graphName)
if err != nil {
return err
}
if _, ok := graphStateCanDelete[graph.State]; !ok {
return fmt.Errorf("graph cannot delete, status: %s", graph.State)
}
//if atomic.LoadInt32(&graph.UsingNum) != 0
if !graph.AlwaysUsing() && graph.GetUsingNum() != 0 {
return fmt.Errorf("graph is busy, using_num: %v, graph: %s/%s", graph.GetUsingNum(), graph.SpaceName, graph.Name)
}
for _, worker := range graph.Workers {
workerClient := workerMgr.GetWorker(worker.Name)
req := pb.DeleteGraphReq{
SpaceName: gb.Cred.Space(),
GraphName: graphName,
DeleteFile: true,
}
if workerClient == nil {
logrus.Errorf("graph '%v/%v worker' '%v' not exist", gb.Cred.Space(), graphName, worker.Name)
continue
}
_, err := workerClient.Session.DeleteGraph(context.Background(), &req)
if err != nil {
logrus.Warnf("worker delete graph err: %s, %s", worker.Name, graphName)
}
}
graphMgr.DeleteGraph(gb.Cred.Space(), graphName)
graphMgr.DeleteInfo(gb.Cred.Space(), graphName)
return nil
}
// ReleaseGraph Sending a delete request to the workers, while preserving the graph data file.
func (gb *GraphBl) ReleaseGraph(graphName string) error {
graph, err := gb.GetGraph(graphName)
if err != nil {
return err
}
for _, graphWorker := range graph.Workers {
workerClient, err := workerMgr.GetWorkerByName(graphWorker.Name)
if err != nil {
logrus.Warnf("worker release graph err: worker with name '%s' not exists, graph: %s/%s", graphWorker.Name, graph.SpaceName, graph.Name)
continue
}
if !workerMgr.CheckWorkerAlive(workerClient.Name) {
logrus.Infof("aborted to release graph through a worker with the name: '%s', which is not alive, graph: %s/%s",
graphWorker.Name, graph.SpaceName, graph.Name)
continue
}
req := pb.DeleteGraphReq{
SpaceName: graph.SpaceName,
GraphName: graph.Name,
DeleteFile: false,
}
_, err = workerClient.Session.DeleteGraph(context.Background(), &req)
if err != nil {
logrus.Warnf("worker release graph err, worker: %s, graph: %s/%s, error: %v", graphWorker.Name, graph.SpaceName, graph.Name, err)
}
}
if graph.OnDisk {
graph.SetState(structure.GraphStateInComplete)
//graphMgr.ForceState(graph, structure.GraphStateInComplete)
//graph.UsingNum = 0
graph.ResetUsingNum()
} else {
graphMgr.DeleteGraph(graph.SpaceName, graph.Name)
graphMgr.DeleteInfo(graph.SpaceName, graph.Name)
}
return nil
}
// GetGraphs
func (gb *GraphBl) GetGraphs() ([]*structure.VermeerGraph, error) {
var graphs []*structure.VermeerGraph
if gb.Cred.IsAdmin() {
graphs = graphMgr.GetAllGraphs()
} else {
graphs = graphMgr.GetGraphs(gb.Cred.Space())
}
for _, graph := range graphs {
graph.Status = graph.State.Converter()
}
return graphs, nil
}
// GetSpaceGraphs returns all graphs in the space (admin only)
func (gb *GraphBl) GetSpaceGraphs(space string) ([]*structure.VermeerGraph, error) {
var graphs []*structure.VermeerGraph
if !gb.Cred.IsAdmin() {
return nil, fmt.Errorf("admin access only")
}
graphs = graphMgr.GetGraphs(space)
for _, graph := range graphs {
graph.Status = graph.State.Converter()
}
return graphs, nil
}
// AppendGraph
// Create a new graph and add it to the graph manager if it does not exist, indicated by name.
// Always return a graph pointer when no error is raised.
// If the name exists, the pointer will point to the old one.
func (gb *GraphBl) AppendGraph(graphName string, params map[string]string) (graph *structure.VermeerGraph, exists bool, err error) {
if graphName == "" {
return nil, false, fmt.Errorf("invalid graph name")
}
defer graphMgr.UnSync(graphMgr.Sync())
graph = graphMgr.GetGraphByName(gb.Cred.Space(), graphName)
if graph != nil {
exists = true
goto SAVE
}
graph, err = graphMgr.AddSpaceGraph(gb.Cred.Space(), graphName)
if err != nil {
exists = false
return
}
goto SAVE
SAVE:
gb.setGraphProperties(graph, params)
if err = graphMgr.SaveInfo(gb.Cred.Space(), graph.Name); err != nil {
graphMgr.DeleteGraph(gb.Cred.Space(), graphName)
return nil, exists, err
}
return
}
func (gb *GraphBl) AppendGraphObj(graph *structure.VermeerGraph, params map[string]string) (exists bool, err error) {
if graph == nil {
return false, fmt.Errorf("graphObj is nil")
}
if graph.SpaceName != gb.Cred.Space() {
return false, fmt.Errorf("the space name of the graph does not match with the credentials: %s", gb.Cred.Space())
}
defer graphMgr.UnSync(graphMgr.Sync())
if g := graphMgr.GetGraphByName(gb.Cred.Space(), graph.Name); g != nil {
exists = true
err = nil
goto SAVE
}
err = graphMgr.AddGraph(graph)
if err != nil {
return false, err
}
exists = false
err = nil
goto SAVE
SAVE:
gb.setGraphProperties(graph, params)
if err = graphMgr.SaveInfo(gb.Cred.Space(), graph.Name); err != nil {
graphMgr.DeleteGraph(gb.Cred.Space(), graph.Name)
return false, err
}
return
}
// CreateGraph
// deprecated to see AppendGraph
func (gb *GraphBl) CreateGraph(graphName string, params map[string]string) (*structure.VermeerGraph, error) {
if graphName == "" {
return nil, fmt.Errorf("invalid graph name")
}
graph, err := graphMgr.AddSpaceGraph(gb.Cred.Space(), graphName)
if err != nil {
return nil, err
}
gb.setGraphProperties(graph, params)
if err = graphMgr.SaveInfo(gb.Cred.Space(), graph.Name); err != nil {
graphMgr.DeleteGraph(gb.Cred.Space(), graphName)
return nil, err
}
return graph, nil
}
func (gb *GraphBl) setGraphProperties(graph *structure.VermeerGraph, params map[string]string) {
if graph == nil {
return
}
graph.UseOutEdges = options.GetInt(params, "load.use_outedge") == 1
graph.UseOutDegree = options.GetInt(params, "load.use_out_degree") == 1
graph.UseProperty = options.GetInt(params, "load.use_property") == 1
//有无向图功能时,无需out edges
if options.GetInt(params, "load.use_undirected") == 1 {
graph.UseOutEdges = true
}
graph.BackendOption.VertexDataBackend = options.GetString(params, "load.vertex_backend")
// set always using
graph.SetAlwaysUsing(options.GetInt(params, "load.always_using") == 1)
graph.Params = params
}
// GetEdges
func (gb *GraphBl) GetEdges(graphName, vertexId, direction string) (inEdges, outEdges []string, inEdgeProperty []EdgeProperty, err error) {
graph, err := gb.GetGraph(graphName)
if err != nil {
return nil, nil, nil, err
}
if graph.State != structure.GraphStateLoaded {
return nil, nil, nil, fmt.Errorf("graph get edge, status: %s", graph.State)
}
if vertexId == "" {
return nil, nil, nil, fmt.Errorf("vertex_id not exist: %s", vertexId)
}
workerIdx := common.HashBKDR(vertexId) % len(graph.Workers)
workerClient := workerMgr.GetWorker(graph.Workers[workerIdx].Name)
getEdgesResp, err := workerClient.Session.GetEdges(context.Background(),
&pb.GetEdgesReq{SpaceName: gb.Cred.Space(), GraphName: graphName, VertexId: vertexId, Direction: direction})
if err != nil {
return nil, nil, nil, fmt.Errorf("graph get edge, error: %w", err)
}
inEdges = getEdgesResp.InEdges
outEdges = getEdgesResp.OutEdges
//resp.BothEdges = getEdgesResp.BothEdges
inEdgeProperty = make([]EdgeProperty, 0, len(getEdgesResp.InEdgeProperty))
for _, edgeProp := range getEdgesResp.InEdgeProperty {
inEdgeProperty = append(inEdgeProperty, EdgeProperty{edgeProp.Edge, edgeProp.Property})
}
return inEdges, outEdges, inEdgeProperty, nil
}
// GetVertices
func (gb *GraphBl) GetVertices(graphName string, vertexIds []string) (vertices []Vertex, err error) {
graph, err := gb.GetGraph(graphName)
if err != nil {
return nil, err
}
//校验图状态
if graph.State != structure.GraphStateLoaded {
return nil, fmt.Errorf("graph get vertex, status: %s", graph.State)
}
//根据顶点hash值取模,组装各个worker需要的顶点数组
vertexArr := make([][]string, len(graph.Workers))
for _, vertexId := range vertexIds {
workerIdx := common.HashBKDR(vertexId) % len(graph.Workers)
vertexArr[workerIdx] = append(vertexArr[workerIdx], vertexId)
}
//遍历各个worker,获取对应点信息
ch := make(chan VerticesGoResponse)
for i := 0; i < len(graph.Workers); i++ {
if vertexArr[i] != nil {
go func(workerName string, arr []string) {
vr := VerticesGoResponse{}
workerClient := workerMgr.GetWorker(workerName)
getVertexResp, err := workerClient.Session.GetVertex(context.Background(),
&pb.GetVertexReq{SpaceName: gb.Cred.Space(), GraphName: graphName, VertexId: arr})
vr.err = err
vr.Vertices = make([]Vertex, 0, len(getVertexResp.GetVerts()))
for _, vertexInfo := range getVertexResp.GetVerts() {
vr.Vertices = append(vr.Vertices, Vertex{
ID: vertexInfo.ID,
Property: vertexInfo.Property,
})
}
ch <- vr
}(graph.Workers[i].Name, vertexArr[i])
}
}
//聚合点信息
for i := 0; i < len(graph.Workers); i++ {
if vertexArr[i] != nil {
r := <-ch
vertices = append(vertices, r.Vertices...)
if r.err != nil {
err = r.err
}
}
}
if err != nil {
return nil, fmt.Errorf("graph get vertex, error: %w", err)
}
sort.Slice(vertices, func(i, j int) bool {
return vertices[i].ID < vertices[j].ID
})
return vertices, nil
}
// SaveIdle
// Save idle graphs to disk and unload them from RAM.
// The `graphs` parameter serves as a whitelist of graph names that will be exempted.
func (gb *GraphBl) SaveIdle(graphs ...string) {
var empty = struct{}{}
gset := make(map[string]any, len(graphs))
for _, e := range graphs {
gset[gb.Cred.Space()+"/"+e] = empty
}
groups := make(map[string]struct{})
for _, graphName := range graphs {
graph := graphMgr.GetGraphByName(gb.Cred.Space(), graphName)
if graph != nil {
groups[graph.WorkerGroup] = struct{}{}
}
}
for group := range groups {
for _, graph := range graphMgr.GetGraphsLoadedByGroup(group) {
if gset[graph.SpaceName+"/"+graph.Name] != nil {
continue
}
if graph.IsUsing() {
continue
}
_, success := GraphPersistenceTask.Operate(graph.SpaceName, graph.Name, Save)
if !success {
logrus.Errorf("autoSave space:%v garph %v failed", graph.SpaceName, graph.Name)
//return false, fmt.Errorf("failed to execute autoSave with graph: %s/%s", graph.SpaceName, graph.Name)
} else {
graph.OnDisk = true
//graph.SetState(structure.GraphStateOnDisk)
graphMgr.ForceState(graph, structure.GraphStateOnDisk)
logrus.Infof("autoSave space:%v graph:%v success", graph.SpaceName, graph.Name)
}
}
}
}
func (gb *GraphBl) Merge(graph *structure.VermeerGraph, params map[string]string) {
}
func (gb *GraphBl) checkOp(operation string) error {
if !gb.Cred.IsAdmin() {
return fmt.Errorf("permission required for this operation: %s", operation)
}
return nil
}
// HideParams 隐藏参数
func HideParams(graph *structure.VermeerGraph) *structure.VermeerGraph {
newGraph := structure.VermeerGraph{}
newGraph = *graph
newGraph.Params = nil
return &newGraph
}