common/persistence/nosql/nosql_domain_store.go (248 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package nosql
import (
"context"
"fmt"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/types"
)
type nosqlDomainStore struct {
nosqlStore
currentClusterName string
}
// newNoSQLDomainStore is used to create an instance of DomainStore implementation
func newNoSQLDomainStore(
cfg config.ShardedNoSQL,
currentClusterName string,
logger log.Logger,
dc *persistence.DynamicConfiguration,
) (persistence.DomainStore, error) {
shardedStore, err := newShardedNosqlStore(cfg, logger, dc)
if err != nil {
return nil, err
}
return &nosqlDomainStore{
nosqlStore: shardedStore.GetDefaultShard(),
currentClusterName: currentClusterName,
}, nil
}
// CreateDomain create a domain
// Cassandra does not support conditional updates across multiple tables. For this reason we have to first insert into
// 'Domains' table and then do a conditional insert into domains_by_name table. If the conditional write fails we
// delete the orphaned entry from domains table. There is a chance delete entry could fail and we never delete the
// orphaned entry from domains table. We might need a background job to delete those orphaned record.
func (m *nosqlDomainStore) CreateDomain(
ctx context.Context,
request *persistence.InternalCreateDomainRequest,
) (*persistence.CreateDomainResponse, error) {
config, err := m.toNoSQLInternalDomainConfig(request.Config)
if err != nil {
return nil, err
}
row := &nosqlplugin.DomainRow{
Info: request.Info,
Config: config,
ReplicationConfig: request.ReplicationConfig,
ConfigVersion: request.ConfigVersion,
FailoverVersion: request.FailoverVersion,
FailoverNotificationVersion: persistence.InitialFailoverNotificationVersion,
PreviousFailoverVersion: common.InitialPreviousFailoverVersion,
FailoverEndTime: nil,
IsGlobalDomain: request.IsGlobalDomain,
LastUpdatedTime: request.LastUpdatedTime,
}
err = m.db.InsertDomain(ctx, row)
if err != nil {
if _, ok := err.(*types.DomainAlreadyExistsError); ok {
return nil, err
}
return nil, convertCommonErrors(m.db, "CreateDomain", err)
}
return &persistence.CreateDomainResponse{ID: request.Info.ID}, nil
}
func (m *nosqlDomainStore) UpdateDomain(
ctx context.Context,
request *persistence.InternalUpdateDomainRequest,
) error {
config, err := m.toNoSQLInternalDomainConfig(request.Config)
if err != nil {
return err
}
row := &nosqlplugin.DomainRow{
Info: request.Info,
Config: config,
ReplicationConfig: request.ReplicationConfig,
ConfigVersion: request.ConfigVersion,
FailoverVersion: request.FailoverVersion,
FailoverNotificationVersion: request.FailoverNotificationVersion,
PreviousFailoverVersion: request.PreviousFailoverVersion,
FailoverEndTime: request.FailoverEndTime,
NotificationVersion: request.NotificationVersion,
LastUpdatedTime: request.LastUpdatedTime,
}
err = m.db.UpdateDomain(ctx, row)
if err != nil {
return convertCommonErrors(m.db, "UpdateDomain", err)
}
return nil
}
func (m *nosqlDomainStore) GetDomain(
ctx context.Context,
request *persistence.GetDomainRequest,
) (*persistence.InternalGetDomainResponse, error) {
if len(request.ID) > 0 && len(request.Name) > 0 {
return nil, &types.BadRequestError{
Message: "GetDomain operation failed. Both ID and Name specified in request.",
}
} else if len(request.ID) == 0 && len(request.Name) == 0 {
return nil, &types.BadRequestError{
Message: "GetDomain operation failed. Both ID and Name are empty.",
}
}
var domainName *string
var domainID *string
if len(request.ID) > 0 {
domainID = common.StringPtr(request.ID)
} else {
domainName = common.StringPtr(request.Name)
}
handleError := func(name, ID string, err error) error {
identity := name
if len(ID) > 0 {
identity = ID
}
if m.db.IsNotFoundError(err) {
return &types.EntityNotExistsError{
Message: fmt.Sprintf("Domain %s does not exist.", identity),
}
}
return convertCommonErrors(m.db, "GetDomain", err)
}
row, err := m.db.SelectDomain(ctx, domainID, domainName)
if err != nil {
return nil, handleError(request.Name, request.ID, err)
}
if row.Info.Data == nil {
row.Info.Data = map[string]string{}
}
row.ReplicationConfig.ActiveClusterName = cluster.GetOrUseDefaultActiveCluster(m.currentClusterName, row.ReplicationConfig.ActiveClusterName)
row.ReplicationConfig.Clusters = cluster.GetOrUseDefaultClusters(m.currentClusterName, row.ReplicationConfig.Clusters)
domainConfig, err := m.fromNoSQLInternalDomainConfig(row.Config)
if err != nil {
return nil, &types.InternalServiceError{
Message: fmt.Sprintf("cannot convert fromNoSQLInternalDomainConfig, %v ", err),
}
}
return &persistence.InternalGetDomainResponse{
Info: row.Info,
Config: domainConfig,
ReplicationConfig: row.ReplicationConfig,
IsGlobalDomain: row.IsGlobalDomain,
ConfigVersion: row.ConfigVersion,
FailoverVersion: row.FailoverVersion,
FailoverNotificationVersion: row.FailoverNotificationVersion,
PreviousFailoverVersion: row.PreviousFailoverVersion,
FailoverEndTime: row.FailoverEndTime,
NotificationVersion: row.NotificationVersion,
LastUpdatedTime: row.LastUpdatedTime,
}, nil
}
func (m *nosqlDomainStore) ListDomains(
ctx context.Context,
request *persistence.ListDomainsRequest,
) (*persistence.InternalListDomainsResponse, error) {
rows, nextPageToken, err := m.db.SelectAllDomains(ctx, request.PageSize, request.NextPageToken)
if err != nil {
return nil, convertCommonErrors(m.db, "ListDomains", err)
}
var domains []*persistence.InternalGetDomainResponse
for _, row := range rows {
if row.Info.Data == nil {
row.Info.Data = map[string]string{}
}
row.ReplicationConfig.ActiveClusterName = cluster.GetOrUseDefaultActiveCluster(m.currentClusterName, row.ReplicationConfig.ActiveClusterName)
row.ReplicationConfig.Clusters = cluster.GetOrUseDefaultClusters(m.currentClusterName, row.ReplicationConfig.Clusters)
domainConfig, err := m.fromNoSQLInternalDomainConfig(row.Config)
if err != nil {
return nil, &types.InternalServiceError{
Message: fmt.Sprintf("cannot convert fromNoSQLInternalDomainConfig, %v ", err),
}
}
domains = append(domains, &persistence.InternalGetDomainResponse{
Info: row.Info,
Config: domainConfig,
ReplicationConfig: row.ReplicationConfig,
IsGlobalDomain: row.IsGlobalDomain,
ConfigVersion: row.ConfigVersion,
FailoverVersion: row.FailoverVersion,
FailoverNotificationVersion: row.FailoverNotificationVersion,
PreviousFailoverVersion: row.PreviousFailoverVersion,
FailoverEndTime: row.FailoverEndTime,
NotificationVersion: row.NotificationVersion,
LastUpdatedTime: row.LastUpdatedTime,
})
}
return &persistence.InternalListDomainsResponse{
Domains: domains,
NextPageToken: nextPageToken,
}, nil
}
func (m *nosqlDomainStore) DeleteDomain(
ctx context.Context,
request *persistence.DeleteDomainRequest,
) error {
if err := m.db.DeleteDomain(ctx, &request.ID, nil); err != nil {
return convertCommonErrors(m.db, "DeleteDomain", err)
}
return nil
}
func (m *nosqlDomainStore) DeleteDomainByName(
ctx context.Context,
request *persistence.DeleteDomainByNameRequest,
) error {
if err := m.db.DeleteDomain(ctx, nil, &request.Name); err != nil {
return convertCommonErrors(m.db, "DeleteDomainByName", err)
}
return nil
}
func (m *nosqlDomainStore) GetMetadata(
ctx context.Context,
) (*persistence.GetMetadataResponse, error) {
notificationVersion, err := m.db.SelectDomainMetadata(ctx)
if err != nil {
return nil, convertCommonErrors(m.db, "GetMetadata", err)
}
return &persistence.GetMetadataResponse{NotificationVersion: notificationVersion}, nil
}
func (m *nosqlDomainStore) toNoSQLInternalDomainConfig(
domainConfig *persistence.InternalDomainConfig,
) (*nosqlplugin.NoSQLInternalDomainConfig, error) {
return &nosqlplugin.NoSQLInternalDomainConfig{
Retention: domainConfig.Retention,
EmitMetric: domainConfig.EmitMetric,
ArchivalBucket: domainConfig.ArchivalBucket,
ArchivalStatus: domainConfig.ArchivalStatus,
HistoryArchivalStatus: domainConfig.HistoryArchivalStatus,
HistoryArchivalURI: domainConfig.HistoryArchivalURI,
VisibilityArchivalStatus: domainConfig.VisibilityArchivalStatus,
VisibilityArchivalURI: domainConfig.VisibilityArchivalURI,
BadBinaries: domainConfig.BadBinaries,
IsolationGroups: domainConfig.IsolationGroups,
AsyncWorkflowsConfig: domainConfig.AsyncWorkflowsConfig,
}, nil
}
func (m *nosqlDomainStore) fromNoSQLInternalDomainConfig(
domainConfig *nosqlplugin.NoSQLInternalDomainConfig,
) (*persistence.InternalDomainConfig, error) {
return &persistence.InternalDomainConfig{
Retention: domainConfig.Retention,
EmitMetric: domainConfig.EmitMetric,
ArchivalBucket: domainConfig.ArchivalBucket,
ArchivalStatus: domainConfig.ArchivalStatus,
HistoryArchivalStatus: domainConfig.HistoryArchivalStatus,
HistoryArchivalURI: domainConfig.HistoryArchivalURI,
VisibilityArchivalStatus: domainConfig.VisibilityArchivalStatus,
VisibilityArchivalURI: domainConfig.VisibilityArchivalURI,
BadBinaries: domainConfig.BadBinaries,
IsolationGroups: domainConfig.IsolationGroups,
AsyncWorkflowsConfig: domainConfig.AsyncWorkflowsConfig,
}, nil
}