store/store.go (268 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ package store import ( "context" "encoding/json" "fmt" "sync" "github.com/apache/kvrocks-controller/consts" "github.com/apache/kvrocks-controller/store/engine" ) type Store interface { IsReady(ctx context.Context) bool ListNamespace(ctx context.Context) ([]string, error) CreateNamespace(ctx context.Context, ns string) error ExistsNamespace(ctx context.Context, ns string) (bool, error) RemoveNamespace(ctx context.Context, ns string) error ListCluster(ctx context.Context, ns string) ([]string, error) GetCluster(ctx context.Context, ns, cluster string) (*Cluster, error) RemoveCluster(ctx context.Context, ns, cluster string) error CreateCluster(ctx context.Context, ns string, cluster *Cluster) error UpdateCluster(ctx context.Context, ns string, cluster *Cluster) error SetCluster(ctx context.Context, ns string, clusterInfo *Cluster) error CheckNewNodes(ctx context.Context, nodes []string) error } var _ Store = (*ClusterStore)(nil) type ClusterStore struct { e engine.Engine locks sync.Map eventNotifyCh chan EventPayload quitCh chan struct{} } func NewClusterStore(e engine.Engine) *ClusterStore { return &ClusterStore{ e: e, eventNotifyCh: make(chan EventPayload, 100), quitCh: make(chan struct{}), } } func (s *ClusterStore) IsReady(ctx context.Context) bool { return s.e.IsReady(ctx) } // ListNamespace return the list of name of all namespaces func (s *ClusterStore) ListNamespace(ctx context.Context) ([]string, error) { entries, err := s.e.List(ctx, nsPrefix) if err != nil { return nil, err } keys := make([]string, len(entries)) for i, entry := range entries { keys[i] = entry.Key } return keys, nil } // ExistsNamespace return an indicator whether the specified namespace exists func (s *ClusterStore) ExistsNamespace(ctx context.Context, ns string) (bool, error) { return s.e.Exists(ctx, appendPrefix(ns)) } // CreateNamespace will create a namespace for clusters func (s *ClusterStore) CreateNamespace(ctx context.Context, ns string) error { if has, _ := s.ExistsNamespace(ctx, ns); has { return consts.ErrAlreadyExists } if err := s.e.Set(ctx, appendPrefix(ns), []byte(ns)); err != nil { return err } s.EmitEvent(EventPayload{ Namespace: ns, Type: EventNamespace, Command: CommandCreate, }) return nil } // RemoveNamespace delete the specified namespace from store func (s *ClusterStore) RemoveNamespace(ctx context.Context, ns string) error { if has, _ := s.ExistsNamespace(ctx, ns); !has { return consts.ErrNotFound } clusters, err := s.ListCluster(ctx, ns) if err != nil { return err } if len(clusters) != 0 { return fmt.Errorf("%w: please delete clusters first", consts.ErrForbidden) } if err := s.e.Delete(ctx, appendPrefix(ns)); err != nil { return err } s.EmitEvent(EventPayload{ Namespace: ns, Type: EventNamespace, Command: CommandRemove, }) return nil } func (s *ClusterStore) getLock(ns, cluster string) *sync.RWMutex { value, _ := s.locks.LoadOrStore(fmt.Sprintf("%s/%s", ns, cluster), &sync.RWMutex{}) lock, _ := value.(*sync.RWMutex) return lock } // ListCluster return the list of name of cluster under the specified namespace func (s *ClusterStore) ListCluster(ctx context.Context, ns string) ([]string, error) { entries, err := s.e.List(ctx, buildClusterPrefix(ns)) if err != nil { return nil, err } keys := make([]string, len(entries)) for i, entry := range entries { keys[i] = entry.Key } return keys, nil } func (s *ClusterStore) existsCluster(ctx context.Context, ns, cluster string) (bool, error) { return s.e.Exists(ctx, buildClusterKey(ns, cluster)) } func (s *ClusterStore) GetCluster(ctx context.Context, ns, cluster string) (*Cluster, error) { lock := s.getLock(ns, cluster) lock.RLock() defer lock.RUnlock() return s.getClusterWithoutLock(ctx, ns, cluster) } func (s *ClusterStore) getClusterWithoutLock(ctx context.Context, ns, cluster string) (*Cluster, error) { value, err := s.e.Get(ctx, buildClusterKey(ns, cluster)) if err != nil { return nil, fmt.Errorf("cluster: %w", err) } var clusterInfo Cluster if err = json.Unmarshal(value, &clusterInfo); err != nil { return nil, fmt.Errorf("cluster: %w", err) } return &clusterInfo, nil } // UpdateCluster update the Name to store under the specified namespace func (s *ClusterStore) UpdateCluster(ctx context.Context, ns string, clusterInfo *Cluster) error { lock := s.getLock(ns, clusterInfo.Name) lock.Lock() defer lock.Unlock() oldCluster, err := s.getClusterWithoutLock(ctx, ns, clusterInfo.Name) if err != nil { return err } if oldCluster.Version.Load() > clusterInfo.Version.Load() { return fmt.Errorf("the cluster has been updated by others") } clusterInfo.Version.Add(1) clusterBytes, err := json.Marshal(clusterInfo) if err != nil { return fmt.Errorf("cluster: %w", err) } if err := s.e.Set(ctx, buildClusterKey(ns, clusterInfo.Name), clusterBytes); err != nil { return err } s.EmitEvent(EventPayload{ Namespace: ns, Cluster: clusterInfo.Name, Type: EventCluster, Command: CommandUpdate, }) return nil } // SetCluster set the cluster to store under the specified namespace but won't increase the version. func (s *ClusterStore) SetCluster(ctx context.Context, ns string, clusterInfo *Cluster) error { lock := s.getLock(ns, clusterInfo.Name) lock.Lock() defer lock.Unlock() oldCluster, err := s.getClusterWithoutLock(ctx, ns, clusterInfo.Name) if err != nil { return err } if oldCluster.Version.Load() > clusterInfo.Version.Load() { return fmt.Errorf("the cluster has been updated by others") } value, err := json.Marshal(clusterInfo) if err != nil { return fmt.Errorf("cluster: %w", err) } return s.e.Set(ctx, buildClusterKey(ns, clusterInfo.Name), value) } func (s *ClusterStore) CreateCluster(ctx context.Context, ns string, clusterInfo *Cluster) error { lock := s.getLock(ns, clusterInfo.Name) lock.Lock() defer lock.Unlock() if exists, _ := s.existsCluster(ctx, ns, clusterInfo.Name); exists { return fmt.Errorf("cluster: %w", consts.ErrAlreadyExists) } clusterBytes, err := json.Marshal(clusterInfo) if err != nil { return fmt.Errorf("cluster: %w", err) } if err := s.e.Set(ctx, buildClusterKey(ns, clusterInfo.Name), clusterBytes); err != nil { return err } s.EmitEvent(EventPayload{ Namespace: ns, Cluster: clusterInfo.Name, Type: EventCluster, Command: CommandCreate, }) return nil } func (s *ClusterStore) RemoveCluster(ctx context.Context, ns, cluster string) error { lock := s.getLock(ns, cluster) lock.Lock() defer lock.Unlock() if exists, _ := s.existsCluster(ctx, ns, cluster); !exists { return consts.ErrNotFound } if err := s.e.Delete(ctx, buildClusterKey(ns, cluster)); err != nil { return err } s.EmitEvent(EventPayload{ Namespace: ns, Cluster: cluster, Type: EventCluster, Command: CommandRemove, }) return nil } func (s *ClusterStore) CheckNewNodes(ctx context.Context, nodes []string) error { newNodes := make(map[string]bool, 0) for _, node := range nodes { newNodes[node] = true } namespaces, err := s.ListNamespace(ctx) if err != nil { return err } existingNodes := make([]string, 0) for _, ns := range namespaces { clusters, err := s.ListCluster(ctx, ns) if err != nil { return err } for _, cluster := range clusters { c, err := s.GetCluster(ctx, ns, cluster) if err != nil { return err } for _, existingNode := range c.GetNodes() { if _, ok := newNodes[existingNode.Addr()]; ok { existingNodes = append(existingNodes, existingNode.Addr()) } } } } if len(existingNodes) > 0 { return fmt.Errorf("node: %w: %v", consts.ErrAlreadyExists, existingNodes) } return nil } func (s *ClusterStore) Notify() <-chan EventPayload { return s.eventNotifyCh } func (s *ClusterStore) EmitEvent(event EventPayload) { s.eventNotifyCh <- event } func (s *ClusterStore) GetEngine() engine.Engine { return s.e } func (s *ClusterStore) LeaderChange() <-chan bool { return s.e.LeaderChange() } func (s *ClusterStore) IsLeader() bool { return s.e.Leader() == s.e.ID() } func (s *ClusterStore) Leader() string { return s.e.Leader() } func (s *ClusterStore) ID() string { return s.e.ID() } func (s *ClusterStore) Close() error { return s.e.Close() } func (s *ClusterStore) Stop() error { return nil }