pkg/plugins/resources/memory/store.go (299 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package memory import ( "context" "fmt" "strconv" "strings" "sync" "time" ) import ( "github.com/pkg/errors" ) import ( core_model "github.com/apache/dubbo-kubernetes/pkg/core/resources/model" "github.com/apache/dubbo-kubernetes/pkg/core/resources/registry" "github.com/apache/dubbo-kubernetes/pkg/core/resources/store" "github.com/apache/dubbo-kubernetes/pkg/events" ) type resourceKey struct { Name string Mesh string ResourceType string } type memoryStoreRecord struct { resourceKey Version memoryVersion Spec string CreationTime time.Time ModificationTime time.Time Children []*resourceKey Labels map[string]string } type memoryStoreRecords = []*memoryStoreRecord var _ core_model.ResourceMeta = &memoryMeta{} type memoryMeta struct { Name string Mesh string Version memoryVersion CreationTime time.Time ModificationTime time.Time Labels map[string]string } func (m memoryMeta) GetName() string { return m.Name } func (m memoryMeta) GetNameExtensions() core_model.ResourceNameExtensions { return core_model.ResourceNameExtensionsUnsupported } func (m memoryMeta) GetMesh() string { return m.Mesh } func (m memoryMeta) GetVersion() string { return m.Version.String() } func (m memoryMeta) GetCreationTime() time.Time { return m.CreationTime } func (m memoryMeta) GetModificationTime() time.Time { return m.ModificationTime } func (m memoryMeta) GetLabels() map[string]string { return m.Labels } type memoryVersion uint64 func initialVersion() memoryVersion { return memoryVersion(1) } func (v memoryVersion) Next() memoryVersion { return memoryVersion(uint64(v) + 1) } func (v memoryVersion) String() string { return strconv.FormatUint(uint64(v), 10) } var _ store.ResourceStore = &memoryStore{} type memoryStore struct { records memoryStoreRecords mu sync.RWMutex eventWriter events.Emitter } func NewStore() store.ResourceStore { return &memoryStore{} } func (c *memoryStore) SetEventWriter(writer events.Emitter) { c.mu.Lock() defer c.mu.Unlock() c.eventWriter = writer } func (c *memoryStore) Create(_ context.Context, r core_model.Resource, fs ...store.CreateOptionsFunc) error { c.mu.Lock() defer c.mu.Unlock() opts := store.NewCreateOptions(fs...) // Name must be provided via CreateOptions if opts.Name == "" && opts.Mesh == "" { return errors.New("you must pass store.CreateBy or store.CreateByKey as a parameter") } if _, record := c.findRecord(string(r.Descriptor().Name), opts.Name, opts.Mesh); record != nil { return store.ErrorResourceAlreadyExists(r.Descriptor().Name, opts.Name, opts.Mesh) } meta := memoryMeta{ Name: opts.Name, Mesh: opts.Mesh, Version: initialVersion(), CreationTime: opts.CreationTime, ModificationTime: opts.CreationTime, Labels: opts.Labels, } // fill the meta r.SetMeta(meta) // convert into storage representation record, err := c.marshalRecord( string(r.Descriptor().Name), meta, r.GetSpec()) if err != nil { return err } if opts.Owner != nil { _, ownerRecord := c.findRecord(string(opts.Owner.Descriptor().Name), opts.Owner.GetMeta().GetName(), opts.Owner.GetMeta().GetMesh()) if ownerRecord == nil { return store.ErrorResourceNotFound(opts.Owner.Descriptor().Name, opts.Owner.GetMeta().GetName(), opts.Owner.GetMeta().GetMesh()) } ownerRecord.Children = append(ownerRecord.Children, &record.resourceKey) } // persist c.records = append(c.records, record) if c.eventWriter != nil { go func() { c.eventWriter.Send(events.ResourceChangedEvent{ Operation: events.Create, Type: r.Descriptor().Name, Key: core_model.MetaToResourceKey(r.GetMeta()), }) }() } return nil } func (c *memoryStore) Update(_ context.Context, r core_model.Resource, fs ...store.UpdateOptionsFunc) error { c.mu.Lock() defer c.mu.Unlock() opts := store.NewUpdateOptions(fs...) meta, ok := (r.GetMeta()).(memoryMeta) if !ok { return fmt.Errorf("MemoryStore.Update() requires r.GetMeta() to be of type memoryMeta") } // Name must be provided via r.GetMeta() mesh := r.GetMeta().GetMesh() _, record := c.findRecord(string(r.Descriptor().Name), r.GetMeta().GetName(), mesh) if record == nil || meta.Version != record.Version { return store.ErrorResourceConflict(r.Descriptor().Name, r.GetMeta().GetName(), r.GetMeta().GetMesh()) } meta.Version = meta.Version.Next() meta.ModificationTime = opts.ModificationTime meta.Labels = opts.Labels r.SetMeta(meta) record.Version = meta.Version record.ModificationTime = meta.ModificationTime record.Labels = meta.Labels content, err := core_model.ToJSON(r.GetSpec()) if err != nil { return err } record.Spec = string(content) if c.eventWriter != nil { go func() { c.eventWriter.Send(events.ResourceChangedEvent{ Operation: events.Update, Type: r.Descriptor().Name, Key: core_model.MetaToResourceKey(r.GetMeta()), }) }() } return nil } func (c *memoryStore) Delete(ctx context.Context, r core_model.Resource, fs ...store.DeleteOptionsFunc) error { c.mu.Lock() defer c.mu.Unlock() return c.delete(r, fs...) } func (c *memoryStore) delete(r core_model.Resource, fs ...store.DeleteOptionsFunc) error { opts := store.NewDeleteOptions(fs...) _, ok := (r.GetMeta()).(memoryMeta) if r.GetMeta() != nil && !ok { return fmt.Errorf("MemoryStore.Delete() requires r.GetMeta() either to be nil or to be of type memoryMeta") } // Name must be provided via DeleteOptions idx, record := c.findRecord(string(r.Descriptor().Name), opts.Name, opts.Mesh) if record == nil { return store.ErrorResourceNotFound(r.Descriptor().Name, opts.Name, opts.Mesh) } for _, child := range record.Children { _, childRecord := c.findRecord(child.ResourceType, child.Name, child.Mesh) if childRecord == nil { continue // resource was already deleted } obj, err := registry.Global().NewObject(core_model.ResourceType(child.ResourceType)) if err != nil { return fmt.Errorf("MemoryStore.Delete() couldn't unmarshal child resource") } if err := c.unmarshalRecord(childRecord, obj); err != nil { return fmt.Errorf("MemoryStore.Delete() couldn't unmarshal child resource") } if err := c.delete(obj, store.DeleteByKey(childRecord.Name, childRecord.Mesh)); err != nil { return fmt.Errorf("MemoryStore.Delete() couldn't delete linked child resource") } } c.records = append(c.records[:idx], c.records[idx+1:]...) if c.eventWriter != nil { go func() { c.eventWriter.Send(events.ResourceChangedEvent{ Operation: events.Delete, Type: r.Descriptor().Name, Key: core_model.ResourceKey{ Mesh: opts.Mesh, Name: opts.Name, }, }) }() } return nil } func (c *memoryStore) Get(_ context.Context, r core_model.Resource, fs ...store.GetOptionsFunc) error { c.mu.RLock() defer c.mu.RUnlock() opts := store.NewGetOptions(fs...) // Name must be provided via GetOptions _, record := c.findRecord(string(r.Descriptor().Name), opts.Name, opts.Mesh) if record == nil { return store.ErrorResourceNotFound(r.Descriptor().Name, opts.Name, opts.Mesh) } if opts.Version != "" && opts.Version != record.Version.String() { return store.ErrorResourceConflict(r.Descriptor().Name, opts.Name, opts.Mesh) } return c.unmarshalRecord(record, r) } func (c *memoryStore) List(_ context.Context, rs core_model.ResourceList, fs ...store.ListOptionsFunc) error { c.mu.RLock() defer c.mu.RUnlock() opts := store.NewListOptions(fs...) records := c.findRecords(string(rs.GetItemType()), opts.Mesh, opts.NameContains) for i := 0; i < len(records); i++ { r := rs.NewItem() if err := c.unmarshalRecord(records[i], r); err != nil { return err } _ = rs.AddItem(r) } rs.GetPagination().SetTotal(uint32(len(records))) return nil } func (c *memoryStore) findRecord( resourceType string, name string, mesh string, ) (int, *memoryStoreRecord) { for idx, rec := range c.records { if rec.ResourceType == resourceType && rec.Name == name && rec.Mesh == mesh { return idx, rec } } return -1, nil } func (c *memoryStore) findRecords(resourceType string, mesh string, contains string) []*memoryStoreRecord { res := make([]*memoryStoreRecord, 0) for _, rec := range c.records { if rec.ResourceType != resourceType { continue } if mesh != "" && rec.Mesh != mesh { continue } if contains != "" && !strings.Contains(rec.Name, contains) { continue } res = append(res, rec) } return res } func (c *memoryStore) marshalRecord(resourceType string, meta memoryMeta, spec core_model.ResourceSpec) (*memoryStoreRecord, error) { // convert spec into storage representation content, err := core_model.ToJSON(spec) if err != nil { return nil, err } return &memoryStoreRecord{ resourceKey: resourceKey{ ResourceType: resourceType, // Name must be provided via CreateOptions Name: meta.Name, Mesh: meta.Mesh, }, Version: meta.Version, Spec: string(content), CreationTime: meta.CreationTime, ModificationTime: meta.ModificationTime, Labels: meta.Labels, }, nil } func (c *memoryStore) unmarshalRecord(s *memoryStoreRecord, r core_model.Resource) error { r.SetMeta(memoryMeta{ Name: s.Name, Mesh: s.Mesh, Version: s.Version, CreationTime: s.CreationTime, ModificationTime: s.ModificationTime, Labels: s.Labels, }) return core_model.FromJSON([]byte(s.Spec), r.GetSpec()) }