vermeer/apps/structure/graph_manager.go (413 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF
licenses this file to You under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
*/
package structure
import (
"encoding/json"
"errors"
"fmt"
"os"
"path"
"sync"
"vermeer/apps/common"
storage "vermeer/apps/storage"
"github.com/sirupsen/logrus"
)
var GraphManager = &graphManager{}
type graphManager struct {
MutexLocker
Syncer
graphSpaceMap map[string]*GraphMap
store storage.Store
delimiter string
}
type GraphMap struct {
graphs map[string]*VermeerGraph
sync.Mutex
}
func (gm *graphManager) Init() {
gm.delimiter = ":"
if gm.graphSpaceMap != nil {
for _, graphMap := range gm.graphSpaceMap {
for _, v := range graphMap.graphs {
v.FreeMem()
}
}
}
gm.graphSpaceMap = make(map[string]*GraphMap)
}
// AddSpaceGraph Create a new Graph and add it to this manager.
func (gm *graphManager) AddSpaceGraph(spaceName string, graphName string) (*VermeerGraph, error) {
if spaceName == "" {
return nil, errors.New("invalid spaceName")
}
if graphName == "" {
return nil, errors.New("invalid graphName")
}
g := gm.CreateGraph(spaceName, graphName)
if err := gm.AddGraph(g); err != nil {
return nil, err
}
return g, nil
}
func (gm *graphManager) AddGraph(g *VermeerGraph) error {
defer gm.Unlock(gm.Lock())
graphMap := gm.graphSpaceMap[g.SpaceName]
if graphMap == nil {
graphMap = &GraphMap{graphs: make(map[string]*VermeerGraph)}
gm.graphSpaceMap[g.SpaceName] = graphMap
}
if _, ok := graphMap.graphs[g.Name]; ok {
return fmt.Errorf("graph name exists: %s/%s", g.SpaceName, g.Name)
}
common.PrometheusMetrics.GraphCnt.WithLabelValues().Inc()
graphMap.graphs[g.Name] = g
return nil
}
func (gm *graphManager) CreateGraph(spaceName string, graphName string) *VermeerGraph {
g := VermeerGraph{}
g.Init()
g.Name = graphName
g.SpaceName = spaceName
return &g
}
func (gm *graphManager) GetAllGraphs() []*VermeerGraph {
graphs := make([]*VermeerGraph, 0, 16)
for _, space := range gm.graphSpaceMap {
for _, g := range space.graphs {
graphs = append(graphs, g)
}
}
return graphs
}
func (gm *graphManager) GetGraphs(spaceName string) []*VermeerGraph {
graphs := make([]*VermeerGraph, 0)
if gm.graphSpaceMap[spaceName] == nil {
return graphs
}
gm.graphSpaceMap[spaceName].Lock()
defer gm.graphSpaceMap[spaceName].Unlock()
if gm.graphSpaceMap[spaceName] == nil {
return graphs
}
for _, v := range gm.graphSpaceMap[spaceName].graphs {
graphs = append(graphs, v)
}
return graphs
}
func (gm *graphManager) GetGraphsLoaded(spaceName string) []*VermeerGraph {
graphs := make([]*VermeerGraph, 0)
if gm.graphSpaceMap[spaceName] == nil {
return graphs
}
gm.graphSpaceMap[spaceName].Lock()
defer gm.graphSpaceMap[spaceName].Unlock()
for _, v := range gm.graphSpaceMap[spaceName].graphs {
if v.State == GraphStateLoaded {
graphs = append(graphs, v)
}
}
return graphs
}
func (gm *graphManager) GetGraphsLoadedByGroup(workerGroup string) []*VermeerGraph {
graphs := make([]*VermeerGraph, 0)
for _, space := range gm.graphSpaceMap {
if space == nil {
continue
}
space.Lock()
for _, g := range space.graphs {
if g.State == GraphStateLoaded && g.WorkerGroup == workerGroup {
graphs = append(graphs, g)
}
}
space.Unlock()
}
return graphs
}
func (gm *graphManager) GetGraphByName(spaceName string, graphName string) *VermeerGraph {
graphMap := gm.graphSpaceMap[spaceName]
if graphMap == nil {
return nil
}
graphMap.Lock()
defer graphMap.Unlock()
if graph, ok := graphMap.graphs[graphName]; ok {
return graph
}
return nil
}
func (gm *graphManager) DeleteGraph(spaceName string, graphName string) {
graphMap := gm.graphSpaceMap[spaceName]
if graphMap == nil {
return
}
graphMap.Lock()
defer graphMap.Unlock()
if graph, ok := graphMap.graphs[graphName]; ok {
graph.FreeMem()
delete(graphMap.graphs, graphName)
common.PrometheusMetrics.GraphCnt.WithLabelValues().Dec()
if graph.State == GraphStateLoaded {
common.PrometheusMetrics.GraphLoadedCnt.WithLabelValues().Dec()
}
common.PrometheusMetrics.VertexCnt.DeleteLabelValues(graphName)
common.PrometheusMetrics.EdgeCnt.DeleteLabelValues(graphName)
logrus.Infof("graph deleted: %s", graphName)
} else {
logrus.Errorf("graph is not exists:%v", graphName)
}
}
func (gm *graphManager) DeleteGraphFile(spaceName string, graphName string) {
if gm.graphSpaceMap[spaceName] == nil {
return
}
gm.graphSpaceMap[spaceName].Lock()
defer gm.graphSpaceMap[spaceName].Unlock()
p, err := common.GetCurrentPath()
if err != nil {
logrus.Errorf("DeleteGraph get current path error:%v", err)
}
dir := path.Join(p, "vermeer_data", "graph_data", spaceName, graphName)
if !common.IsFileOrDirExist(dir) {
logrus.Errorf("graph dir not exist,maybe graph has been deleted")
}
err = os.RemoveAll(dir)
if err != nil {
logrus.Errorf("DeleteGraph remove data error:%v", err)
}
}
func (gm *graphManager) GetGraphsByWorker(workerName string) []*VermeerGraph {
graphs := make([]*VermeerGraph, 0)
for _, graphMap := range gm.graphSpaceMap {
for _, graph := range graphMap.graphs {
for _, worker := range graph.Workers {
if worker.Name == workerName {
graphs = append(graphs, graph)
break
}
}
}
}
return graphs
}
// SaveGraph 图数据落盘并释放内存
func (gm *graphManager) SaveGraph(spaceName string, graphName string, workerName string) error {
gm.graphSpaceMap[spaceName].Lock()
defer gm.graphSpaceMap[spaceName].Unlock()
if graph, ok := gm.graphSpaceMap[spaceName].graphs[graphName]; ok {
err := graph.Save(workerName)
if err != nil {
return err
}
graph.OnDisk = true
graph.State = GraphStateOnDisk
graph.FreeMem()
gm.locker.Lock()
delete(gm.graphSpaceMap[spaceName].graphs, graphName)
gm.locker.Unlock()
} else {
logrus.Errorf("graph is not exists:%v", graphName)
return errors.New("graph is not exists")
}
return nil
}
// WriteDisk 图数据落盘,但不删除内存中的图
func (gm *graphManager) WriteDisk(spaceName string, graphName string, workerName string) error {
gm.graphSpaceMap[spaceName].Lock()
defer gm.graphSpaceMap[spaceName].Unlock()
if graph, ok := gm.graphSpaceMap[spaceName].graphs[graphName]; ok {
err := graph.Save(workerName)
if err != nil {
return err
}
graph.OnDisk = true
} else {
logrus.Errorf("graph is not exists:%v", graphName)
return errors.New("graph is not exists")
}
return nil
}
// ReadGraph 从磁盘中读取图数据并恢复
func (gm *graphManager) ReadGraph(spaceName string, graphName string, workerName string) error {
gm.locker.Lock()
if _, ok := gm.graphSpaceMap[spaceName]; !ok {
gm.graphSpaceMap[spaceName] = &GraphMap{graphs: make(map[string]*VermeerGraph)}
}
gm.locker.Unlock()
gm.graphSpaceMap[spaceName].Lock()
defer gm.graphSpaceMap[spaceName].Unlock()
if _, ok := gm.graphSpaceMap[spaceName].graphs[graphName]; ok {
logrus.Errorf("graph %v exists, no need read", graphName)
return fmt.Errorf("graph %v exists, no need read", graphName)
}
graph := new(VermeerGraph)
graph.SpaceName = spaceName
graph.Name = graphName
_, err := graph.SetDataDir(spaceName, graphName, workerName)
if err != nil {
logrus.Errorf("graph %v set data dir error,err:%v", graphName, err)
return fmt.Errorf("graph %v set data dir error,err:%w", graphName, err)
}
err = graph.Read(workerName)
if err != nil {
logrus.Errorf("graph %v read error,err:%v", graphName, err.Error())
return fmt.Errorf("graph %v load error,err:%v", graphName, err.Error())
}
graph.State = GraphStateLoaded
err = gm.AddGraph(graph)
if err != nil {
logrus.Errorf("add graph %v error,err:%v", graphName, err.Error())
return fmt.Errorf("add graph %v error,err:%v", graphName, err.Error())
}
return nil
}
// InitStore master保存图信息 初始化DB
func (gm *graphManager) InitStore() error {
p, err := common.GetCurrentPath()
if err != nil {
logrus.Errorf("get current path error:%v", err)
return err
}
dir := path.Join(p, "vermeer_data", "graph_info")
gm.store, err = storage.StoreMaker(storage.StoreOption{
StoreName: storage.StoreTypePebble,
Path: dir,
Fsync: true,
})
if err != nil {
return err
}
return nil
}
// SaveInfo master保存图信息 存储单个图信息
func (gm *graphManager) SaveInfo(spaceName string, graphName string) error {
gm.locker.Lock()
defer gm.locker.Unlock()
spaceMap := gm.graphSpaceMap[spaceName]
if spaceMap == nil {
return fmt.Errorf("failed to save graph info because space '%s' does not exist", spaceName)
}
graph := spaceMap.graphs[graphName]
if graph == nil {
return fmt.Errorf("failed to save graph info because graph '%s/%s' does not exist", spaceName, graphName)
}
return gm.doSaveInfo(graph)
}
func (gm *graphManager) ForceState(graph *VermeerGraph, state GraphState) bool {
if graph == nil {
logrus.Error("GraphManager.ForceState: the argument `graph` is nil")
return false
}
if state == "" {
logrus.Error("GraphManager.ForceState: the argument `state` is empty")
return false
}
defer gm.Unlock(gm.Lock())
graph.SetState(state)
if err := gm.doSaveInfo(graph); err != nil {
logrus.Errorf("failed to save graph '%s/%s' state '%s', caused by: %v", graph.SpaceName, graph.Name, state, err)
return false
}
return true
}
func (gm *graphManager) SetState(graph *VermeerGraph, state GraphState) error {
if graph == nil {
return fmt.Errorf("the argument `graph` is nil")
}
if state == "" {
return fmt.Errorf("the argument `state` is empty")
}
defer gm.Unlock(gm.Lock())
prevState := graph.State
prevTime := graph.UpdateTime
graph.SetState(state)
if err := gm.doSaveInfo(graph); err != nil {
graph.State = prevState
graph.UpdateTime = prevTime
logrus.Errorf("failed to save graph '%s/%s' state '%s', caused by: %v", graph.SpaceName, graph.Name, state, err)
return err
}
return nil
}
func (gm *graphManager) SetError(graph *VermeerGraph) bool {
if graph == nil {
logrus.Errorf("GraphManager.SetError: the argument `graph` is nil")
return false
}
defer gm.Unlock(gm.Lock())
graph.SetState(GraphStateError)
if err := gm.doSaveInfo(graph); err != nil {
logrus.Errorf("failed to save graph '%s/%s' error state, caused by: %v", graph.SpaceName, graph.Name, err)
return false
}
return true
}
func (gm *graphManager) SaveWorkerGroup(graph *VermeerGraph, workerGroup string) error {
if graph == nil {
return errors.New("the argument `graph` is nil")
}
if workerGroup == "" {
return errors.New("the argument `workerGroup` is empty")
}
gm.locker.Lock()
defer gm.locker.Unlock()
prevGroup := graph.WorkerGroup
graph.WorkerGroup = workerGroup
if err := gm.doSaveInfo(graph); err != nil {
graph.WorkerGroup = prevGroup
return err
}
return nil
}
func (gm *graphManager) doSaveInfo(graph *VermeerGraph) error {
bytes, err := json.Marshal(graph)
if err != nil {
return err
}
batch := gm.store.NewBatch()
err = batch.Set([]byte(graph.SpaceName+gm.delimiter+graph.Name), bytes)
if err != nil {
return err
}
err = batch.Commit()
if err != nil {
return err
}
logrus.Infof("save graph info successfully: %v/%v", graph.SpaceName, graph.Name)
return nil
}
// ReadInfo master从DB中恢复指定图信息
func (gm *graphManager) ReadInfo(spaceName string, graphName string) (*VermeerGraph, error) {
gm.locker.Lock()
defer gm.locker.Unlock()
graphBytes, err := gm.store.Get([]byte(spaceName + gm.delimiter + graphName))
if err != nil {
return nil, err
}
graph := &VermeerGraph{}
err = json.Unmarshal(graphBytes, graph)
if err != nil {
return nil, err
}
logrus.Infof("load graph info success:%v:%v", spaceName, graphName)
return graph, nil
}
// ReadAllInfo master从DB中恢复所有图空间名与图名
func (gm *graphManager) ReadAllInfo() ([]*VermeerGraph, error) {
gm.locker.Lock()
defer gm.locker.Unlock()
graphs := make([]*VermeerGraph, 0)
scan := gm.store.Scan()
for kv := range scan {
graph := &VermeerGraph{}
err := json.Unmarshal(kv.Value, graph)
if err != nil {
return nil, err
}
graphs = append(graphs, graph)
}
return graphs, nil
}
// DeleteInfo master从DB中删除指定图信息
func (gm *graphManager) DeleteInfo(spaceName string, graphName string) {
gm.locker.Lock()
defer gm.locker.Unlock()
batch := gm.store.NewBatch()
err := batch.Delete([]byte(spaceName + gm.delimiter + graphName))
if err != nil {
logrus.Errorf("delete space %v graph %v info error:%v", spaceName, graphName, err)
}
err = batch.Commit()
if err != nil {
logrus.Errorf("commit delete space %v graph %v info error:%v", spaceName, graphName, err)
}
}