in vermeer/apps/structure/vertex.go [501:640]
// BuildVertexMap builds the reverse vertex mapping for one worker: for every
// entry in that worker's slice store it writes
//
//	entry value -> marshalled (entry vertex ID + worker.VertIdStart)
//
// into the worker's map store, and mirrors entries into a freshly created
// in-memory cache (vm.mapCache) until roughly cacheSize bytes have been queued.
// The entry value is what the log messages in this function call the "longID".
//
// The worker processed is the first one in vm.workers with IsSelf set; when no
// worker is flagged, index 0 is used. After the map store is filled, every
// store in vm.mapStores is compacted concurrently and the function waits for
// all compactions to finish.
//
// The function has no return value: every failure is logged through logrus.
// Failing to create the cache, an out-of-range worker index, or a nil iterator
// abort the build; per-entry failures are logged and the build carries on.
func (vm *VertexInDB) BuildVertexMap() {
	var err error

	// HardMaxCacheSize is passed as cacheSize / 1024 / 1024, i.e. cacheSize is
	// presumably a byte count being converted to MB — confirm against the
	// bigcache Config documentation.
	config := bigcache.Config{
		Shards:             1024,
		LifeWindow:         10 * time.Minute,
		CleanWindow:        0,
		MaxEntriesInWindow: 1000 * 10 * 60,
		MaxEntrySize:       500,
		Verbose:            false,
		HardMaxCacheSize:   cacheSize / 1024 / 1024,
		OnRemove:           nil,
		OnRemoveWithReason: nil,
	}
	vm.mapCache, err = bigcache.New(context.Background(), config)
	if err != nil {
		logrus.Errorf("create vertex map cache error:%v", err)
		return
	}

	// Locate this process's own worker; fall back to index 0 when none is flagged.
	id := 0
	for i, worker := range vm.workers {
		if worker.IsSelf {
			id = i
			break
		}
	}

	// The fallback index 0 (or a mismatch between the slices) must not be used
	// to index past the end of any of the per-worker slices.
	if id >= len(vm.workers) || id >= len(vm.sliceStores) || id >= len(vm.mapStores) {
		logrus.Errorf("build vertex map error: worker index %v out of range, workers:%v, slice stores:%v, map stores:%v",
			id, len(vm.workers), len(vm.sliceStores), len(vm.mapStores))
		return
	}

	// Open the iterator before creating the batch so that a nil iterator does
	// not leave an unused batch behind.
	iter := vm.sliceStores[id].NewIterator()
	if iter == nil {
		// There is no error value to report here: NewIterator only returns the iterator.
		logrus.Errorf("create vertex slice store iterator error: nil iterator, worker:%v", id)
		return
	}
	batch := vm.mapStores[id].NewBatch()

	// Loop-invariant values.
	idOffset := serialize.SUint32(vm.workers[id].VertIdStart)
	cacheLimit := uint64(cacheSize)

	// size counts the bytes queued for the cache so far.
	var size uint64
	for iter.First(); iter.Valid(); iter.Next() {
		// Copy key and value out of the iterator before using them.
		key := make([]byte, len(iter.Key()))
		copy(key, iter.Key())
		value := make([]byte, len(iter.Value()))
		copy(value, iter.Value())

		// The key holds the vertex ID as a serialize.SUint32.
		vID := serialize.SUint32(0)
		_, err = vID.Unmarshal(key)
		if err != nil {
			logrus.Errorf("unmarshal vertex error:%v", err)
			continue
		}

		// Shift the ID by this worker's VertIdStart and re-encode it.
		vID += idOffset
		bytes := make([]byte, vID.PredictSize())
		_, err = vID.Marshal(bytes)
		if err != nil {
			logrus.Errorf("marshal vertex error:%v", err)
			continue
		}

		// Mirror the entry into the cache while the byte budget lasts. The
		// cache is only a mirror of the store, so a cache failure is logged
		// and the entry is still written to the map store below.
		if size < cacheLimit {
			size += uint64(len(value) + len(bytes))
			err = vm.mapCache.Set(string(value), bytes)
			if err != nil {
				logrus.Errorf("set vertex map cache, longID:%v ,error:%v", string(value), err)
			}
		}

		// Queue the mapping for the map store.
		err = batch.Set(value, bytes)
		if err != nil {
			logrus.Errorf("set vertex map store, longID:%v ,error:%v", string(value), err)
			continue
		}

		// Commit once the batch reports it is full, then start a new one.
		if batch.BatchFull() {
			err = batch.Commit()
			if err != nil {
				logrus.Errorf("commit vertex map store error:%v", err)
			}
			batch = vm.mapStores[id].NewBatch()
		}
	}

	err = iter.Close()
	if err != nil {
		logrus.Errorf("close vertex map store iterator error:%v", err)
	}

	// Commit whatever is left in the last, partially filled batch.
	err = batch.Commit()
	if err != nil {
		logrus.Errorf("commit vertex map store error:%v", err)
	}
	logrus.Infof("map cache entry count:%v", vm.mapCache.Len())

	// Compact all map stores in parallel and wait for every one to finish.
	wg := &sync.WaitGroup{}
	for i := range vm.mapStores {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			err := vm.mapStores[idx].Compact()
			if err != nil {
				logrus.Errorf("compact vertex map store error:%v", err)
			}
			logrus.Infof("compact vertex map store:%v done", idx)
		}(i)
	}
	wg.Wait()
}