datasource/etcd/state/state_manager.go (129 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package state provides a State to manage the implementations of sd package, see types.go
package state
import (
"context"
"fmt"
"sync"
"time"
"github.com/apache/servicecomb-service-center/datasource/etcd/state/kvstore"
"github.com/apache/servicecomb-service-center/pkg/goutil"
"github.com/apache/servicecomb-service-center/pkg/log"
"github.com/apache/servicecomb-service-center/pkg/util"
"github.com/apache/servicecomb-service-center/server/config"
"github.com/go-chassis/foundation/gopool"
)
type Manager struct {
Repository Repository
Rev int64
states map[kvstore.Type]State
statesLock sync.RWMutex
goroutine *gopool.Pool
ready chan struct{}
isClose bool
}
func (s *Manager) Initialize() {
s.states = make(map[kvstore.Type]State)
s.ready = make(chan struct{})
s.goroutine = goutil.New()
}
func (s *Manager) OnCacheEvent(evt kvstore.Event) {
if s.Rev < evt.Revision {
s.Rev = evt.Revision
}
}
func (s *Manager) InjectConfig(cfg *kvstore.Options) *kvstore.Options {
if !Configuration().EnableCache {
cfg.WithInitSize(0)
}
cfg.AppendEventFunc(s.OnCacheEvent)
return cfg
}
func (s *Manager) repo() Repository {
return s.Repository
}
func (s *Manager) getOrCreateState(t kvstore.Type) State {
s.statesLock.RLock()
v, ok := s.states[t]
if ok {
s.statesLock.RUnlock()
return v
}
s.statesLock.RUnlock()
s.statesLock.Lock()
p, ok := Plugins()[t]
if ok {
cfg := p.Config()
kvstore.EventProxy(t).InjectConfig(cfg)
s.InjectConfig(cfg)
state := s.repo().New(t, cfg)
state.Run()
s.states[t] = state
s.statesLock.Unlock()
return state
}
s.statesLock.Unlock()
log.Warn(fmt.Sprintf("type '%s' not found", t))
return nil
}
func (s *Manager) stopStates() {
s.statesLock.RLock()
for _, state := range s.states {
state.Stop()
}
s.statesLock.RUnlock()
}
func (s *Manager) Run() {
s.goroutine.Do(s.store)
s.goroutine.Do(s.autoClearCache)
}
func (s *Manager) store(ctx context.Context) {
// new all types
for _, t := range kvstore.Types {
state := s.getOrCreateState(t)
if state == nil {
continue
}
select {
case <-ctx.Done():
return
case <-state.Ready():
}
}
util.SafeCloseChan(s.ready)
log.Debug("all states are ready")
}
func (s *Manager) autoClearCache(ctx context.Context) {
ttl := config.GetRegistry().CacheTTL
if ttl == 0 {
return
}
log.Info(fmt.Sprintf("start auto clear cache in %v", ttl))
for {
select {
case <-ctx.Done():
return
case <-time.After(ttl):
for _, t := range kvstore.Types {
cache, ok := s.getOrCreateState(t).Cache().(kvstore.Cache)
if !ok {
log.Error("the discovery adaptor does not implement the Cache", nil)
continue
}
cache.MarkDirty()
}
log.Warn("caches are marked dirty!")
}
}
}
func (s *Manager) Stop() {
if s.isClose {
return
}
s.isClose = true
s.stopStates()
s.goroutine.Close(true)
util.SafeCloseChan(s.ready)
log.Debug("store daemon stopped")
}
func (s *Manager) Ready() <-chan struct{} {
return s.ready
}
func (s *Manager) States(id kvstore.Type) State { return s.getOrCreateState(id) }