cluster/topology/dynamic.go (211 lines of code) (raw):

// Copyright (c) 2016 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package topology

import (
	"errors"
	"sync"

	"github.com/m3db/m3/src/cluster/kv"
	"github.com/m3db/m3/src/cluster/placement"
	"github.com/m3db/m3/src/cluster/services"
	"github.com/m3db/m3/src/cluster/shard"
	xwatch "github.com/m3db/m3/src/x/watch"
	aresShard "github.com/uber/aresdb/cluster/shard"
	"github.com/uber/aresdb/common"
	"github.com/uber/aresdb/utils"
)

// Sentinel errors returned while validating a service placement.
var (
	errInvalidService            = errors.New("service topology is invalid")
	errUnexpectedShard           = errors.New("shard is unexpected")
	errMissingShard              = errors.New("shard is missing")
	errNotEnoughReplicasForShard = errors.New("replicas of shard is less than expected")
	errInvalidTopology           = errors.New("could not parse latest value from config service")
)

// dynamicInitializer builds a dynamic topology on demand and caches it.
// The embedded mutex serializes Init so that concurrent callers share the
// same cached topology once one has been created successfully.
type dynamicInitializer struct {
	sync.Mutex
	opts DynamicOptions
	topo Topology
}

// NewDynamicInitializer returns a dynamic topology initializer.
func NewDynamicInitializer(opts DynamicOptions) Initializer {
	return &dynamicInitializer{opts: opts}
}

// Init returns the cached topology if one exists, otherwise it creates a new
// dynamic topology from the initializer's options and caches it. A failed
// creation is not cached, so a later call tries again.
func (i *dynamicInitializer) Init() (Topology, error) {
	i.Lock()
	defer i.Unlock()

	if i.topo != nil {
		return i.topo, nil
	}

	topo, err := newDynamicTopology(i.opts)
	if err != nil {
		return nil, err
	}
	i.topo = topo
	return i.topo, nil
}

// TopologyIsSet reports whether the configured service can be queried.
// kv.ErrNotFound from the query is treated as "not set" rather than as an
// error; any other failure is returned to the caller.
func (i *dynamicInitializer) TopologyIsSet() (bool, error) {
	svcs, err := i.opts.ConfigServiceClient().Services(i.opts.ServicesOverrideOptions())
	if err != nil {
		return false, err
	}

	_, err = svcs.Query(i.opts.ServiceID(), i.opts.QueryOptions())
	if err != nil {
		if err == kv.ErrNotFound {
			// Valid, just means topology is not set
			return false, nil
		}
		return false, err
	}
	return true, nil
}

// dynamicTopology keeps a topology Map up to date from a services watch and
// republishes each valid update through a watchable.
type dynamicTopology struct {
	sync.RWMutex
	opts      DynamicOptions
	services  services.Services
	watch     services.Watch
	watchable xwatch.Watchable
	closed    bool // guarded by the embedded RWMutex
	logger    common.Logger
}

// newDynamicTopology starts watching the configured service, blocks until the
// watch delivers its first notification, validates that value and starts the
// background goroutine that applies subsequent updates.
func newDynamicTopology(opts DynamicOptions) (DynamicTopology, error) {
	// Use the same override options as TopologyIsSet so both look at the
	// same services scope.
	svcs, err := opts.ConfigServiceClient().Services(opts.ServicesOverrideOptions())
	if err != nil {
		return nil, err
	}

	logger := utils.GetLogger()
	logger.Info("waiting for dynamic topology initialization, " +
		"if this takes a long time, make sure that a topology/placement is configured")

	watch, err := svcs.Watch(opts.ServiceID(), opts.QueryOptions())
	if err != nil {
		return nil, err
	}
	<-watch.C()
	logger.Info("initial topology / placement value received")

	m, err := getMapFromUpdate(watch.Get(), opts.QueryOptions().IncludeUnhealthy())
	if err != nil {
		logger.With("err", err).Error("dynamic topology received invalid initial value")
		// Nothing will consume this watch any more; release it instead of
		// leaking the subscription.
		watch.Close()
		return nil, err
	}

	watchable := xwatch.NewWatchable()
	watchable.Update(m)

	dt := &dynamicTopology{
		opts:      opts,
		services:  svcs,
		watch:     watch,
		watchable: watchable,
		logger:    logger,
	}
	go dt.run()
	return dt, nil
}

// isClosed reports whether Close has been called.
func (t *dynamicTopology) isClosed() bool {
	t.RLock()
	closed := t.closed
	t.RUnlock()
	return closed
}

