// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package host

import (
"context"
"io/ioutil"
"os"
"testing"
"time"
"github.com/startreedata/pinot-client-go/pinot"
"github.com/uber-go/tally"
adminClient "github.com/uber/cadence/client/admin"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/archiver/filestore"
"github.com/uber/cadence/common/archiver/provider"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/domain"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/messaging/kafka"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql"
persistencetests "github.com/uber/cadence/common/persistence/persistence-tests"
"github.com/uber/cadence/common/persistence/persistence-tests/testcluster"
"github.com/uber/cadence/common/persistence/sql"
"github.com/uber/cadence/common/persistence/sql/sqlplugin/mysql"
"github.com/uber/cadence/common/persistence/sql/sqlplugin/postgres"
pnt "github.com/uber/cadence/common/pinot"
"github.com/uber/cadence/testflags"
// the import is a test dependency
_ "github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql/public"
)
type (
// TestCluster is a base struct for integration tests
TestCluster struct {
testBase *persistencetests.TestBase
archiverBase *ArchiverBase
host Cadence
}
// ArchiverBase is a base struct for the archiver provider used in integration tests
ArchiverBase struct {
metadata archiver.ArchivalMetadata
provider provider.ArchiverProvider
historyStoreDirectory string
visibilityStoreDirectory string
historyURI string
visibilityURI string
}
// TestClusterConfig is the config for a test cluster
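//
// A minimal config literal might look like the following sketch; the concrete
// values are illustrative only, not defaults defined by this package (the
// HistoryConfig literal assumes only the NumHistoryShards field used below):
//
//	cfg := &TestClusterConfig{
//		IsPrimaryCluster:      true,
//		ClusterNo:             0,
//		HistoryConfig:         &HistoryConfig{NumHistoryShards: 4},
//		WorkerConfig:          &WorkerConfig{},
//		MessagingClientConfig: &MessagingClientConfig{UseMock: true},
//	}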
TestClusterConfig struct {
FrontendAddress string
EnableArchival bool
IsPrimaryCluster bool
ClusterNo int
ClusterGroupMetadata config.ClusterGroupMetadata
MessagingClientConfig *MessagingClientConfig
Persistence persistencetests.TestBaseOptions
HistoryConfig *HistoryConfig
ESConfig *config.ElasticSearchConfig
WorkerConfig *WorkerConfig
MockAdminClient map[string]adminClient.Client
PinotConfig *config.PinotVisibilityConfig
AsyncWFQueues map[string]config.AsyncWorkflowQueueProvider
// TimeSource is used to override the time source of internal components.
// Note that most components don't respect this; it's only used in a few places,
// e.g. the async workflow test's consumer manager and domain manager.
TimeSource clock.MockedTimeSource
FrontendDynamicConfigOverrides map[dynamicconfig.Key]interface{}
HistoryDynamicConfigOverrides map[dynamicconfig.Key]interface{}
MatchingDynamicConfigOverrides map[dynamicconfig.Key]interface{}
WorkerDynamicConfigOverrides map[dynamicconfig.Key]interface{}
}
// MessagingClientConfig is the config for the messaging client
MessagingClientConfig struct {
UseMock bool
KafkaConfig *config.KafkaConfig
}
// WorkerConfig is the config for enabling/disabling cadence worker
WorkerConfig struct {
EnableArchiver bool
EnableIndexer bool
EnableReplicator bool
EnableAsyncWFConsumer bool
}
)
const (
defaultTestValueOfESIndexMaxResultWindow = 5
defaultTestPersistenceTimeout = 5 * time.Second
)
// NewCluster creates and sets up the test cluster
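//
// A typical call from an integration test looks roughly like this sketch
// (construction of options, logger and params is assumed to happen elsewhere):
//
//	cluster, err := NewCluster(t, options, logger, params)
//	if err != nil {
//		t.Fatal(err)
//	}
//	defer cluster.TearDownCluster()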
func NewCluster(t *testing.T, options *TestClusterConfig, logger log.Logger, params persistencetests.TestBaseParams) (*TestCluster, error) {
testBase := persistencetests.NewTestBaseFromParams(t, params)
testBase.Setup()
setupShards(testBase, options.HistoryConfig.NumHistoryShards, logger)
archiverBase := newArchiverBase(options.EnableArchival, logger)
messagingClient := getMessagingClient(options.MessagingClientConfig, logger)
pConfig := testBase.Config()
pConfig.NumHistoryShards = options.HistoryConfig.NumHistoryShards
var esClient elasticsearch.GenericClient
if options.WorkerConfig.EnableIndexer {
var err error
esClient, err = elasticsearch.NewGenericClient(options.ESConfig, logger)
if err != nil {
return nil, err
}
pConfig.AdvancedVisibilityStore = "es-visibility"
}
scope := tally.NewTestScope("integration-test", nil)
metricsClient := metrics.NewClient(scope, metrics.ServiceIdx(0))
domainReplicationQueue := domain.NewReplicationQueue(
testBase.DomainReplicationQueueMgr,
options.ClusterGroupMetadata.CurrentClusterName,
metricsClient,
logger,
)
aConfig := noopAuthorizationConfig()
cadenceParams := &CadenceParams{
ClusterMetadata: params.ClusterMetadata,
PersistenceConfig: pConfig,
MessagingClient: messagingClient,
DomainManager: testBase.DomainManager,
HistoryV2Mgr: testBase.HistoryV2Mgr,
ExecutionMgrFactory: testBase.ExecutionMgrFactory,
DomainReplicationQueue: domainReplicationQueue,
Logger: logger,
ClusterNo: options.ClusterNo,
ESConfig: options.ESConfig,
ESClient: esClient,
ArchiverMetadata: archiverBase.metadata,
ArchiverProvider: archiverBase.provider,
HistoryConfig: options.HistoryConfig,
WorkerConfig: options.WorkerConfig,
MockAdminClient: options.MockAdminClient,
DomainReplicationTaskExecutor: domain.NewReplicationTaskExecutor(testBase.DomainManager, clock.NewRealTimeSource(), logger),
AuthorizationConfig: aConfig,
AsyncWFQueues: options.AsyncWFQueues,
TimeSource: options.TimeSource,
FrontendDynCfgOverrides: options.FrontendDynamicConfigOverrides,
HistoryDynCfgOverrides: options.HistoryDynamicConfigOverrides,
MatchingDynCfgOverrides: options.MatchingDynamicConfigOverrides,
WorkerDynCfgOverrides: options.WorkerDynamicConfigOverrides,
}
cluster := NewCadence(cadenceParams)
if err := cluster.Start(); err != nil {
return nil, err
}
return &TestCluster{testBase: testBase, archiverBase: archiverBase, host: cluster}, nil
}
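
// NewPinotTestCluster creates and sets up a test cluster that uses Pinot as the
// advanced visibility store instead of Elasticsearch.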
func NewPinotTestCluster(t *testing.T, options *TestClusterConfig, logger log.Logger, params persistencetests.TestBaseParams) (*TestCluster, error) {
testBase := persistencetests.NewTestBaseFromParams(t, params)
testBase.Setup()
setupShards(testBase, options.HistoryConfig.NumHistoryShards, logger)
archiverBase := newArchiverBase(options.EnableArchival, logger)
messagingClient := getMessagingClient(options.MessagingClientConfig, logger)
pConfig := testBase.Config()
pConfig.NumHistoryShards = options.HistoryConfig.NumHistoryShards
var esClient elasticsearch.GenericClient
var pinotClient pnt.GenericClient
if options.WorkerConfig.EnableIndexer {
var err error
esClient, err = elasticsearch.NewGenericClient(options.ESConfig, logger)
if err != nil {
return nil, err
}
pConfig.AdvancedVisibilityStore = "pinot-visibility"
pinotBroker := options.PinotConfig.Broker
pinotRawClient, err := pinot.NewFromBrokerList([]string{pinotBroker})
if err != nil || pinotRawClient == nil {
return nil, err
}
pinotClient = pnt.NewPinotClient(pinotRawClient, logger, options.PinotConfig)
}
scope := tally.NewTestScope("integration-test", nil)
metricsClient := metrics.NewClient(scope, metrics.ServiceIdx(0))
domainReplicationQueue := domain.NewReplicationQueue(
testBase.DomainReplicationQueueMgr,
options.ClusterGroupMetadata.CurrentClusterName,
metricsClient,
logger,
)
aConfig := noopAuthorizationConfig()
cadenceParams := &CadenceParams{
ClusterMetadata: params.ClusterMetadata,
PersistenceConfig: pConfig,
MessagingClient: messagingClient,
DomainManager: testBase.DomainManager,
HistoryV2Mgr: testBase.HistoryV2Mgr,
ExecutionMgrFactory: testBase.ExecutionMgrFactory,
DomainReplicationQueue: domainReplicationQueue,
Logger: logger,
ClusterNo: options.ClusterNo,
ESConfig: options.ESConfig,
ESClient: esClient,
ArchiverMetadata: archiverBase.metadata,
ArchiverProvider: archiverBase.provider,
HistoryConfig: options.HistoryConfig,
WorkerConfig: options.WorkerConfig,
MockAdminClient: options.MockAdminClient,
DomainReplicationTaskExecutor: domain.NewReplicationTaskExecutor(testBase.DomainManager, clock.NewRealTimeSource(), logger),
AuthorizationConfig: aConfig,
PinotConfig: options.PinotConfig,
PinotClient: pinotClient,
}
cluster := NewCadence(cadenceParams)
if err := cluster.Start(); err != nil {
return nil, err
}
return &TestCluster{testBase: testBase, archiverBase: archiverBase, host: cluster}, nil
}
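
// noopAuthorizationConfig returns an authorization config with the noop authorizer
// enabled and OAuth disabled, so integration tests run without authorization checks.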
func noopAuthorizationConfig() config.Authorization {
return config.Authorization{
OAuthAuthorizer: config.OAuthAuthorizer{
Enable: false,
},
NoopAuthorizer: config.NoopAuthorizer{
Enable: true,
},
}
}
// NewClusterMetadata returns cluster metadata built from the given config
func NewClusterMetadata(t *testing.T, options *TestClusterConfig) cluster.Metadata {
clusterMetadata := cluster.GetTestClusterMetadata(options.IsPrimaryCluster)
if !options.IsPrimaryCluster && options.ClusterGroupMetadata.PrimaryClusterName != "" { // xdc cluster metadata setup
clusterMetadata = cluster.NewMetadata(
options.ClusterGroupMetadata.FailoverVersionIncrement,
options.ClusterGroupMetadata.PrimaryClusterName,
options.ClusterGroupMetadata.CurrentClusterName,
options.ClusterGroupMetadata.ClusterGroup,
func(domain string) bool { return false },
metrics.NewNoopMetricsClient(),
testlogger.New(t),
)
}
return clusterMetadata
}
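
// NewPersistenceTestCluster creates a persistence test cluster (Cassandra or SQL)
// based on the persistence store type and plugin selected via TestFlags.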
func NewPersistenceTestCluster(t *testing.T, clusterConfig *TestClusterConfig) testcluster.PersistenceTestCluster {
// NOTE: override the store type and plugin here to keep them consistent, since clusterConfig is also used directly by the test.
clusterConfig.Persistence.StoreType = TestFlags.PersistenceType
clusterConfig.Persistence.DBPluginName = TestFlags.SQLPluginName
var testCluster testcluster.PersistenceTestCluster
var err error
if TestFlags.PersistenceType == config.StoreTypeCassandra {
// TODO refactor to support other NoSQL
ops := clusterConfig.Persistence
ops.DBPluginName = "cassandra"
testflags.RequireCassandra(t)
testCluster = nosql.NewTestCluster(t, nosql.TestClusterParams{
PluginName: ops.DBPluginName,
KeySpace: ops.DBName,
Username: ops.DBUsername,
Password: ops.DBPassword,
Host: ops.DBHost,
Port: ops.DBPort,
ProtoVersion: ops.ProtoVersion,
SchemaBaseDir: "",
})
} else if TestFlags.PersistenceType == config.StoreTypeSQL {
var ops *persistencetests.TestBaseOptions
switch TestFlags.SQLPluginName {
case mysql.PluginName:
testflags.RequireMySQL(t)
ops, err = mysql.GetTestClusterOption()
case postgres.PluginName:
testflags.RequirePostgres(t)
ops, err = postgres.GetTestClusterOption()
default:
t.Fatal("not supported plugin " + TestFlags.SQLPluginName)
}
if err != nil {
t.Fatal(err)
}
testCluster, err = sql.NewTestCluster(TestFlags.SQLPluginName, clusterConfig.Persistence.DBName, ops.DBUsername, ops.DBPassword, ops.DBHost, ops.DBPort, ops.SchemaDir)
if err != nil {
t.Fatal(err)
}
} else {
t.Fatal("not supported storage type" + TestFlags.PersistenceType)
}
return testCluster
}
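
// setupShards pre-creates the history shards needed by the test cluster beyond
// shard 0, failing fast via the logger if any shard cannot be created.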
func setupShards(testBase *persistencetests.TestBase, numHistoryShards int, logger log.Logger) {
// shard 0 is always created; create additional shards if needed
for shardID := 1; shardID < numHistoryShards; shardID++ {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestPersistenceTimeout)
err := testBase.CreateShard(ctx, shardID, "", 0)
if err != nil {
cancel()
logger.Fatal("Failed to create shard", tag.Error(err))
}
cancel()
}
}
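
// newArchiverBase builds the archival metadata and archiver provider for the test
// cluster. With archival disabled it returns no-op metadata and an empty provider;
// otherwise it backs the filestore history and visibility archivers with temp directories.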
func newArchiverBase(enabled bool, logger log.Logger) *ArchiverBase {
dcCollection := dynamicconfig.NewNopCollection()
if !enabled {
return &ArchiverBase{
metadata: archiver.NewArchivalMetadata(dcCollection, "", false, "", false, &config.ArchivalDomainDefaults{}),
provider: provider.NewArchiverProvider(nil, nil),
}
}
historyStoreDirectory, err := ioutil.TempDir("", "test-history-archival")
if err != nil {
logger.Fatal("Failed to create temp dir for history archival", tag.Error(err))
}
visibilityStoreDirectory, err := ioutil.TempDir("", "test-visibility-archival")
if err != nil {
logger.Fatal("Failed to create temp dir for visibility archival", tag.Error(err))
}
cfg := &config.FilestoreArchiver{
FileMode: "0666",
DirMode: "0766",
}
node, err := config.ToYamlNode(cfg)
if err != nil {
logger.Fatal("Should be impossible: failed to convert filestore archiver config to a yaml node")
}
archiverProvider := provider.NewArchiverProvider(
config.HistoryArchiverProvider{config.FilestoreConfig: node},
config.VisibilityArchiverProvider{config.FilestoreConfig: node},
)
return &ArchiverBase{
metadata: archiver.NewArchivalMetadata(dcCollection, "enabled", true, "enabled", true, &config.ArchivalDomainDefaults{
History: config.HistoryArchivalDomainDefaults{
Status: "enabled",
URI: "testScheme://test/history/archive/path",
},
Visibility: config.VisibilityArchivalDomainDefaults{
Status: "enabled",
URI: "testScheme://test/visibility/archive/path",
},
}),
provider: archiverProvider,
historyStoreDirectory: historyStoreDirectory,
visibilityStoreDirectory: visibilityStoreDirectory,
historyURI: filestore.URIScheme + "://" + historyStoreDirectory,
visibilityURI: filestore.URIScheme + "://" + visibilityStoreDirectory,
}
}
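
// getMessagingClient returns a mock messaging client when no config is provided or
// UseMock is set, and otherwise builds a real Kafka client from the given config.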
func getMessagingClient(config *MessagingClientConfig, logger log.Logger) messaging.Client {
if config == nil || config.UseMock {
return mocks.NewMockMessagingClient(&mocks.KafkaProducer{}, nil)
}
checkApp := len(config.KafkaConfig.Applications) != 0
return kafka.NewKafkaClient(config.KafkaConfig, metrics.NewNoopMetricsClient(), logger, tally.NoopScope, checkApp)
}
// TearDownCluster tears down the test cluster
func (tc *TestCluster) TearDownCluster() {
tc.host.Stop()
tc.host = nil
tc.testBase.TearDownWorkflowStore()
os.RemoveAll(tc.archiverBase.historyStoreDirectory)
os.RemoveAll(tc.archiverBase.visibilityStoreDirectory)
}
// GetFrontendClient returns a frontend client from the test cluster
func (tc *TestCluster) GetFrontendClient() FrontendClient {
return tc.host.GetFrontendClient()
}
// GetAdminClient returns an admin client from the test cluster
func (tc *TestCluster) GetAdminClient() AdminClient {
return tc.host.GetAdminClient()
}
// GetHistoryClient returns a history client from the test cluster
func (tc *TestCluster) GetHistoryClient() HistoryClient {
return tc.host.GetHistoryClient()
}
// GetExecutionManagerFactory returns an execution manager factory from the test cluster
func (tc *TestCluster) GetExecutionManagerFactory() persistence.ExecutionManagerFactory {
return tc.host.GetExecutionManagerFactory()
}