cluster/topology/healthtracking_dynamic.go (90 lines of code) (raw):

// Copyright (c) 2016 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package topology import ( "github.com/uber/aresdb/utils" "sync" "time" ) const ( unhealthyRetryPeriodSeconds = 10 ) type healthiness struct { healthy bool lastUpdateTimestamp time.Time } type healthTrackingDynamicTopoImpl struct { sync.RWMutex dynamicTopology Topology hostsHealthiness map[Host]*healthiness closed bool } func NewHealthTrackingDynamicTopology(opts DynamicOptions) (HealthTrackingDynamicTopoloy, error) { dynamicTopo, err := NewDynamicInitializer(opts).Init() if err != nil { return nil, err } topo := &healthTrackingDynamicTopoImpl{ dynamicTopology: dynamicTopo, hostsHealthiness: make(map[Host]*healthiness), } return topo, nil } func (ht *healthTrackingDynamicTopoImpl) Get() Map { ht.Lock() defer ht.Unlock() // filter known unhealthy hosts from dynamic topology dm := ht.dynamicTopology.Get() dhss := dm.HostShardSets() var hostShardSets []HostShardSet for _, hss := range dhss { h, found := ht.hostsHealthiness[hss.Host()] if !found { newHealthiness := &healthiness{ healthy: true, lastUpdateTimestamp: utils.Now(), } ht.hostsHealthiness[hss.Host()] = newHealthiness h = newHealthiness } if h.healthy || utils.Now().Sub(h.lastUpdateTimestamp).Seconds() > unhealthyRetryPeriodSeconds { hostShardSets = append(hostShardSets, hss) } } return NewStaticMap(NewStaticOptions(). SetShardSet(dm.ShardSet()). SetReplicas(dm.Replicas()). SetHostShardSets(hostShardSets)) } func (ht *healthTrackingDynamicTopoImpl) MarkHostHealthy(host Host) error { return ht.changeHostHealthState(host, true) } func (ht *healthTrackingDynamicTopoImpl) MarkHostUnhealthy(host Host) error { return ht.changeHostHealthState(host, false) } func (ht *healthTrackingDynamicTopoImpl) changeHostHealthState(host Host, healthy bool) error { ht.Lock() defer ht.Unlock() h, found := ht.hostsHealthiness[host] if !found { return utils.StackError(nil, "failed to change host health state, host not found. host: %s, healthiness %t", host, healthy) } h.healthy = healthy h.lastUpdateTimestamp = utils.Now() return nil } // dummy implementation, don't use // TODO: implement when needed func (ht *healthTrackingDynamicTopoImpl) Watch() (MapWatch, error) { return nil, nil } func (ht *healthTrackingDynamicTopoImpl) isClosed() bool { ht.RLock() closed := ht.closed ht.RUnlock() return closed } func (ht *healthTrackingDynamicTopoImpl) Close() { ht.Lock() defer ht.Unlock() if ht.closed { return } ht.closed = true ht.dynamicTopology.Close() }