datasource/mongo/sd/instance_cache.go (127 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sd
import (
"reflect"
"strings"
cmap "github.com/orcaman/concurrent-map"
"go.mongodb.org/mongo-driver/bson"
"github.com/apache/servicecomb-service-center/datasource"
"github.com/apache/servicecomb-service-center/datasource/mongo/model"
"github.com/apache/servicecomb-service-center/datasource/sdcommon"
)
type instanceStore struct {
dirty bool
// the key is documentID, is value is mongo document.
concurrentMap cmap.ConcurrentMap
// the key is generated by indexFuncs,the value is a set of documentID.
indexSets IndexCache
}
func init() {
RegisterCacher(instance, newInstanceStore)
InstIndexCols = NewIndexCols()
InstIndexCols.AddIndexFunc(InstServiceIDIndex)
}
func newInstanceStore() *MongoCacher {
options := DefaultOptions().SetTable(instance)
cache := &instanceStore{
dirty: false,
concurrentMap: cmap.New(),
indexSets: NewIndexCache(),
}
instanceUnmarshal := func(doc bson.Raw) (resource sdcommon.Resource) {
docID := MongoDocument{}
err := bson.Unmarshal(doc, &docID)
if err != nil {
return
}
inst := model.Instance{}
err = bson.Unmarshal(doc, &inst)
if err != nil {
return
}
resource.Value = inst
resource.Key = docID.ID.Hex()
return
}
return NewMongoCacher(options, cache, instanceUnmarshal)
}
func (s *instanceStore) Name() string {
return instance
}
func (s *instanceStore) Size() int {
return s.concurrentMap.Count()
}
func (s *instanceStore) Get(key string) interface{} {
if v, exist := s.concurrentMap.Get(key); exist {
return v
}
return nil
}
func (s *instanceStore) ForEach(iter func(k string, v interface{}) (next bool)) {
for k, v := range s.concurrentMap.Items() {
if !iter(k, v) {
break
}
}
}
func (s *instanceStore) GetValue(index string) []interface{} {
docs := s.indexSets.Get(index)
res := make([]interface{}, 0, len(docs))
for _, id := range docs {
if doc, exist := s.concurrentMap.Get(id); exist {
res = append(res, doc)
}
}
return res
}
func (s *instanceStore) Dirty() bool {
return s.dirty
}
func (s *instanceStore) MarkDirty() {
s.dirty = true
}
func (s *instanceStore) Clear() {
s.dirty = false
s.concurrentMap.Clear()
s.indexSets.Clear()
}
func (s *instanceStore) ProcessUpdate(event MongoEvent) {
instData, ok := event.Value.(model.Instance)
if !ok {
return
}
if instData.Instance == nil {
return
}
// set the document data.
s.concurrentMap.Set(event.DocumentID, event.Value)
for _, index := range InstIndexCols.GetIndexes(instData) {
// set the index sets.
s.indexSets.Put(index, event.DocumentID)
}
}
func (s *instanceStore) ProcessDelete(event MongoEvent) {
instanceData, ok := s.concurrentMap.Get(event.DocumentID)
if !ok {
return
}
instMongo := instanceData.(model.Instance)
if instMongo.Instance == nil {
return
}
s.concurrentMap.Remove(event.DocumentID)
for _, index := range InstIndexCols.GetIndexes(instanceData) {
s.indexSets.Delete(index, event.DocumentID)
}
}
func (s *instanceStore) isValueNotUpdated(value interface{}, newValue interface{}) bool {
newInst, ok := newValue.(model.Instance)
if !ok {
return true
}
oldInst, ok := value.(model.Instance)
if !ok {
return true
}
newInst.RefreshTime = oldInst.RefreshTime
return reflect.DeepEqual(newInst, oldInst)
}
func InstServiceIDIndex(data interface{}) string {
inst := data.(model.Instance)
return strings.Join([]string{inst.Domain, inst.Project, inst.Instance.ServiceId}, datasource.SPLIT)
}