func (vm *VertexInDB) BuildVertexMap()

in vermeer/apps/structure/vertex.go [501:640]


func (vm *VertexInDB) BuildVertexMap() {
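	// Build the mapping from original (long) vertex IDs to global integer IDs for the
	// local worker: every entry is persisted to the on-disk map store, and entries are
	// additionally cached in memory (bigcache) until the cacheSize budget is used up.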
	var err error
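	// Configure the in-memory cache that fronts the persistent map store.
	// bigcache's HardMaxCacheSize is expressed in MB, hence the byte-to-MB conversion.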
	config := bigcache.Config{
		Shards:             1024,
		LifeWindow:         10 * time.Minute,
		CleanWindow:        0,
		MaxEntriesInWindow: 1000 * 10 * 60,
		MaxEntrySize:       500,
		Verbose:            false,
		HardMaxCacheSize:   cacheSize / 1024 / 1024,
		OnRemove:           nil,
		OnRemoveWithReason: nil,
	}
	vm.mapCache, err = bigcache.New(context.Background(), config)
	if err != nil {
		logrus.Errorf("create vertex map cache error:%v", err)
		return
	}
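	// Find the index of the local worker; only its slice store is converted below.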
	id := 0
	for i, worker := range vm.workers {
		if worker.IsSelf {
			id = i
			break
		}
	}
	// sizePerStore := uint64(cacheSize) / uint64(len(vm.sliceStores))
	// wg := &sync.WaitGroup{}
	// vm.mapStores = make([]storage.Store, len(vm.sliceStores))
	// for i, store := range vm.sliceStores {
	// wg.Add(1)
	// go func(id int, sliceStore storage.Store) {
	// defer wg.Done()

	// err = sliceStore.FlushDB()
	// if err != nil {
	// 	logrus.Errorf("flush vertex slice store error:%v", err)
	// 	return
	// }
	// isSelf := vm.workers[id].IsSelf

	// make map store
	// vm.mapStores[id], err = storage.StoreMaker(storage.StoreOption{
	// 	StoreName: storage.StoreTypePebble,
	// 	Path:      path.Join(vm.dataDir, fmt.Sprintf("%v_vertex_map", id)),
	// 	Fsync:     false,
	// 	ReadOnly:  false,
	// 	UseFilter: false,
	// })
	// if err != nil {
	// 	logrus.Errorf("create vertex map store error:%v", err)
	// 	return
	// }
	// create a write batch on the local worker's map store
	batch := vm.mapStores[id].NewBatch()

	// iterate the local worker's slice store (key: local vertex index, value: original long ID)
	iter := vm.sliceStores[id].NewIterator()
	if iter == nil {
		logrus.Errorf("create vertex map store iterator error:%v", err)
		return
	}
	var size uint64
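	// Walk every slice-store entry; size tracks how many bytes have been admitted to the in-memory cache.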
	for iter.First(); iter.Valid(); iter.Next() {
		// copy key and value out of the iterator, which may reuse its buffers on Next
		key := make([]byte, len(iter.Key()))
		copy(key, iter.Key())
		value := make([]byte, len(iter.Value()))
		copy(value, iter.Value())

		vID := serialize.SUint32(0)
		_, err = vID.Unmarshal(key)
		if err != nil {
			logrus.Errorf("unmarshal vertex error:%v", err)
			continue
		}
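		// Add this worker's starting offset to turn the store-local index into a global vertex ID.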
		// if isSelf {
		vID += serialize.SUint32(vm.workers[id].VertIdStart)
		// }
		bytes := make([]byte, vID.PredictSize())
		_, err = vID.Marshal(bytes)
		if err != nil {
			logrus.Errorf("marshal vertex error:%v", err)
			continue
		}

		if size < uint64(cacheSize) {
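			// Cache in memory only while the cacheSize budget lasts; every entry still goes to the persistent store below.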
			size += uint64(len(value) + len(bytes))
			// set map cache
			err = vm.mapCache.Set(string(value), bytes)
			if err != nil {
				logrus.Errorf("set vertex map cache, longID:%v ,error:%v", string(value), err)
				continue
			}
		}
		// write the mapping to the persistent map store regardless of whether it was cached
		err = batch.Set(value, bytes)
		if err != nil {
			logrus.Errorf("set vertex map store, longID:%v ,error:%v", string(value), err)
			continue
		}
		// commit and start a fresh batch whenever the current one fills up
		if batch.BatchFull() {
			err = batch.Commit()
			if err != nil {
				logrus.Errorf("commit vertex map store error:%v", err)
			}
			batch = vm.mapStores[id].NewBatch()
		}
	}
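	// Flush what remains: close the iterator and commit the final, possibly partial, batch.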
	err = iter.Close()
	if err != nil {
		logrus.Errorf("close vertex map store iterator error:%v", err)
	}
	err = batch.Commit()
	if err != nil {
		logrus.Errorf("commit vertex map store error:%v", err)
	}
	// logrus.Infof("build vertex map store:%v done", id)
	// }(i, store)
	// }
	// wg.Wait()

	logrus.Infof("map cache entry count:%v", vm.mapCache.Len())
	// vm.mapCache.ResetStatistics()

	// compact all map stores concurrently after the bulk load
	wg := &sync.WaitGroup{}
	for i := range vm.mapStores {
		wg.Add(1)
		go func(idx int) {
			defer wg.Done()
			err := vm.mapStores[idx].Compact()
			if err != nil {
				logrus.Errorf("compact vertex map store error:%v", err)
			}
			logrus.Infof("compact vertex map store:%v done", idx)
		}(i)
	}
	wg.Wait()
}
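
For orientation, a read-side counterpart would consult the in-memory cache first and fall back to the persistent map store on a miss. The sketch below is hypothetical: it is not part of vertex.go, it assumes an "errors" import, and it omits the store fallback because the map store's read API is not shown above; only the bigcache and logrus calls are real APIs.

// lookupGlobalID is a hypothetical helper illustrating the cache-then-store read path
// that BuildVertexMap populates. On a cache miss the caller would read the mapping
// from the local worker's map store (omitted here).
func (vm *VertexInDB) lookupGlobalID(longID []byte) ([]byte, bool) {
	value, err := vm.mapCache.Get(string(longID))
	if err != nil {
		if !errors.Is(err, bigcache.ErrEntryNotFound) {
			logrus.Errorf("get vertex map cache, longID:%v, error:%v", string(longID), err)
		}
		return nil, false // miss: fall back to the persistent map store
	}
	return value, true
}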