vermeer/apps/structure/vertex.go (628 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF
licenses this file to You under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
*/
package structure
import (
"context"
"encoding/binary"
"encoding/json"
"fmt"
"os"
"path"
"sync"
"time"
"unsafe"
"vermeer/apps/common"
"vermeer/apps/serialize"
"vermeer/apps/storage"
"github.com/allegro/bigcache/v3"
"github.com/sirupsen/logrus"
)
// Vertex is a graph vertex identified by its original (long) string ID.
type Vertex struct {
	ID string
}
// Marshal encodes the vertex as a 2-byte big-endian ID length followed by the
// raw ID bytes, and returns the number of bytes written.
func (v *Vertex) Marshal(buffer []byte) (int, error) {
	idLen := len(v.ID)
	binary.BigEndian.PutUint16(buffer, uint16(idLen))
	copy(buffer[2:], v.ID)
	return 2 + idLen, nil
}
// Unmarshal decodes a 2-byte big-endian length followed by that many ID bytes
// and returns the number of bytes consumed. The ID bytes are copied into a
// private buffer which is then reinterpreted as a string without a second copy.
func (v *Vertex) Unmarshal(buffer []byte) (int, error) {
	idLen := int(binary.BigEndian.Uint16(buffer))
	raw := make([]byte, idLen)
	copy(raw, buffer[2:])
	// zero-copy []byte -> string conversion; raw is never written again,
	// so the resulting string stays immutable in practice
	v.ID = *(*string)(unsafe.Pointer(&raw))
	return 2 + idLen, nil
}
// ToString is part of the serialize interface; Vertex defines no textual form.
func (v *Vertex) ToString() string {
	return ""
}
// PredictSize reports the marshaled size: two length bytes plus the ID bytes.
func (v *Vertex) PredictSize() int {
	return len(v.ID) + 2
}
// SliceVertex is a serializable slice of Vertex.
type SliceVertex []Vertex
// Marshal writes a 4-byte big-endian element count followed by each vertex in
// order, returning the total number of bytes written.
func (sv *SliceVertex) Marshal(buffer []byte) (int, error) {
	binary.BigEndian.PutUint32(buffer, uint32(len(*sv)))
	offset := 4
	for i := range *sv {
		n, err := (*sv)[i].Marshal(buffer[offset:])
		if err != nil {
			return 0, err
		}
		offset += n
	}
	return offset, nil
}
// Unmarshal reads a 4-byte big-endian element count and then decodes that many
// vertices in place, returning the total number of bytes consumed.
func (sv *SliceVertex) Unmarshal(buffer []byte) (int, error) {
	count := binary.BigEndian.Uint32(buffer)
	offset := 4
	*sv = make([]Vertex, count)
	for i := range *sv {
		n, err := (*sv)[i].Unmarshal(buffer[offset:])
		if err != nil {
			return 0, err
		}
		offset += n
	}
	return offset, nil
}
// ToString is part of the serialize interface; SliceVertex defines no textual form.
func (sv *SliceVertex) ToString() string {
	return ""
}
// PredictSize reports the marshaled size: a 4-byte count header plus the
// predicted size of every element.
func (sv *SliceVertex) PredictSize() int {
	total := 4
	for i := range *sv {
		total += (*sv)[i].PredictSize()
	}
	return total
}
// Partition splits the slice into contiguous chunks whose predicted marshaled
// size stays at or just above limit, so each chunk fits one send buffer.
// Each partition's Size includes the 4-byte count header written by Marshal;
// Start is inclusive and End is exclusive.
func (sv *SliceVertex) Partition(limit int) []serialize.SlicePartition {
	p := make([]serialize.SlicePartition, 0)
	length := len(*sv)
	index := 0
	size := 4 // room for the uint32 element count
	for i, v := range *sv {
		size += v.PredictSize()
		if size >= limit {
			// close the current chunk, including element i
			var one serialize.SlicePartition
			one.Start = index
			one.Size = size
			one.End = i + 1
			p = append(p, one)
			index = i + 1
			size = 4
		}
		// on the last element, flush any trailing partial chunk
		if i == length-1 && index < length {
			var lastOne serialize.SlicePartition
			lastOne.Size = size
			lastOne.Start = index
			lastOne.End = length
			p = append(p, lastOne)
		}
	}
	return p
}
// Partition splits s into contiguous chunks whose predicted marshaled size
// stays at or just above limit. Start is inclusive, End exclusive, and Size
// includes the 4-byte count header.
// NOTE: currently unusable because []Vertex cannot be converted to
// []serialize.MarshalAble; SliceVertex.Partition is used instead.
func Partition(s []serialize.MarshalAble, limit int) []serialize.SlicePartition {
	p := make([]serialize.SlicePartition, 0)
	length := len(s)
	index := 0
	size := 4 // room for the uint32 element count
	for i, v := range s {
		size += v.PredictSize()
		if size >= limit {
			// Close the current chunk *including* element i (End is exclusive).
			// Previously End was i and index jumped to i+1, so element i —
			// already counted in size — landed in no partition at all.
			var one serialize.SlicePartition
			one.Start = index
			one.Size = size
			one.End = i + 1
			p = append(p, one)
			index = i + 1
			size = 4
		}
		// On the last element, flush the trailing partial chunk covering
		// [index, length). Previously End was length-1, dropping the final
		// element; this now matches SliceVertex.Partition.
		if i == length-1 && index < length {
			var lastOne serialize.SlicePartition
			lastOne.Size = size
			lastOne.Start = index
			lastOne.End = length
			p = append(p, lastOne)
		}
	}
	return p
}
// VertexMem keeps the whole vertex table in memory.
type VertexMem struct {
	// TotalVertex holds every vertex, indexed by its internal uint32 ID.
	TotalVertex []Vertex
	// VertexLongIDMap maps a long (string) ID back to its internal ID;
	// populated by BuildVertexMap.
	VertexLongIDMap map[string]uint32
}
// Init prepares an empty in-memory vertex table. dataDir is accepted only to
// satisfy the shared interface; this implementation never touches disk.
func (vm *VertexMem) Init(dataDir string) {
	_ = dataDir
	vm.TotalVertex = make([]Vertex, 0)
}
// TotalVertexCount reports how many vertices are currently held.
func (vm *VertexMem) TotalVertexCount() uint32 {
	return uint32(len(vm.TotalVertex))
}
// GetVertex returns the vertex stored at the given internal ID.
// The caller is responsible for passing an in-range ID.
func (vm *VertexMem) GetVertex(vertexID uint32) Vertex {
	return vm.TotalVertex[vertexID]
}
// GetVertexIndex resolves a long vertex ID to its internal ID; the second
// result reports whether the vertex is known.
func (vm *VertexMem) GetVertexIndex(vertex string) (uint32, bool) {
	vID, found := vm.VertexLongIDMap[vertex]
	return vID, found
}
// AppendVertices adds the given vertices to the end of the in-memory table.
func (vm *VertexMem) AppendVertices(vertex ...Vertex) {
	vm.TotalVertex = append(vm.TotalVertex, vertex...)
}
// SetVertex overwrites the vertex stored at the given internal ID.
func (vm *VertexMem) SetVertex(vertexID uint32, vertex Vertex) {
	vm.TotalVertex[vertexID] = vertex
}
// SetVertices stores the given vertices at consecutive internal IDs starting
// from offset.
func (vm *VertexMem) SetVertices(offset uint32, vertex ...Vertex) {
	cursor := serialize.SUint32(offset)
	for i := range vertex {
		vm.TotalVertex[cursor] = vertex[i]
		cursor++
	}
}
// RecastVertex grows the vertex table to totalCount entries and moves the
// locally-held vertices to their global position starting at vertStart.
// With a single worker the table already covers the full range, so nothing
// needs to move.
func (vm *VertexMem) RecastVertex(totalCount int64, vertStart uint32, workers []*GraphWorker) {
	if len(workers) <= 1 {
		return
	}
	oldVerts := vm.TotalVertex
	vm.TotalVertex = make([]Vertex, totalCount)
	logrus.Infof("recast make total vertex complete")
	// copy replaces the former element-by-element loop: same result, but it
	// lowers to a single memmove.
	copy(vm.TotalVertex[vertStart:], oldVerts)
}
// BuildVertexMap rebuilds the long-ID -> internal-ID lookup table from the
// current vertex slice.
func (vm *VertexMem) BuildVertexMap() {
	idMap := make(map[string]uint32, len(vm.TotalVertex))
	for i := range vm.TotalVertex {
		idMap[vm.TotalVertex[i].ID] = uint32(i)
	}
	vm.VertexLongIDMap = idMap
}
// deleteData is a no-op: the in-memory store has no on-disk data to remove.
func (vm *VertexMem) deleteData() {}
// freeMem is a no-op: the slices are reclaimed by the garbage collector once
// the VertexMem itself is dropped.
func (vm *VertexMem) freeMem() {}
// cacheSize caps the bigcache used for long-ID lookups, in bytes (10 GiB).
var cacheSize = 10 * 1024 * 1024 * 1024
// VertexInDB keeps the vertex table in on-disk stores, one pair per worker,
// with an optional bigcache front for long-ID lookups.
type VertexInDB struct {
	sliceStores      []storage.Store    // internal ID -> long ID, one store per worker
	mapStores        []storage.Store    // long ID -> internal ID, one store per worker
	mapCache         *bigcache.BigCache // read cache over mapStores; may be nil
	workers          []*GraphWorker     // distribution info; index matches the store slices
	totalVertexCount uint32
	idSeed           int32 // next internal ID handed out by AppendVertices
	dataDir          string
}
// Init creates the on-disk store holding this worker's own vertex slice
// (internal ID -> long ID). Failures are logged rather than returned; the
// store slice is allocated up front so later code can index it.
func (vm *VertexInDB) Init(dataDir string) {
	vm.dataDir = vm.makeDataDir(dataDir)
	vm.sliceStores = make([]storage.Store, 1)
	opt := storage.StoreOption{
		StoreName: storage.StoreTypePebble,
		Path:      path.Join(vm.dataDir, "self_vertex_slice"),
		Fsync:     false,
		ReadOnly:  false,
		UseFilter: false,
	}
	var err error
	vm.sliceStores[0], err = storage.StoreMaker(opt)
	if err != nil {
		logrus.Errorf("init vertex store error:%v", err)
	}
}
// makeDataDir returns the graph database directory located under dataDir.
func (vm *VertexInDB) makeDataDir(dataDir string) string {
	const graphDBDir = "graph_db"
	return path.Join(dataDir, graphDBDir)
}
// TotalVertexCount reports the number of vertices across all workers.
func (vm *VertexInDB) TotalVertexCount() uint32 {
	return vm.totalVertexCount
}
// vertexStoreID maps an internal vertex ID to the index of the worker (and
// hence store) whose ID range contains it, falling back to 0 when no range
// matches.
func (vm *VertexInDB) vertexStoreID(vertexID uint32) int {
	if len(vm.workers) == 1 {
		return 0
	}
	for idx := range vm.workers {
		w := vm.workers[idx]
		if vertexID >= w.VertIdStart && vertexID < w.VertIdStart+w.VertexCount {
			return idx
		}
	}
	return 0
}
// longVertexStoreID selects the store index for a long vertex ID by hashing,
// distributing lookups deterministically across workers.
func (vm *VertexInDB) longVertexStoreID(vertex string) int {
	workerCount := len(vm.workers)
	if workerCount == 1 {
		return 0
	}
	return common.HashBKDR(vertex) % workerCount
}
// GetVertex returns the vertex for an internal ID by fetching its long ID
// from the owning worker's slice store. The zero Vertex is returned on any
// failure (out-of-range ID, marshal error, store miss).
func (vm *VertexInDB) GetVertex(vertexID uint32) Vertex {
	storeID := vm.vertexStoreID(vertexID)
	vID := serialize.SUint32(vertexID)
	// Valid IDs are [0, totalVertexCount): AppendVertices hands out IDs from
	// a zero-based seed and sets totalVertexCount to the seed afterwards.
	// The previous check used ">", wrongly accepting vertexID == totalVertexCount.
	if vID >= serialize.SUint32(vm.totalVertexCount) {
		logrus.Errorf("vertex id:%d is out of range", vertexID)
		return Vertex{}
	}
	// the local store is keyed by worker-relative IDs
	if vm.workers[storeID].IsSelf {
		vID -= serialize.SUint32(vm.workers[storeID].VertIdStart)
	}
	bytes := make([]byte, vID.PredictSize())
	_, err := vID.Marshal(bytes)
	if err != nil {
		logrus.Errorf("marshal vertex error:%v", err)
		return Vertex{}
	}
	longID, err := vm.sliceStores[storeID].Get(bytes)
	if err != nil {
		logrus.Errorf("get vertex error:%v", err)
		return Vertex{}
	}
	return Vertex{ID: string(longID)}
}
// GetVertexIndex resolves a long vertex ID to its internal ID, consulting the
// bigcache first and falling back to the worker's map store; a store hit is
// written back into the cache. Results outside the owning worker's ID range
// [start, end) are rejected. The second result reports success.
func (vm *VertexInDB) GetVertexIndex(vertex string) (uint32, bool) {
	storeID := vm.longVertexStoreID(vertex)
	start := vm.workers[storeID].VertIdStart
	end := start + vm.workers[storeID].VertexCount
	// fast path: cached mapping, validated against the owning range
	if vm.mapCache != nil {
		// vByte := []byte(vertex)
		if sID, err := vm.mapCache.Get(vertex); err == nil {
			vID := serialize.SUint32(0)
			_, err = vID.Unmarshal(sID)
			if err != nil {
				logrus.Errorf("unmarshal vertex error:%v", err)
				return 0, false
			}
			if vID < serialize.SUint32(start) || vID >= serialize.SUint32(end) {
				// stale/foreign cache entry: fall through to the store
				logrus.Warnf("get vertex index from cache is out of range, read from storage. vertex:%v vid:%v , start:%v, end:%v", vertex, vID, start, end)
			} else {
				return uint32(vID), true
			}
		}
	}
	// slow path: authoritative lookup in the on-disk map store
	// storeID := vm.longVertexStoreID(vertex)
	sID, err := vm.mapStores[storeID].Get([]byte(vertex))
	if err != nil {
		// a miss here simply means the vertex is unknown; not logged
		// logrus.Errorf("get vertex error:%v", err)
		return 0, false
	}
	vID := serialize.SUint32(0)
	_, err = vID.Unmarshal(sID)
	if err != nil {
		logrus.Errorf("unmarshal vertex error:%v", err)
		return 0, false
	}
	// warm the cache with the raw stored bytes for subsequent lookups
	if vm.mapCache != nil {
		err = vm.mapCache.Set(vertex, sID)
		if err != nil {
			logrus.Errorf("set vertex cache error:%v", err)
		}
	}
	if vID < serialize.SUint32(start) || vID >= serialize.SUint32(end) {
		logrus.Errorf("get vertex index from db:%v vertex:%v vid:%v is out of range, start:%v, end:%v", vm.mapStores[storeID].Path(), vertex, vID, start, end)
		return 0, false
	}
	return uint32(vID), true
}
// AppendVertices assigns sequential internal IDs (from idSeed) to the given
// vertices and persists the ID -> long-ID mapping into the local slice store
// in a single batch. The running vertex count tracks the seed.
func (vm *VertexInDB) AppendVertices(vertex ...Vertex) {
	batch := vm.sliceStores[0].NewBatch()
	// work on a private copy of the incoming slice
	pending := make([]Vertex, len(vertex))
	copy(pending, vertex)
	for i := range pending {
		vID := serialize.SUint32(vm.idSeed)
		vm.idSeed++
		key := make([]byte, vID.PredictSize())
		if _, err := vID.Marshal(key); err != nil {
			logrus.Errorf("marshal vertex error:%v", err)
			continue
		}
		if err := batch.Set(key, []byte(pending[i].ID)); err != nil {
			logrus.Errorf("set vertex error:%v", err)
			continue
		}
	}
	if err := batch.Commit(); err != nil {
		logrus.Errorf("commit vertex error:%v", err)
	}
	// update vertex count: the seed doubles as the running total
	vm.totalVertexCount = uint32(vm.idSeed)
}
// SetVertex overwrites the long ID recorded for vertexID in its owning
// worker's slice store.
func (vm *VertexInDB) SetVertex(vertexID uint32, vertex Vertex) {
	vID := serialize.SUint32(vertexID)
	key := make([]byte, vID.PredictSize())
	if _, err := vID.Marshal(key); err != nil {
		logrus.Errorf("marshal vertex error:%v", err)
		return
	}
	storeID := vm.vertexStoreID(vertexID)
	if err := vm.sliceStores[storeID].Set(key, []byte(vertex.ID)); err != nil {
		logrus.Errorf("set vertex error:%v", err)
	}
}
// SetVertices writes the given vertices under consecutive internal IDs
// starting at offset, batching updates into both the slice store
// (ID -> long ID) and the map store (long ID -> ID).
// NOTE(review): the target store is chosen from offset alone, so the whole
// batch is assumed to fall inside one worker's ID range — confirm with callers.
func (vm *VertexInDB) SetVertices(offset uint32, vertex ...Vertex) {
	vID := serialize.SUint32(offset)
	storeID := vm.vertexStoreID(uint32(vID))
	sliceBatch := vm.sliceStores[storeID].NewBatch()
	mapBatch := vm.mapStores[storeID].NewBatch()
	// copies the variadic slice before use — presumably to guard against
	// concurrent caller mutation; TODO confirm
	vertexCopy := make([]Vertex, len(vertex))
	copy(vertexCopy, vertex)
	for _, v := range vertexCopy {
		bytes := make([]byte, vID.PredictSize())
		_, err := vID.Marshal(bytes)
		// advance the ID even on marshal failure so positions stay aligned
		vID++
		if err != nil {
			logrus.Errorf("marshal vertex error:%v", err)
			continue
		}
		err = sliceBatch.Set(bytes, []byte(v.ID))
		if err != nil {
			logrus.Errorf("set vertex error:%v", err)
			continue
		}
		err = mapBatch.Set([]byte(v.ID), bytes)
		if err != nil {
			logrus.Errorf("set vertex map error:%v", err)
			continue
		}
	}
	err := sliceBatch.Commit()
	if err != nil {
		logrus.Errorf("commit vertex error:%v", err)
	}
	err = mapBatch.Commit()
	if err != nil {
		logrus.Errorf("commit vertex map error:%v", err)
	}
}
// RecastVertex rewires the store layout once the worker distribution is
// known: the store created by Init becomes the self worker's slice store
// (and is compacted), fresh slice stores are created for every peer worker,
// the global vertex count is recomputed from the workers' counts, and an
// empty map store is created per worker (filled later by BuildVertexMap).
func (vm *VertexInDB) RecastVertex(totalCount int64, vertStart uint32, workers []*GraphWorker) {
	// both parameters are unused here; kept for interface compatibility
	_ = totalCount
	_ = vertStart
	vm.workers = workers
	selfStore := vm.sliceStores[0]
	vm.sliceStores = make([]storage.Store, len(vm.workers))
	var err error
	for i := range vm.sliceStores {
		if workers[i].IsSelf {
			// reuse (and compact) the store already holding local vertices
			vm.sliceStores[i] = selfStore
			err := vm.sliceStores[i].Compact()
			if err != nil {
				logrus.Errorf("compact vertex slice store error:%v", err)
			}
		} else {
			vm.sliceStores[i], err = storage.StoreMaker(storage.StoreOption{
				StoreName: storage.StoreTypePebble,
				Path:      path.Join(vm.dataDir, fmt.Sprintf("%v_vertex_slice", i)),
				Fsync:     false,
				ReadOnly:  false,
				UseFilter: false,
			})
			if err != nil {
				logrus.Errorf("create vertex slice store error:%v", err)
				continue
			}
		}
	}
	// sum vertex count
	vm.totalVertexCount = 0
	for _, w := range workers {
		vm.totalVertexCount += w.VertexCount
	}
	// build vertex map stores (one per worker; populated by BuildVertexMap)
	vm.mapStores = make([]storage.Store, len(vm.workers))
	for id := range workers {
		vm.mapStores[id], err = storage.StoreMaker(storage.StoreOption{
			StoreName: storage.StoreTypePebble,
			Path:      path.Join(vm.dataDir, fmt.Sprintf("%v_vertex_map", id)),
			Fsync:     false,
			ReadOnly:  false,
			UseFilter: false,
		})
		if err != nil {
			logrus.Errorf("create vertex map store error:%v", err)
			return
		}
	}
}
// BuildVertexMap materializes the reverse (long ID -> internal ID) mapping
// for this worker: it iterates the local slice store, rebases each key by the
// worker's VertIdStart, writes the inverted pairs into the worker's map store
// in batches, and warms the bigcache read cache up to cacheSize bytes.
// Finally every map store is compacted in parallel.
func (vm *VertexInDB) BuildVertexMap() {
	var err error
	config := bigcache.Config{
		Shards:             1024,
		LifeWindow:         10 * time.Minute,
		CleanWindow:        0,
		MaxEntriesInWindow: 1000 * 10 * 60,
		MaxEntrySize:       500,
		Verbose:            false,
		HardMaxCacheSize:   cacheSize / 1024 / 1024, // bigcache expects MiB
		OnRemove:           nil,
		OnRemoveWithReason: nil,
	}
	vm.mapCache, err = bigcache.New(context.Background(), config)
	if err != nil {
		logrus.Errorf("create vertex map cache error:%v", err)
		return
	}
	// locate this worker's own index; only the local slice store is inverted
	id := 0
	for i, worker := range vm.workers {
		if worker.IsSelf {
			id = i
			break
		}
	}
	// The commented-out block below is the former per-store parallel build,
	// kept for reference.
	// sizePerStore := uint64(cacheSize) / uint64(len(vm.sliceStores))
	// wg := &sync.WaitGroup{}
	// vm.mapStores = make([]storage.Store, len(vm.sliceStores))
	// for i, store := range vm.sliceStores {
	// wg.Add(1)
	// go func(id int, sliceStore storage.Store) {
	// defer wg.Done()
	// err = sliceStore.FlushDB()
	// if err != nil {
	// logrus.Errorf("flush vertex slice store error:%v", err)
	// return
	// }
	// isSelf := vm.workers[id].IsSelf
	// make map store
	// vm.mapStores[id], err = storage.StoreMaker(storage.StoreOption{
	// StoreName: storage.StoreTypePebble,
	// Path: path.Join(vm.dataDir, fmt.Sprintf("%v_vertex_map", id)),
	// Fsync: false,
	// ReadOnly: false,
	// UseFilter: false,
	// })
	// if err != nil {
	// logrus.Errorf("create vertex map store error:%v", err)
	// return
	// }
	// make batch
	batch := vm.mapStores[id].NewBatch()
	// get new iterator
	iter := vm.sliceStores[id].NewIterator()
	if iter == nil {
		// NOTE(review): err is always nil at this point, so the message
		// carries no detail about why the iterator is missing
		logrus.Errorf("create vertex map store iterator error:%v", err)
		return
	}
	var size uint64
	for iter.First(); iter.Valid(); iter.Next() {
		// copy key/value out: the iterator may reuse its buffers between steps
		key := make([]byte, len(iter.Key()))
		copy(key, iter.Key())
		value := make([]byte, len(iter.Value()))
		copy(value, iter.Value())
		vID := serialize.SUint32(0)
		_, err = vID.Unmarshal(key)
		if err != nil {
			logrus.Errorf("unmarshal vertex error:%v", err)
			continue
		}
		// if isSelf {
		// rebase the store-local ID into the global ID space
		vID += serialize.SUint32(vm.workers[id].VertIdStart)
		// }
		bytes := make([]byte, vID.PredictSize())
		_, err = vID.Marshal(bytes)
		if err != nil {
			logrus.Errorf("marshal vertex error:%v", err)
			continue
		}
		// warm the cache until the configured byte budget is reached
		if size < uint64(cacheSize) {
			size += uint64(len(value) + len(bytes))
			// set map cache
			err = vm.mapCache.Set(string(value), bytes)
			if err != nil {
				logrus.Errorf("set vertex map cache, longID:%v ,error:%v", string(value), err)
				continue
			}
		}
		// set map store
		err = batch.Set(value, bytes)
		if err != nil {
			logrus.Errorf("set vertex map store, longID:%v ,error:%v", string(value), err)
			continue
		}
		// commit full batch
		if batch.BatchFull() {
			err = batch.Commit()
			if err != nil {
				logrus.Errorf("commit vertex map store error:%v", err)
			}
			batch = vm.mapStores[id].NewBatch()
		}
	}
	err = iter.Close()
	if err != nil {
		logrus.Errorf("close vertex map store iterator error:%v", err)
	}
	// commit the final, possibly partial batch
	err = batch.Commit()
	if err != nil {
		logrus.Errorf("commit vertex map store error:%v", err)
	}
	// logrus.Infof("build vertex map store:%v done", id)
	// }(i, store)
	// }
	// wg.Wait()
	logrus.Infof("map cache entry count:%v", vm.mapCache.Len())
	// vm.mapCache.ResetStatistics()
	// compact all map store
	wg := &sync.WaitGroup{}
	for i := range vm.mapStores {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			err := vm.mapStores[idx].Compact()
			if err != nil {
				logrus.Errorf("compact vertex map store error:%v", err)
			}
			logrus.Infof("compact vertex map store:%v done", idx)
		}(i)
	}
	wg.Wait()
}
// VertexInDBMeta is the JSON metadata written by save and read back by load.
type VertexInDBMeta struct {
	Workers          []*GraphWorker `json:"workers"`
	TotalVertexCount uint32         `json:"total_vertex_count"`
	DataDir          string         `json:"data_dir"`
}
// save persists the vertex state: it drops the read cache (logging its hit
// rate), writes the JSON metadata file, then flushes every worker's slice and
// map store concurrently. wg lets the caller wait for the flushes to finish.
func (vm *VertexInDB) save(graphMeta *GraphMeta, dir string, wg *sync.WaitGroup) {
	if vm.mapCache != nil {
		logrus.Infof("cache len:%v", vm.mapCache.Len())
		logrus.Infof("hit rate:%.2f", float64(vm.mapCache.Stats().Hits)/float64(vm.mapCache.Stats().Hits+vm.mapCache.Stats().Misses))
		// Close error is ignored; the cache is being discarded anyway.
		vm.mapCache.Close()
		vm.mapCache = nil
	}
	// 1. save graph and store meta
	dataMeta := VertexInDBMeta{}
	dataMeta.DataDir = vm.dataDir
	dataMeta.TotalVertexCount = vm.totalVertexCount
	dataMeta.Workers = vm.workers
	bytes, err := json.Marshal(dataMeta)
	if err != nil {
		logrus.Errorf("marshal vertex meta error:%v", err)
	}
	err = os.WriteFile(path.Join(dir, "vertex_in_db__meta"), bytes, 0644)
	if err != nil {
		logrus.Errorf("write vertex meta error:%v", err)
	}
	// 2. close all store
	// NOTE(review): stores are flushed, not closed, here; freeMem does the
	// actual Close.
	for i, worker := range vm.workers {
		wg.Add(1)
		go func(id int, worker *GraphWorker) {
			defer wg.Done()
			defer logrus.Infof("save store:%v-%v done", vm.dataDir, id)
			err := vm.sliceStores[id].FlushDB()
			if err != nil {
				logrus.Errorf("flush vertex slice store error:%v,store:%v", err, vm.dataDir)
			}
			err = vm.mapStores[id].FlushDB()
			if err != nil {
				logrus.Errorf("flush vertex map store error:%v,store:%v", err, vm.dataDir)
			}
		}(i, worker)
	}
}
// freeMem closes every slice and map store, releasing their resources.
// Errors are logged but not propagated.
func (vm *VertexInDB) freeMem() {
	for _, store := range vm.sliceStores {
		if err := store.Close(); err != nil {
			logrus.Errorf("close vertex slice store error:%v,store:%v", err, vm.dataDir)
		}
		logrus.Infof("close vertex slice store:%v", vm.dataDir)
	}
	for _, store := range vm.mapStores {
		if err := store.Close(); err != nil {
			logrus.Errorf("close vertex map store error:%v,store:%v", err, vm.dataDir)
		}
		logrus.Infof("close vertex map store:%v", vm.dataDir)
	}
}
// load restores the vertex stores written by save: it reads the JSON meta
// file, then reopens every worker's slice and map store concurrently.
// wg lets the caller wait for all stores to finish opening.
func (vm *VertexInDB) load(meta GraphMeta, dir string, wg *sync.WaitGroup) {
	// 1. load graph and store meta
	bytes, err := os.ReadFile(path.Join(dir, "vertex_in_db__meta"))
	if err != nil {
		logrus.Errorf("read vertex meta error:%v,store:%v", err, vm.dataDir)
	}
	dataMeta := VertexInDBMeta{}
	err = json.Unmarshal(bytes, &dataMeta)
	if err != nil {
		logrus.Errorf("unmarshal vertex meta error:%v,store:%v", err, vm.dataDir)
	}
	vm.dataDir = dataMeta.DataDir
	vm.totalVertexCount = dataMeta.TotalVertexCount
	vm.workers = dataMeta.Workers
	// 2. open all stores, one goroutine per worker
	vm.sliceStores = make([]storage.Store, len(vm.workers))
	vm.mapStores = make([]storage.Store, len(vm.workers))
	for i, worker := range vm.workers {
		wg.Add(1)
		go func(id int, worker *GraphWorker) {
			defer wg.Done()
			defer logrus.Infof("open store:%v-%v done", vm.dataDir, id)
			// Each goroutine must use its own err: the previous code assigned
			// the enclosing function's err from every goroutine — a data race
			// that could also report another goroutine's failure here.
			var err error
			// the self worker's slice store keeps the path Init created it with
			slicePath := path.Join(vm.dataDir, fmt.Sprintf("%v_vertex_slice", id))
			if worker.IsSelf {
				slicePath = path.Join(vm.dataDir, "self_vertex_slice")
			}
			vm.sliceStores[id], err = storage.StoreMaker(storage.StoreOption{
				StoreName: storage.StoreTypePebble,
				Path:      slicePath,
				Fsync:     false,
				ReadOnly:  false,
				UseFilter: false,
			})
			if err != nil {
				logrus.Errorf("open vertex slice store error:%v,store:%v", err, vm.dataDir)
			}
			vm.mapStores[id], err = storage.StoreMaker(storage.StoreOption{
				StoreName: storage.StoreTypePebble,
				Path:      path.Join(vm.dataDir, fmt.Sprintf("%v_vertex_map", id)),
				Fsync:     false,
				ReadOnly:  false,
				UseFilter: false,
			})
			if err != nil {
				logrus.Errorf("open vertex map store error:%v,store:%v", err, vm.dataDir)
			}
		}(i, worker)
	}
}
// deleteData removes the on-disk graph_db directory and everything beneath it.
func (vm *VertexInDB) deleteData() {
	if err := os.RemoveAll(vm.dataDir); err != nil {
		logrus.Errorf("delete vertex data error:%v", err)
	}
}