common/domain/handler.go (960 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination handler_mock.go
package domain
import (
"context"
"fmt"
"regexp"
"time"
"github.com/pborman/uuid"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/archiver/provider"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/types"
)
var (
errDomainUpdateTooFrequent = &types.ServiceBusyError{Message: "Domain update too frequent."}
errInvalidDomainName = &types.BadRequestError{Message: "Domain name can only include alphanumeric and dash characters."}
)
type (
// Handler is the domain operation handler
Handler interface {
DeprecateDomain(
ctx context.Context,
deprecateRequest *types.DeprecateDomainRequest,
) error
DescribeDomain(
ctx context.Context,
describeRequest *types.DescribeDomainRequest,
) (*types.DescribeDomainResponse, error)
ListDomains(
ctx context.Context,
listRequest *types.ListDomainsRequest,
) (*types.ListDomainsResponse, error)
RegisterDomain(
ctx context.Context,
registerRequest *types.RegisterDomainRequest,
) error
UpdateDomain(
ctx context.Context,
updateRequest *types.UpdateDomainRequest,
) (*types.UpdateDomainResponse, error)
UpdateIsolationGroups(
ctx context.Context,
updateRequest types.UpdateDomainIsolationGroupsRequest,
) error
UpdateAsyncWorkflowConfiguraton(
ctx context.Context,
updateRequest types.UpdateDomainAsyncWorkflowConfiguratonRequest,
) error
}
// handlerImpl is the domain operation handler implementation
handlerImpl struct {
domainManager persistence.DomainManager
clusterMetadata cluster.Metadata
domainReplicator Replicator
domainAttrValidator *AttrValidatorImpl
archivalMetadata archiver.ArchivalMetadata
archiverProvider provider.ArchiverProvider
timeSource clock.TimeSource
config Config
logger log.Logger
}
// Config is the domain config for domain handler
Config struct {
MinRetentionDays dynamicconfig.IntPropertyFn
MaxRetentionDays dynamicconfig.IntPropertyFn
RequiredDomainDataKeys dynamicconfig.MapPropertyFn
MaxBadBinaryCount dynamicconfig.IntPropertyFnWithDomainFilter
FailoverCoolDown dynamicconfig.DurationPropertyFnWithDomainFilter
}
)
var _ Handler = (*handlerImpl)(nil)
// NewHandler create a new domain handler
func NewHandler(
config Config,
logger log.Logger,
domainManager persistence.DomainManager,
clusterMetadata cluster.Metadata,
domainReplicator Replicator,
archivalMetadata archiver.ArchivalMetadata,
archiverProvider provider.ArchiverProvider,
timeSource clock.TimeSource,
) Handler {
return &handlerImpl{
logger: logger,
domainManager: domainManager,
clusterMetadata: clusterMetadata,
domainReplicator: domainReplicator,
domainAttrValidator: newAttrValidator(clusterMetadata, int32(config.MinRetentionDays())),
archivalMetadata: archivalMetadata,
archiverProvider: archiverProvider,
timeSource: timeSource,
config: config,
}
}
// RegisterDomain register a new domain
func (d *handlerImpl) RegisterDomain(
ctx context.Context,
registerRequest *types.RegisterDomainRequest,
) error {
// cluster global domain enabled
if !d.clusterMetadata.IsPrimaryCluster() && registerRequest.GetIsGlobalDomain() {
return errNotPrimaryCluster
}
// first check if the name is already registered as the local domain
_, err := d.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{Name: registerRequest.GetName()})
switch err.(type) {
case nil:
// domain already exists, cannot proceed
return &types.DomainAlreadyExistsError{Message: "Domain already exists."}
case *types.EntityNotExistsError:
// domain does not exists, proceeds
default:
// other err
return err
}
// input validation on domain name
matchedRegex, err := regexp.MatchString("^[a-zA-Z0-9-]+$", registerRequest.GetName())
if err != nil {
return err
}
if !matchedRegex {
return errInvalidDomainName
}
activeClusterName := d.clusterMetadata.GetCurrentClusterName()
// input validation on cluster names
if registerRequest.ActiveClusterName != "" {
activeClusterName = registerRequest.GetActiveClusterName()
}
clusters := []*persistence.ClusterReplicationConfig{}
for _, clusterConfig := range registerRequest.Clusters {
clusterName := clusterConfig.GetClusterName()
clusters = append(clusters, &persistence.ClusterReplicationConfig{ClusterName: clusterName})
}
clusters = cluster.GetOrUseDefaultClusters(activeClusterName, clusters)
currentHistoryArchivalState := neverEnabledState()
nextHistoryArchivalState := currentHistoryArchivalState
clusterHistoryArchivalConfig := d.archivalMetadata.GetHistoryConfig()
if clusterHistoryArchivalConfig.ClusterConfiguredForArchival() {
archivalEvent, err := d.toArchivalRegisterEvent(
registerRequest.HistoryArchivalStatus,
registerRequest.GetHistoryArchivalURI(),
clusterHistoryArchivalConfig.GetDomainDefaultStatus(),
clusterHistoryArchivalConfig.GetDomainDefaultURI(),
)
if err != nil {
return err
}
nextHistoryArchivalState, _, err = currentHistoryArchivalState.getNextState(archivalEvent, d.validateHistoryArchivalURI)
if err != nil {
return err
}
}
currentVisibilityArchivalState := neverEnabledState()
nextVisibilityArchivalState := currentVisibilityArchivalState
clusterVisibilityArchivalConfig := d.archivalMetadata.GetVisibilityConfig()
if clusterVisibilityArchivalConfig.ClusterConfiguredForArchival() {
archivalEvent, err := d.toArchivalRegisterEvent(
registerRequest.VisibilityArchivalStatus,
registerRequest.GetVisibilityArchivalURI(),
clusterVisibilityArchivalConfig.GetDomainDefaultStatus(),
clusterVisibilityArchivalConfig.GetDomainDefaultURI(),
)
if err != nil {
return err
}
nextVisibilityArchivalState, _, err = currentVisibilityArchivalState.getNextState(archivalEvent, d.validateVisibilityArchivalURI)
if err != nil {
return err
}
}
info := &persistence.DomainInfo{
ID: uuid.New(),
Name: registerRequest.GetName(),
Status: persistence.DomainStatusRegistered,
OwnerEmail: registerRequest.GetOwnerEmail(),
Description: registerRequest.GetDescription(),
Data: registerRequest.Data,
}
config := &persistence.DomainConfig{
Retention: registerRequest.GetWorkflowExecutionRetentionPeriodInDays(),
EmitMetric: registerRequest.GetEmitMetric(),
HistoryArchivalStatus: nextHistoryArchivalState.Status,
HistoryArchivalURI: nextHistoryArchivalState.URI,
VisibilityArchivalStatus: nextVisibilityArchivalState.Status,
VisibilityArchivalURI: nextVisibilityArchivalState.URI,
BadBinaries: types.BadBinaries{Binaries: map[string]*types.BadBinaryInfo{}},
}
replicationConfig := &persistence.DomainReplicationConfig{
ActiveClusterName: activeClusterName,
Clusters: clusters,
}
isGlobalDomain := registerRequest.GetIsGlobalDomain()
if err := d.domainAttrValidator.validateDomainConfig(config); err != nil {
return err
}
if isGlobalDomain {
if err := d.domainAttrValidator.validateDomainReplicationConfigForGlobalDomain(
replicationConfig,
); err != nil {
return err
}
} else {
if err := d.domainAttrValidator.validateDomainReplicationConfigForLocalDomain(
replicationConfig,
); err != nil {
return err
}
}
failoverVersion := common.EmptyVersion
if registerRequest.GetIsGlobalDomain() {
failoverVersion = d.clusterMetadata.GetNextFailoverVersion(activeClusterName, 0, registerRequest.Name)
}
domainRequest := &persistence.CreateDomainRequest{
Info: info,
Config: config,
ReplicationConfig: replicationConfig,
IsGlobalDomain: isGlobalDomain,
ConfigVersion: 0,
FailoverVersion: failoverVersion,
LastUpdatedTime: d.timeSource.Now().UnixNano(),
}
domainResponse, err := d.domainManager.CreateDomain(ctx, domainRequest)
if err != nil {
return err
}
if domainRequest.IsGlobalDomain {
err = d.domainReplicator.HandleTransmissionTask(
ctx,
types.DomainOperationCreate,
domainRequest.Info,
domainRequest.Config,
domainRequest.ReplicationConfig,
domainRequest.ConfigVersion,
domainRequest.FailoverVersion,
common.InitialPreviousFailoverVersion,
domainRequest.IsGlobalDomain,
)
if err != nil {
return err
}
}
d.logger.Info("Register domain succeeded",
tag.WorkflowDomainName(registerRequest.GetName()),
tag.WorkflowDomainID(domainResponse.ID),
)
return nil
}
// ListDomains list all domains
func (d *handlerImpl) ListDomains(
ctx context.Context,
listRequest *types.ListDomainsRequest,
) (*types.ListDomainsResponse, error) {
pageSize := 100
if listRequest.GetPageSize() != 0 {
pageSize = int(listRequest.GetPageSize())
}
resp, err := d.domainManager.ListDomains(ctx, &persistence.ListDomainsRequest{
PageSize: pageSize,
NextPageToken: listRequest.NextPageToken,
})
if err != nil {
return nil, err
}
domains := []*types.DescribeDomainResponse{}
for _, domain := range resp.Domains {
desc := &types.DescribeDomainResponse{
IsGlobalDomain: domain.IsGlobalDomain,
FailoverVersion: domain.FailoverVersion,
}
desc.DomainInfo, desc.Configuration, desc.ReplicationConfiguration = d.createResponse(domain.Info, domain.Config, domain.ReplicationConfig)
domains = append(domains, desc)
}
response := &types.ListDomainsResponse{
Domains: domains,
NextPageToken: resp.NextPageToken,
}
return response, nil
}
// DescribeDomain describe the domain
func (d *handlerImpl) DescribeDomain(
ctx context.Context,
describeRequest *types.DescribeDomainRequest,
) (*types.DescribeDomainResponse, error) {
// TODO, we should migrate the non global domain to new table, see #773
req := &persistence.GetDomainRequest{
Name: describeRequest.GetName(),
ID: describeRequest.GetUUID(),
}
resp, err := d.domainManager.GetDomain(ctx, req)
if err != nil {
return nil, err
}
response := &types.DescribeDomainResponse{
IsGlobalDomain: resp.IsGlobalDomain,
FailoverVersion: resp.FailoverVersion,
}
if resp.FailoverEndTime != nil {
response.FailoverInfo = &types.FailoverInfo{
FailoverVersion: resp.FailoverVersion,
// This reflects that last domain update time. If there is a domain config update, this won't be accurate.
FailoverStartTimestamp: resp.LastUpdatedTime,
FailoverExpireTimestamp: *resp.FailoverEndTime,
}
}
response.DomainInfo, response.Configuration, response.ReplicationConfiguration = d.createResponse(resp.Info, resp.Config, resp.ReplicationConfig)
return response, nil
}
// UpdateDomain update the domain
func (d *handlerImpl) UpdateDomain(
ctx context.Context,
updateRequest *types.UpdateDomainRequest,
) (*types.UpdateDomainResponse, error) {
// must get the metadata (notificationVersion) first
// this version can be regarded as the lock on the v2 domain table
// and since we do not know which table will return the domain afterwards
// this call has to be made
metadata, err := d.domainManager.GetMetadata(ctx)
if err != nil {
return nil, err
}
notificationVersion := metadata.NotificationVersion
getResponse, err := d.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{Name: updateRequest.GetName()})
if err != nil {
return nil, err
}
info := getResponse.Info
config := getResponse.Config
replicationConfig := getResponse.ReplicationConfig
configVersion := getResponse.ConfigVersion
failoverVersion := getResponse.FailoverVersion
failoverNotificationVersion := getResponse.FailoverNotificationVersion
isGlobalDomain := getResponse.IsGlobalDomain
gracefulFailoverEndTime := getResponse.FailoverEndTime
currentActiveCluster := replicationConfig.ActiveClusterName
previousFailoverVersion := getResponse.PreviousFailoverVersion
lastUpdatedTime := time.Unix(0, getResponse.LastUpdatedTime)
// whether history archival config changed
historyArchivalConfigChanged := false
// whether visibility archival config changed
visibilityArchivalConfigChanged := false
// whether active cluster is changed
activeClusterChanged := false
// whether anything other than active cluster is changed
configurationChanged := false
// Update history archival state
historyArchivalState, historyArchivalConfigChanged, err := d.getHistoryArchivalState(
config,
updateRequest,
)
if err != nil {
return nil, err
}
if historyArchivalConfigChanged {
config.HistoryArchivalStatus = historyArchivalState.Status
config.HistoryArchivalURI = historyArchivalState.URI
}
// Update visibility archival state
visibilityArchivalState, visibilityArchivalConfigChanged, err := d.getVisibilityArchivalState(
config,
updateRequest,
)
if err != nil {
return nil, err
}
if visibilityArchivalConfigChanged {
config.VisibilityArchivalStatus = visibilityArchivalState.Status
config.VisibilityArchivalURI = visibilityArchivalState.URI
}
// Update domain info
info, domainInfoChanged := d.updateDomainInfo(
updateRequest,
info,
)
// Update domain config
config, domainConfigChanged, err := d.updateDomainConfiguration(
updateRequest.GetName(),
config,
updateRequest,
)
if err != nil {
return nil, err
}
// Update domain bad binary
config, deleteBinaryChanged, err := d.updateDeleteBadBinary(
config,
updateRequest.DeleteBadBinary,
)
if err != nil {
return nil, err
}
// Update replication config
replicationConfig, replicationConfigChanged, activeClusterChanged, err := d.updateReplicationConfig(
replicationConfig,
updateRequest,
)
if err != nil {
return nil, err
}
// Handle graceful failover request
if updateRequest.FailoverTimeoutInSeconds != nil {
// must update active cluster on a global domain
if !activeClusterChanged || !isGlobalDomain {
return nil, errInvalidGracefulFailover
}
// must start with the passive -> active cluster
if replicationConfig.ActiveClusterName != d.clusterMetadata.GetCurrentClusterName() {
return nil, errCannotDoGracefulFailoverFromCluster
}
if replicationConfig.ActiveClusterName == currentActiveCluster {
return nil, errGracefulFailoverInActiveCluster
}
// cannot have concurrent failover
if gracefulFailoverEndTime != nil {
return nil, errOngoingGracefulFailover
}
endTime := d.timeSource.Now().Add(time.Duration(updateRequest.GetFailoverTimeoutInSeconds()) * time.Second).UnixNano()
gracefulFailoverEndTime = &endTime
previousFailoverVersion = failoverVersion
}
configurationChanged = historyArchivalConfigChanged || visibilityArchivalConfigChanged || domainInfoChanged || domainConfigChanged || deleteBinaryChanged || replicationConfigChanged
if err := d.domainAttrValidator.validateDomainConfig(config); err != nil {
return nil, err
}
if isGlobalDomain {
if err := d.domainAttrValidator.validateDomainReplicationConfigForGlobalDomain(
replicationConfig,
); err != nil {
return nil, err
}
if configurationChanged && activeClusterChanged {
return nil, errCannotDoDomainFailoverAndUpdate
}
if !activeClusterChanged && !d.clusterMetadata.IsPrimaryCluster() {
return nil, errNotPrimaryCluster
}
} else {
if err := d.domainAttrValidator.validateDomainReplicationConfigForLocalDomain(
replicationConfig,
); err != nil {
return nil, err
}
}
if configurationChanged || activeClusterChanged {
now := d.timeSource.Now()
// Check the failover cool down time
if lastUpdatedTime.Add(d.config.FailoverCoolDown(info.Name)).After(now) {
return nil, errDomainUpdateTooFrequent
}
// set the versions
if configurationChanged {
configVersion++
}
if activeClusterChanged && isGlobalDomain {
// Force failover cleans graceful failover state
if updateRequest.FailoverTimeoutInSeconds == nil {
// force failover cleanup graceful failover state
gracefulFailoverEndTime = nil
previousFailoverVersion = common.InitialPreviousFailoverVersion
}
failoverVersion = d.clusterMetadata.GetNextFailoverVersion(
replicationConfig.ActiveClusterName,
failoverVersion,
updateRequest.Name,
)
failoverNotificationVersion = notificationVersion
}
lastUpdatedTime = now
updateReq := createUpdateRequest(
info,
config,
replicationConfig,
configVersion,
failoverVersion,
failoverNotificationVersion,
gracefulFailoverEndTime,
previousFailoverVersion,
lastUpdatedTime,
notificationVersion,
)
err = d.domainManager.UpdateDomain(ctx, &updateReq)
if err != nil {
return nil, err
}
}
if isGlobalDomain {
if err := d.domainReplicator.HandleTransmissionTask(
ctx,
types.DomainOperationUpdate,
info,
config,
replicationConfig,
configVersion,
failoverVersion,
previousFailoverVersion,
isGlobalDomain,
); err != nil {
return nil, err
}
}
response := &types.UpdateDomainResponse{
IsGlobalDomain: isGlobalDomain,
FailoverVersion: failoverVersion,
}
response.DomainInfo, response.Configuration, response.ReplicationConfiguration = d.createResponse(info, config, replicationConfig)
d.logger.Info("Update domain succeeded",
tag.WorkflowDomainName(info.Name),
tag.WorkflowDomainID(info.ID),
)
return response, nil
}
// DeprecateDomain deprecates a domain
func (d *handlerImpl) DeprecateDomain(
ctx context.Context,
deprecateRequest *types.DeprecateDomainRequest,
) error {
// must get the metadata (notificationVersion) first
// this version can be regarded as the lock on the v2 domain table
// and since we do not know which table will return the domain afterwards
// this call has to be made
metadata, err := d.domainManager.GetMetadata(ctx)
if err != nil {
return err
}
notificationVersion := metadata.NotificationVersion
getResponse, err := d.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{Name: deprecateRequest.GetName()})
if err != nil {
return err
}
isGlobalDomain := getResponse.IsGlobalDomain
if isGlobalDomain && !d.clusterMetadata.IsPrimaryCluster() {
return errNotPrimaryCluster
}
getResponse.ConfigVersion = getResponse.ConfigVersion + 1
getResponse.Info.Status = persistence.DomainStatusDeprecated
updateReq := createUpdateRequest(
getResponse.Info,
getResponse.Config,
getResponse.ReplicationConfig,
getResponse.ConfigVersion,
getResponse.FailoverVersion,
getResponse.FailoverNotificationVersion,
getResponse.FailoverEndTime,
getResponse.PreviousFailoverVersion,
d.timeSource.Now(),
notificationVersion,
)
err = d.domainManager.UpdateDomain(ctx, &updateReq)
if err != nil {
return err
}
if isGlobalDomain {
if err := d.domainReplicator.HandleTransmissionTask(
ctx,
types.DomainOperationUpdate,
getResponse.Info,
getResponse.Config,
getResponse.ReplicationConfig,
getResponse.ConfigVersion,
getResponse.FailoverVersion,
getResponse.PreviousFailoverVersion,
isGlobalDomain,
); err != nil {
return err
}
}
d.logger.Info("DeprecateDomain domain succeeded",
tag.WorkflowDomainName(getResponse.Info.Name),
tag.WorkflowDomainID(getResponse.Info.ID),
)
return nil
}
// UpdateIsolationGroups is used for draining and undraining of isolation-groups for a domain.
// Like the isolation-group API, this controller expects Upsert semantics for
// isolation-groups and does not modify any other domain information.
//
// Isolation-groups are regional in their configuration scope, so it's expected that this upsert
// includes configuration for both clusters every time.
//
// The update is handled like other domain updates in that they expected to be replicated. So
// unlike the global isolation-group API it shouldn't be necessary to call
func (d *handlerImpl) UpdateIsolationGroups(
ctx context.Context,
updateRequest types.UpdateDomainIsolationGroupsRequest,
) error {
// must get the metadata (notificationVersion) first
// this version can be regarded as the lock on the v2 domain table
// and since we do not know which table will return the domain afterwards
// this call has to be made
metadata, err := d.domainManager.GetMetadata(ctx)
if err != nil {
return err
}
notificationVersion := metadata.NotificationVersion
if updateRequest.IsolationGroups == nil {
return fmt.Errorf("invalid request, isolationGroup configuration must be set: Got: %v", updateRequest)
}
currentDomainConfig, err := d.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{Name: updateRequest.Domain})
if err != nil {
return err
}
if currentDomainConfig.Config == nil {
return fmt.Errorf("unable to load config for domain as expected")
}
configVersion := currentDomainConfig.ConfigVersion
lastUpdatedTime := time.Unix(0, currentDomainConfig.LastUpdatedTime)
// Check the failover cool down time
if lastUpdatedTime.Add(d.config.FailoverCoolDown(currentDomainConfig.Info.Name)).After(d.timeSource.Now()) {
return errDomainUpdateTooFrequent
}
if !d.clusterMetadata.IsPrimaryCluster() && currentDomainConfig.IsGlobalDomain {
return errNotPrimaryCluster
}
configVersion++
lastUpdatedTime = d.timeSource.Now()
// Mutate the domain config to perform the isolation-group update
currentDomainConfig.Config.IsolationGroups = updateRequest.IsolationGroups
updateReq := createUpdateRequest(
currentDomainConfig.Info,
currentDomainConfig.Config,
currentDomainConfig.ReplicationConfig,
configVersion,
currentDomainConfig.FailoverVersion,
currentDomainConfig.FailoverNotificationVersion,
currentDomainConfig.FailoverEndTime,
currentDomainConfig.PreviousFailoverVersion,
lastUpdatedTime,
notificationVersion,
)
err = d.domainManager.UpdateDomain(ctx, &updateReq)
if err != nil {
return err
}
if currentDomainConfig.IsGlobalDomain {
// One might reasonably wonder what value there is in replication of isolation-group information - info which is
// regional and therefore of no value to the other region?
// Probably not a lot, in and of itself, however, the isolation-group information is stored
// in the domain configuration fields in the domain tables. Access and updates to those records is
// done through a replicated mechanism with explicit versioning and conflict resolution.
// Therefore, in order to avoid making an already complex mechanisim much more difficult to understand,
// the data is replicated in the same way so as to try and make things less confusing when both codepaths
// are updating the table:
// - versions like the confiugration version are updated in the same manner
// - the last-updated timestamps are updated in the same manner
if err := d.domainReplicator.HandleTransmissionTask(
ctx,
types.DomainOperationUpdate,
currentDomainConfig.Info,
currentDomainConfig.Config,
currentDomainConfig.ReplicationConfig,
configVersion,
currentDomainConfig.FailoverVersion,
currentDomainConfig.PreviousFailoverVersion,
currentDomainConfig.IsGlobalDomain,
); err != nil {
return err
}
}
d.logger.Info("isolation group update succeeded",
tag.WorkflowDomainName(currentDomainConfig.Info.Name),
tag.WorkflowDomainID(currentDomainConfig.Info.ID),
)
return nil
}
func (d *handlerImpl) UpdateAsyncWorkflowConfiguraton(
ctx context.Context,
updateRequest types.UpdateDomainAsyncWorkflowConfiguratonRequest,
) error {
// must get the metadata (notificationVersion) first
// this version can be regarded as the lock on the v2 domain table
// and since we do not know which table will return the domain afterwards
// this call has to be made
metadata, err := d.domainManager.GetMetadata(ctx)
if err != nil {
return err
}
notificationVersion := metadata.NotificationVersion
currentDomainConfig, err := d.domainManager.GetDomain(ctx, &persistence.GetDomainRequest{Name: updateRequest.Domain})
if err != nil {
return err
}
if currentDomainConfig.Config == nil {
return fmt.Errorf("unable to load config for domain as expected")
}
configVersion := currentDomainConfig.ConfigVersion
lastUpdatedTime := time.Unix(0, currentDomainConfig.LastUpdatedTime)
// Check the failover cool down time
if lastUpdatedTime.Add(d.config.FailoverCoolDown(currentDomainConfig.Info.Name)).After(d.timeSource.Now()) {
return errDomainUpdateTooFrequent
}
if !d.clusterMetadata.IsPrimaryCluster() && currentDomainConfig.IsGlobalDomain {
return errNotPrimaryCluster
}
configVersion++
lastUpdatedTime = d.timeSource.Now()
// Mutate the domain config to perform the async wf config update
if updateRequest.Configuration == nil {
// this is a delete request so empty all the fields
currentDomainConfig.Config.AsyncWorkflowConfig = types.AsyncWorkflowConfiguration{}
} else {
currentDomainConfig.Config.AsyncWorkflowConfig = *updateRequest.Configuration
}
d.logger.Debug("async workflow queue config update", tag.Dynamic("config", currentDomainConfig))
updateReq := createUpdateRequest(
currentDomainConfig.Info,
currentDomainConfig.Config,
currentDomainConfig.ReplicationConfig,
configVersion,
currentDomainConfig.FailoverVersion,
currentDomainConfig.FailoverNotificationVersion,
currentDomainConfig.FailoverEndTime,
currentDomainConfig.PreviousFailoverVersion,
lastUpdatedTime,
notificationVersion,
)
err = d.domainManager.UpdateDomain(ctx, &updateReq)
if err != nil {
return err
}
if currentDomainConfig.IsGlobalDomain {
// One might reasonably wonder what value there is in replication of isolation-group information - info which is
// regional and therefore of no value to the other region?
// Probably not a lot, in and of itself, however, the isolation-group information is stored
// in the domain configuration fields in the domain tables. Access and updates to those records is
// done through a replicated mechanism with explicit versioning and conflict resolution.
// Therefore, in order to avoid making an already complex mechanisim much more difficult to understand,
// the data is replicated in the same way so as to try and make things less confusing when both codepaths
// are updating the table:
// - versions like the confiugration version are updated in the same manner
// - the last-updated timestamps are updated in the same manner
if err := d.domainReplicator.HandleTransmissionTask(
ctx,
types.DomainOperationUpdate,
currentDomainConfig.Info,
currentDomainConfig.Config,
currentDomainConfig.ReplicationConfig,
configVersion,
currentDomainConfig.FailoverVersion,
currentDomainConfig.PreviousFailoverVersion,
currentDomainConfig.IsGlobalDomain,
); err != nil {
return err
}
}
d.logger.Info("async workflow queue config update succeeded",
tag.WorkflowDomainName(currentDomainConfig.Info.Name),
tag.WorkflowDomainID(currentDomainConfig.Info.ID),
)
return nil
}
func (d *handlerImpl) createResponse(
info *persistence.DomainInfo,
config *persistence.DomainConfig,
replicationConfig *persistence.DomainReplicationConfig,
) (*types.DomainInfo, *types.DomainConfiguration, *types.DomainReplicationConfiguration) {
infoResult := &types.DomainInfo{
Name: info.Name,
Status: getDomainStatus(info),
Description: info.Description,
OwnerEmail: info.OwnerEmail,
Data: info.Data,
UUID: info.ID,
}
configResult := &types.DomainConfiguration{
EmitMetric: config.EmitMetric,
WorkflowExecutionRetentionPeriodInDays: config.Retention,
HistoryArchivalStatus: config.HistoryArchivalStatus.Ptr(),
HistoryArchivalURI: config.HistoryArchivalURI,
VisibilityArchivalStatus: config.VisibilityArchivalStatus.Ptr(),
VisibilityArchivalURI: config.VisibilityArchivalURI,
BadBinaries: &config.BadBinaries,
IsolationGroups: &config.IsolationGroups,
AsyncWorkflowConfig: &config.AsyncWorkflowConfig,
}
clusters := []*types.ClusterReplicationConfiguration{}
for _, cluster := range replicationConfig.Clusters {
clusters = append(clusters, &types.ClusterReplicationConfiguration{
ClusterName: cluster.ClusterName,
})
}
replicationConfigResult := &types.DomainReplicationConfiguration{
ActiveClusterName: replicationConfig.ActiveClusterName,
Clusters: clusters,
}
return infoResult, configResult, replicationConfigResult
}
func (d *handlerImpl) mergeBadBinaries(
old map[string]*types.BadBinaryInfo,
new map[string]*types.BadBinaryInfo,
createTimeNano int64,
) types.BadBinaries {
if old == nil {
old = map[string]*types.BadBinaryInfo{}
}
for k, v := range new {
v.CreatedTimeNano = common.Int64Ptr(createTimeNano)
old[k] = v
}
return types.BadBinaries{
Binaries: old,
}
}
func (d *handlerImpl) mergeDomainData(
old map[string]string,
new map[string]string,
) map[string]string {
if old == nil {
old = map[string]string{}
}
for k, v := range new {
old[k] = v
}
return old
}
func (d *handlerImpl) toArchivalRegisterEvent(
status *types.ArchivalStatus,
URI string,
defaultStatus types.ArchivalStatus,
defaultURI string,
) (*ArchivalEvent, error) {
event := &ArchivalEvent{
status: status,
URI: URI,
defaultURI: defaultURI,
}
if event.status == nil {
event.status = defaultStatus.Ptr()
}
if err := event.validate(); err != nil {
return nil, err
}
return event, nil
}
func (d *handlerImpl) toArchivalUpdateEvent(
status *types.ArchivalStatus,
URI string,
defaultURI string,
) (*ArchivalEvent, error) {
event := &ArchivalEvent{
status: status,
URI: URI,
defaultURI: defaultURI,
}
if err := event.validate(); err != nil {
return nil, err
}
return event, nil
}
func (d *handlerImpl) validateHistoryArchivalURI(URIString string) error {
URI, err := archiver.NewURI(URIString)
if err != nil {
return err
}
archiver, err := d.archiverProvider.GetHistoryArchiver(URI.Scheme(), service.Frontend)
if err != nil {
return err
}
return archiver.ValidateURI(URI)
}
func (d *handlerImpl) validateVisibilityArchivalURI(URIString string) error {
URI, err := archiver.NewURI(URIString)
if err != nil {
return err
}
archiver, err := d.archiverProvider.GetVisibilityArchiver(URI.Scheme(), service.Frontend)
if err != nil {
return err
}
return archiver.ValidateURI(URI)
}
func (d *handlerImpl) getHistoryArchivalState(
config *persistence.DomainConfig,
updateRequest *types.UpdateDomainRequest,
) (*ArchivalState, bool, error) {
currentHistoryArchivalState := &ArchivalState{
Status: config.HistoryArchivalStatus,
URI: config.HistoryArchivalURI,
}
clusterHistoryArchivalConfig := d.archivalMetadata.GetHistoryConfig()
if clusterHistoryArchivalConfig.ClusterConfiguredForArchival() {
archivalEvent, err := d.toArchivalUpdateEvent(
updateRequest.HistoryArchivalStatus,
updateRequest.GetHistoryArchivalURI(),
clusterHistoryArchivalConfig.GetDomainDefaultURI(),
)
if err != nil {
return currentHistoryArchivalState, false, err
}
return currentHistoryArchivalState.getNextState(archivalEvent, d.validateHistoryArchivalURI)
}
return currentHistoryArchivalState, false, nil
}
func (d *handlerImpl) getVisibilityArchivalState(
config *persistence.DomainConfig,
updateRequest *types.UpdateDomainRequest,
) (*ArchivalState, bool, error) {
currentVisibilityArchivalState := &ArchivalState{
Status: config.VisibilityArchivalStatus,
URI: config.VisibilityArchivalURI,
}
clusterVisibilityArchivalConfig := d.archivalMetadata.GetVisibilityConfig()
if clusterVisibilityArchivalConfig.ClusterConfiguredForArchival() {
archivalEvent, err := d.toArchivalUpdateEvent(
updateRequest.VisibilityArchivalStatus,
updateRequest.GetVisibilityArchivalURI(),
clusterVisibilityArchivalConfig.GetDomainDefaultURI(),
)
if err != nil {
return currentVisibilityArchivalState, false, err
}
return currentVisibilityArchivalState.getNextState(archivalEvent, d.validateVisibilityArchivalURI)
}
return currentVisibilityArchivalState, false, nil
}
func (d *handlerImpl) updateDomainInfo(
updateRequest *types.UpdateDomainRequest,
currentDomainInfo *persistence.DomainInfo,
) (*persistence.DomainInfo, bool) {
isDomainUpdated := false
if updateRequest.Description != nil {
isDomainUpdated = true
currentDomainInfo.Description = *updateRequest.Description
}
if updateRequest.OwnerEmail != nil {
isDomainUpdated = true
currentDomainInfo.OwnerEmail = *updateRequest.OwnerEmail
}
if updateRequest.Data != nil {
isDomainUpdated = true
// only do merging
currentDomainInfo.Data = d.mergeDomainData(currentDomainInfo.Data, updateRequest.Data)
}
return currentDomainInfo, isDomainUpdated
}
func (d *handlerImpl) updateDomainConfiguration(
domainName string,
config *persistence.DomainConfig,
updateRequest *types.UpdateDomainRequest,
) (*persistence.DomainConfig, bool, error) {
isConfigChanged := false
if updateRequest.EmitMetric != nil {
isConfigChanged = true
config.EmitMetric = *updateRequest.EmitMetric
}
if updateRequest.WorkflowExecutionRetentionPeriodInDays != nil {
isConfigChanged = true
config.Retention = *updateRequest.WorkflowExecutionRetentionPeriodInDays
}
if updateRequest.BadBinaries != nil {
maxLength := d.config.MaxBadBinaryCount(domainName)
// only do merging
config.BadBinaries = d.mergeBadBinaries(config.BadBinaries.Binaries, updateRequest.BadBinaries.Binaries, time.Now().UnixNano())
if len(config.BadBinaries.Binaries) > maxLength {
return config, isConfigChanged, &types.BadRequestError{
Message: fmt.Sprintf("Total resetBinaries cannot exceed the max limit: %v", maxLength),
}
}
}
return config, isConfigChanged, nil
}
func (d *handlerImpl) updateDeleteBadBinary(
config *persistence.DomainConfig,
deleteBadBinary *string,
) (*persistence.DomainConfig, bool, error) {
if deleteBadBinary != nil {
_, ok := config.BadBinaries.Binaries[*deleteBadBinary]
if !ok {
return config, false, &types.BadRequestError{
Message: fmt.Sprintf("Bad binary checksum %v doesn't exists.", *deleteBadBinary),
}
}
delete(config.BadBinaries.Binaries, *deleteBadBinary)
return config, true, nil
}
return config, false, nil
}
func (d *handlerImpl) updateReplicationConfig(
config *persistence.DomainReplicationConfig,
updateRequest *types.UpdateDomainRequest,
) (*persistence.DomainReplicationConfig, bool, bool, error) {
clusterUpdated := false
activeClusterUpdated := false
if len(updateRequest.Clusters) != 0 {
clusterUpdated = true
clustersNew := []*persistence.ClusterReplicationConfig{}
for _, clusterConfig := range updateRequest.Clusters {
clustersNew = append(clustersNew, &persistence.ClusterReplicationConfig{
ClusterName: clusterConfig.GetClusterName(),
})
}
if err := d.domainAttrValidator.validateDomainReplicationConfigClustersDoesNotRemove(
config.Clusters,
clustersNew,
); err != nil {
d.logger.Warn("removing replica clusters from domain replication group", tag.Error(err))
}
config.Clusters = clustersNew
}
if updateRequest.ActiveClusterName != nil {
activeClusterUpdated = true
config.ActiveClusterName = *updateRequest.ActiveClusterName
}
return config, clusterUpdated, activeClusterUpdated, nil
}
func getDomainStatus(info *persistence.DomainInfo) *types.DomainStatus {
switch info.Status {
case persistence.DomainStatusRegistered:
v := types.DomainStatusRegistered
return &v
case persistence.DomainStatusDeprecated:
v := types.DomainStatusDeprecated
return &v
case persistence.DomainStatusDeleted:
v := types.DomainStatusDeleted
return &v
}
return nil
}
// Maps fields onto an updateDomain Request
// it's really important that this explicitly calls out each field to ensure no fields get missed or dropped
func createUpdateRequest(
info *persistence.DomainInfo,
config *persistence.DomainConfig,
replicationConfig *persistence.DomainReplicationConfig,
configVersion int64,
failoverVersion int64,
failoverNotificationVersion int64,
failoverEndTime *int64,
previousFailoverVersion int64,
lastUpdatedTime time.Time,
notificationVersion int64,
) persistence.UpdateDomainRequest {
return persistence.UpdateDomainRequest{
Info: info,
Config: config,
ReplicationConfig: replicationConfig,
ConfigVersion: configVersion,
FailoverVersion: failoverVersion,
FailoverNotificationVersion: failoverNotificationVersion,
FailoverEndTime: failoverEndTime,
PreviousFailoverVersion: previousFailoverVersion,
LastUpdatedTime: lastUpdatedTime.UnixNano(),
NotificationVersion: notificationVersion,
}
}