common/cluster/metadata.go (143 lines of code) (raw):
// Copyright (c) 2018 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package cluster
import (
"fmt"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
)
// Metadata provides information about clusters.
type Metadata struct {
	// log receives the debug and error messages emitted by version lookups.
	log log.Logger
	// metrics is the scope on which the cluster metadata counters are incremented.
	metrics metrics.Scope

	// failoverVersionIncrement is the increment of each cluster's version
	// when a failover happens.
	failoverVersionIncrement int64
	// primaryClusterName is the name of the primary cluster. Only the primary
	// cluster can register / update a domain; all clusters can do domain failover.
	primaryClusterName string
	// currentClusterName is the name of the current cluster.
	currentClusterName string

	// allClusters holds the info of every configured cluster, enabled or not.
	allClusters map[string]config.ClusterInformation
	// enabledClusters is the subset of allClusters whose Enabled flag is set.
	enabledClusters map[string]config.ClusterInformation
	// remoteClusters is enabledClusters without the current cluster.
	remoteClusters map[string]config.ClusterInformation
	// versionToClusterName maps each cluster's initial failover version to
	// the corresponding cluster name.
	versionToClusterName map[int64]string

	// useNewFailoverVersionOverride is consulted per domain and allows for a
	// new failover version migration (see getInitialFailoverVersion).
	useNewFailoverVersionOverride dynamicconfig.BoolPropertyFnWithDomainFilter
}
// NewMetadata creates a new instance of Metadata.
//
// Three lookup tables are derived from clusterGroup up front:
//   - initial failover version -> cluster name, for every cluster;
//   - the enabled clusters (disabled clusters are never used, so they are
//     filtered out on start);
//   - the enabled clusters other than currentClusterName.
//
// clusterGroup itself is stored as-is (not copied) as the "all clusters" map.
func NewMetadata(
	failoverVersionIncrement int64,
	primaryClusterName string,
	currentClusterName string,
	clusterGroup map[string]config.ClusterInformation,
	useMinFailoverVersionOverrideConfig dynamicconfig.BoolPropertyFnWithDomainFilter,
	metricsClient metrics.Client,
	logger log.Logger,
) Metadata {
	versions := make(map[int64]string)
	enabled := map[string]config.ClusterInformation{}
	remote := map[string]config.ClusterInformation{}

	for name, info := range clusterGroup {
		// Every cluster, enabled or not, contributes its initial version.
		versions[info.InitialFailoverVersion] = name

		if !info.Enabled {
			continue
		}
		enabled[name] = info

		// Remote means enabled and not the current cluster.
		if name == currentClusterName {
			continue
		}
		remote[name] = info
	}

	md := Metadata{
		log:                           logger,
		metrics:                       metricsClient.Scope(metrics.ClusterMetadataScope),
		failoverVersionIncrement:      failoverVersionIncrement,
		primaryClusterName:            primaryClusterName,
		currentClusterName:            currentClusterName,
		allClusters:                   clusterGroup,
		enabledClusters:               enabled,
		remoteClusters:                remote,
		versionToClusterName:          versions,
		useNewFailoverVersionOverride: useMinFailoverVersionOverrideConfig,
	}
	return md
}
// GetNextFailoverVersion returns the next failover version for the given
// cluster, based on the current failover version.
//
// The candidate is currentFailoverVersion truncated to a multiple of
// failoverVersionIncrement, plus the cluster's initial failover version for
// domainName. If that candidate is below currentFailoverVersion, one more
// increment is added.
//
// It panics (via getInitialFailoverVersion) when cluster is not a known
// cluster name.
func (m Metadata) GetNextFailoverVersion(cluster string, currentFailoverVersion int64, domainName string) int64 {
	initial := m.getInitialFailoverVersion(cluster, domainName)
	increment := m.failoverVersionIncrement

	// Integer division drops the remainder, leaving a multiple of increment.
	candidate := (currentFailoverVersion/increment)*increment + initial
	if candidate >= currentFailoverVersion {
		return candidate
	}
	return candidate + increment
}
// IsVersionFromSameCluster reports whether the two failover versions resolve
// to the same cluster.
//
// If either version cannot be resolved, the failure is logged and false is
// returned. The failure-to-resolve counter is already incremented inside
// resolveServerName, so it is deliberately not incremented again here; doing
// so counted a version1 failure twice while a version2 failure was counted
// once.
func (m Metadata) IsVersionFromSameCluster(version1 int64, version2 int64) bool {
	v1Server, err := m.resolveServerName(version1)
	if err != nil {
		// Returning false preserves the old behaviour; however, this should
		// never occur.
		m.log.Error("could not resolve an incoming version", tag.Dynamic("failover-version", version1))
		return false
	}

	v2Server, err := m.resolveServerName(version2)
	if err != nil {
		m.log.Error("could not resolve an incoming version", tag.Dynamic("failover-version", version2))
		return false
	}

	return v1Server == v2Server
}
// IsPrimaryCluster reports whether the current cluster is the primary
// cluster, i.e. whether primaryClusterName equals currentClusterName.
func (m Metadata) IsPrimaryCluster() bool {
return m.primaryClusterName == m.currentClusterName
}
// GetCurrentClusterName returns the current cluster name, as supplied to
// NewMetadata.
func (m Metadata) GetCurrentClusterName() string {
return m.currentClusterName
}
// GetAllClusterInfo returns the info of all clusters, enabled or not.
// The returned map is the one held by Metadata (the cluster group passed to
// NewMetadata), not a copy.
func (m Metadata) GetAllClusterInfo() map[string]config.ClusterInformation {
return m.allClusters
}
// GetEnabledClusterInfo returns the info of the enabled clusters only.
// The returned map is the one held by Metadata, not a copy.
func (m Metadata) GetEnabledClusterInfo() map[string]config.ClusterInformation {
return m.enabledClusters
}
// GetRemoteClusterInfo returns the info of the clusters that are enabled AND
// remote, i.e. enabled clusters other than the current cluster.
// The returned map is the one held by Metadata, not a copy.
func (m Metadata) GetRemoteClusterInfo() map[string]config.ClusterInformation {
return m.remoteClusters
}
// ClusterNameForFailoverVersion returns the corresponding cluster name for a
// given failover version.
//
// common.EmptyVersion maps to the current cluster. Any other version is
// resolved through resolveServerName; when that fails, the resolving-failure
// counter is incremented and the underlying error is returned wrapped (with
// %w, so callers can inspect it via errors.Is / errors.As) together with an
// empty name.
func (m Metadata) ClusterNameForFailoverVersion(failoverVersion int64) (string, error) {
	if failoverVersion == common.EmptyVersion {
		return m.currentClusterName, nil
	}

	server, err := m.resolveServerName(failoverVersion)
	if err != nil {
		m.metrics.IncCounter(metrics.ClusterMetadataResolvingFailoverVersionCounter)
		return "", fmt.Errorf("failed to resolve failover version: %w", err)
	}
	return server, nil
}
// getInitialFailoverVersion gets the initial failover version for a
// cluster / domain, along with some helpers for a migration, should one be
// necessary.
//
// It panics when cluster is not present in allClusters.
func (m Metadata) getInitialFailoverVersion(cluster string, domainName string) int64 {
	clusterInfo, known := m.allClusters[cluster]
	if !known {
		msg := fmt.Sprintf(
			"Unknown cluster name: %v with given cluster initial failover version map: %v.",
			cluster,
			m.allClusters,
		)
		panic(msg)
	}

	// The override is always evaluated, whether or not the cluster has a new
	// initial failover version configured.
	migrating := m.useNewFailoverVersionOverride(domainName)

	if !migrating || clusterInfo.NewInitialFailoverVersion == nil {
		// Default behaviour: return the initial failover version, a marker
		// to identify the cluster for all counters.
		m.log.Debug("getting failover version for cluster", tag.ClusterName(cluster), tag.WorkflowDomainName(domainName))
		m.metrics.IncCounter(metrics.ClusterMetadataGettingFailoverVersionCounter)
		return clusterInfo.InitialFailoverVersion
	}

	// The override is on and the cluster config carries a new initial
	// failover version: return that one. This is not the normal state of
	// things and is intended only for when migrating versions.
	m.log.Debug("using new failover version for cluster", tag.ClusterName(cluster), tag.WorkflowDomainName(domainName))
	m.metrics.IncCounter(metrics.ClusterMetadataGettingMinFailoverVersionCounter)
	return *clusterInfo.NewInitialFailoverVersion
}
// resolveServerName resolves the server (cluster) name from a version number.
// Prefer this over checking versionToClusterName directly, because it also
// falls back to matching a cluster's NewInitialFailoverVersion, which covers
// versions produced during a migration.
//
// The version is first reduced modulo failoverVersionIncrement. When nothing
// matches, the failure-to-resolve counter is incremented and an error is
// returned.
func (m Metadata) resolveServerName(version int64) (string, error) {
	remainder := version % m.failoverVersionIncrement

	// Attempt the direct lookup first.
	if name, found := m.versionToClusterName[remainder]; found {
		return name, nil
	}

	// Otherwise fall back on checking for new failover versions.
	for name, info := range m.allClusters {
		migrated := info.NewInitialFailoverVersion
		if migrated == nil || *migrated != remainder {
			continue
		}
		return name, nil
	}

	m.metrics.IncCounter(metrics.ClusterMetadataFailureToResolveCounter)
	return "", fmt.Errorf("could not resolve failover version: %d", version)
}