cluster/topology/options.go (134 lines of code) (raw):

// Copyright (c) 2016 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package topology import ( "errors" "fmt" "github.com/m3db/m3/src/cluster/client" "github.com/m3db/m3/src/cluster/services" "github.com/uber/aresdb/cluster/shard" "github.com/uber/aresdb/utils" "time" ) const ( defaultServiceName = "aresDB" defaultInitTimeout = 0 // Wait indefinitely by default for topology defaultReplicas = 3 ) var ( errNoConfigServiceClient = errors.New("no config service client") errInvalidReplicas = errors.New("replicas must be equal to or greater than 1") ) // staticOptions is the implementation of the interface StaticOptions type staticOptions struct { shardSet shard.ShardSet hostShardSets []HostShardSet replicas int } // NewStaticOptions creates a new set of static topology options func NewStaticOptions() StaticOptions { return &staticOptions{ replicas: defaultReplicas, } } func (o *staticOptions) SetShardSet(value shard.ShardSet) StaticOptions { opts := *o opts.shardSet = value return &opts } func (o *staticOptions) ShardSet() shard.ShardSet { return o.shardSet } func (o *staticOptions) SetHostShardSets(value []HostShardSet) StaticOptions { opts := *o opts.hostShardSets = value return &opts } func (o *staticOptions) HostShardSets() []HostShardSet { return o.hostShardSets } func (o *staticOptions) SetReplicas(value int) StaticOptions { opts := *o opts.replicas = value return &opts } func (o *staticOptions) Replicas() int { return o.replicas } // dynamicOptions is the implementation of the interface DynamicOptions type dynamicOptions struct { configServiceClient client.Client serviceID services.ServiceID servicesOverrideOptions services.OverrideOptions queryOptions services.QueryOptions instrumentOptions utils.Options initTimeout time.Duration } // NewDynamicOptions creates a new set of dynamic topology options func NewDynamicOptions() DynamicOptions { return &dynamicOptions{ serviceID: services.NewServiceID().SetName(defaultServiceName), servicesOverrideOptions: services.NewOverrideOptions(), queryOptions: services.NewQueryOptions(), instrumentOptions: utils.NewOptions(), initTimeout: defaultInitTimeout, } } func (o *staticOptions) Validate() error { if o.replicas < 1 { return errInvalidReplicas } // Make a mapping of each shard to a set of hosts and check each // shard has at least the required replicas mapped to // NB(r): We allow greater than the required replicas in case // node is streaming in and needs to take writes totalShards := len(o.shardSet.AllIDs()) hostAddressesByShard := make([]map[string]struct{}, totalShards) for i := range hostAddressesByShard { hostAddressesByShard[i] = make(map[string]struct{}, o.replicas) } for _, hostShardSet := range o.hostShardSets { hostAddress := hostShardSet.Host().Address() for _, shard := range hostShardSet.ShardSet().AllIDs() { hostAddressesByShard[shard][hostAddress] = struct{}{} } } for shard, hosts := range hostAddressesByShard { if len(hosts) < o.replicas { errorFmt := "shard %d has %d replicas, less than the required %d replicas" return fmt.Errorf(errorFmt, shard, len(hosts), o.replicas) } } return nil } func (o *dynamicOptions) SetConfigServiceClient(c client.Client) DynamicOptions { o.configServiceClient = c return o } func (o *dynamicOptions) ConfigServiceClient() client.Client { return o.configServiceClient } func (o *dynamicOptions) SetServiceID(s services.ServiceID) DynamicOptions { o.serviceID = s return o } func (o *dynamicOptions) ServiceID() services.ServiceID { return o.serviceID } func (o *dynamicOptions) SetServicesOverrideOptions(opts services.OverrideOptions) DynamicOptions { o.servicesOverrideOptions = opts return o } func (o *dynamicOptions) ServicesOverrideOptions() services.OverrideOptions { return o.servicesOverrideOptions } func (o *dynamicOptions) SetQueryOptions(qo services.QueryOptions) DynamicOptions { o.queryOptions = qo return o } func (o *dynamicOptions) QueryOptions() services.QueryOptions { return o.queryOptions } func (o *dynamicOptions) SetInstrumentOptions(io utils.Options) DynamicOptions { o.instrumentOptions = io return o } func (o *dynamicOptions) InstrumentOptions() utils.Options { return o.instrumentOptions } func (o *dynamicOptions) Validate() error { if o.ConfigServiceClient() == nil { return errNoConfigServiceClient } return nil }