// run applies watch notifications until the topology is closed or the watch
// channel is closed. Updates that fail validation are logged and skipped, so
// the previously published Map stays in effect.
func (t *dynamicTopology) run() {
	for !t.isClosed() {
		if _, ok := <-t.watch.C(); !ok {
			t.Close()
			break
		}

		m, err := getMapFromUpdate(t.watch.Get(), t.opts.QueryOptions().IncludeUnhealthy())
		if err != nil {
			t.logger.With("err", err).Warn("dynamic topology received invalid update")
			continue
		}
		t.watchable.Update(m)
	}
}

// Get returns the most recently published topology Map.
func (t *dynamicTopology) Get() Map {
	return t.watchable.Get().(Map)
}

// Watch registers a new watch on the published topology Map.
func (t *dynamicTopology) Watch() (MapWatch, error) {
	_, w, err := t.watchable.Watch()
	if err != nil {
		return nil, err
	}
	return NewMapWatch(w), nil
}

// Close marks the topology closed and closes both the services watch and the
// watchable. Calls after the first are no-ops.
func (t *dynamicTopology) Close() {
	t.Lock()
	defer t.Unlock()

	if t.closed {
		return
	}
	t.closed = true
	t.watch.Close()
	t.watchable.Close()
}

// MarkShardsAvailable asks the placement service of the configured service to
// mark the given shards of instanceID as available. Only the error from that
// call is returned; its other result is discarded.
func (t *dynamicTopology) MarkShardsAvailable(
	instanceID string,
	shardIDs ...uint32,
) error {
	ps, err := t.services.PlacementService(t.opts.ServiceID(), placement.NewOptions())
	if err != nil {
		return err
	}
	_, err = ps.MarkShardsAvailable(instanceID, shardIDs...)
	return err
}

// getMapFromUpdate validates a service update and converts it into a static
// topology Map.
func getMapFromUpdate(service services.Service, unhealthyIncluded bool) (Map, error) {
	to, err := getStaticOptions(service, unhealthyIncluded)
	if err != nil {
		return nil, err
	}
	return NewStaticMap(to), nil
}

// getStaticOptions builds StaticOptions from a service. It returns
// errInvalidService when the service or any of its replication, sharding or
// instance data is nil, and propagates validation and host shard set errors.
// Every shard in the resulting shard set is created in the Available state.
func getStaticOptions(service services.Service, unhealthyIncluded bool) (StaticOptions, error) {
	if service == nil ||
		service.Replication() == nil ||
		service.Sharding() == nil ||
		service.Instances() == nil {
		return nil, errInvalidService
	}

	replicas := service.Replication().Replicas()
	instances := service.Instances()
	numShards := service.Sharding().NumShards()

	allShardIDs, err := validateInstances(instances, unhealthyIncluded, replicas, numShards)
	if err != nil {
		return nil, err
	}

	allShards := make([]shard.Shard, len(allShardIDs))
	for i, id := range allShardIDs {
		allShards[i] = shard.NewShard(id).SetState(shard.Available)
	}
	allShardSet := aresShard.NewShardSet(allShards)

	hostShardSets := make([]HostShardSet, len(instances))
	for i, instance := range instances {
		hs, err := NewHostShardSetFromServiceInstance(instance)
		if err != nil {
			return nil, err
		}
		hostShardSets[i] = hs
	}

	return NewStaticOptions().
		SetReplicas(replicas).
		SetShardSet(allShardSet).
		SetHostShardSets(hostShardSets), nil
}

// validateInstances checks that the instances together own exactly the shard
// IDs 0..numShards-1 and returns those IDs in ascending order.
//
// It returns errInstanceHasNoShardsAssignment when an instance has nil shards,
// errMissingShard when an expected ID is owned by no instance,
// errNotEnoughReplicasForShard when unhealthyIncluded is set and a shard is
// owned by fewer than replicas instances, and errUnexpectedShard when an
// instance owns an ID outside the expected range.
func validateInstances(
	instances []services.ServiceInstance,
	unhealthyIncluded bool,
	replicas, numShards int,
) ([]uint32, error) {
	// Number of instances owning each shard ID.
	owners := make(map[uint32]int)
	for _, instance := range instances {
		if instance.Shards() == nil {
			return nil, errInstanceHasNoShardsAssignment
		}
		for _, s := range instance.Shards().All() {
			owners[s.ID()]++
		}
	}

	ids := make([]uint32, numShards)
	for i := range ids {
		expectShard := uint32(i)
		count, exist := owners[expectShard]
		if !exist {
			return nil, errMissingShard
		}
		// NOTE(review): the replica count is only enforced when unhealthy
		// instances are included, presumably because a healthy-only view may
		// legitimately be missing replicas; confirm with callers.
		if unhealthyIncluded && count < replicas {
			return nil, errNotEnoughReplicasForShard
		}
		delete(owners, expectShard)
		ids[i] = expectShard
	}

	// Anything left over is a shard ID outside [0, numShards).
	if len(owners) > 0 {
		return nil, errUnexpectedShard
	}
	return ids, nil
}