host/integrationbase.go (256 lines of code) (raw):

// Copyright (c) 2016 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package host import ( "context" "fmt" "io/ioutil" "os" "testing" "time" "github.com/pborman/uuid" "github.com/stretchr/testify/suite" "go.uber.org/yarpc" "go.uber.org/yarpc/transport/tchannel" "gopkg.in/yaml.v2" "github.com/uber/cadence/common" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/persistence" pt "github.com/uber/cadence/common/persistence/persistence-tests" "github.com/uber/cadence/common/persistence/persistence-tests/testcluster" "github.com/uber/cadence/common/types" "github.com/uber/cadence/environment" ) type ( // IntegrationBase is a base struct for integration tests IntegrationBase struct { suite.Suite testCluster *TestCluster testClusterConfig *TestClusterConfig engine FrontendClient adminClient AdminClient Logger log.Logger domainName string testRawHistoryDomainName string foreignDomainName string archivalDomainName string defaultTestCluster testcluster.PersistenceTestCluster visibilityTestCluster testcluster.PersistenceTestCluster } IntegrationBaseParams struct { T *testing.T DefaultTestCluster testcluster.PersistenceTestCluster VisibilityTestCluster testcluster.PersistenceTestCluster TestClusterConfig *TestClusterConfig } ) func NewIntegrationBase(params IntegrationBaseParams) *IntegrationBase { return &IntegrationBase{ defaultTestCluster: params.DefaultTestCluster, visibilityTestCluster: params.VisibilityTestCluster, testClusterConfig: params.TestClusterConfig, } } func (s *IntegrationBase) setupSuite() { s.setupLogger() if s.testClusterConfig.FrontendAddress != "" { s.Logger.Info("Running integration test against specified frontend", tag.Address(TestFlags.FrontendAddr)) channel, err := tchannel.NewChannelTransport(tchannel.ServiceName("cadence-frontend")) s.Require().NoError(err) dispatcher := yarpc.NewDispatcher(yarpc.Config{ Name: "unittest", Outbounds: yarpc.Outbounds{ "cadence-frontend": {Unary: channel.NewSingleOutbound(TestFlags.FrontendAddr)}, }, InboundMiddleware: yarpc.InboundMiddleware{ Unary: &versionMiddleware{}, }, }) if err := dispatcher.Start(); err != nil { s.Logger.Fatal("Failed to create outbound transport channel", tag.Error(err)) } s.engine = NewFrontendClient(dispatcher) s.adminClient = NewAdminClient(dispatcher) } else { s.Logger.Info("Running integration test against test cluster") clusterMetadata := NewClusterMetadata(s.T(), s.testClusterConfig) dc := persistence.DynamicConfiguration{ EnableSQLAsyncTransaction: dynamicconfig.GetBoolPropertyFn(false), EnableCassandraAllConsistencyLevelDelete: dynamicconfig.GetBoolPropertyFn(true), PersistenceSampleLoggingRate: dynamicconfig.GetIntPropertyFn(100), EnableShardIDMetrics: dynamicconfig.GetBoolPropertyFn(true), } params := pt.TestBaseParams{ DefaultTestCluster: s.defaultTestCluster, VisibilityTestCluster: s.visibilityTestCluster, ClusterMetadata: clusterMetadata, DynamicConfiguration: dc, } cluster, err := NewCluster(s.T(), s.testClusterConfig, s.Logger, params) s.Require().NoError(err) s.testCluster = cluster s.engine = s.testCluster.GetFrontendClient() s.adminClient = s.testCluster.GetAdminClient() } s.testRawHistoryDomainName = "TestRawHistoryDomain" s.domainName = s.randomizeStr("integration-test-domain") s.Require().NoError( s.registerDomain(s.domainName, 1, types.ArchivalStatusDisabled, "", types.ArchivalStatusDisabled, "")) s.Require().NoError( s.registerDomain(s.testRawHistoryDomainName, 1, types.ArchivalStatusDisabled, "", types.ArchivalStatusDisabled, "")) s.foreignDomainName = s.randomizeStr("integration-foreign-test-domain") s.Require().NoError( s.registerDomain(s.foreignDomainName, 1, types.ArchivalStatusDisabled, "", types.ArchivalStatusDisabled, "")) s.Require().NoError(s.registerArchivalDomain()) // this sleep is necessary because domainv2 cache gets refreshed in the // background only every domainCacheRefreshInterval period time.Sleep(cache.DomainCacheRefreshInterval + time.Second) } func (s *IntegrationBase) setupLogger() { s.Logger = testlogger.New(s.T()) } // GetTestClusterConfig return test cluster config func GetTestClusterConfig(configFile string) (*TestClusterConfig, error) { if err := environment.SetupEnv(); err != nil { return nil, err } configLocation := configFile if TestFlags.TestClusterConfigFile != "" { configLocation = TestFlags.TestClusterConfigFile } // This is just reading a config so it's less of a security concern // #nosec confContent, err := ioutil.ReadFile(configLocation) if err != nil { return nil, fmt.Errorf("failed to read test cluster config file %v: %v", configLocation, err) } confContent = []byte(os.ExpandEnv(string(confContent))) var options TestClusterConfig if err := yaml.Unmarshal(confContent, &options); err != nil { return nil, fmt.Errorf("failed to decode test cluster config %v", tag.Error(err)) } options.FrontendAddress = TestFlags.FrontendAddr if options.ESConfig != nil { options.ESConfig.Indices[common.VisibilityAppName] += uuid.New() } if options.Persistence.DBName == "" { options.Persistence.DBName = "test_" + pt.GenerateRandomDBName(10) } return &options, nil } // GetTestClusterConfigs return test cluster configs func GetTestClusterConfigs(configFile string) ([]*TestClusterConfig, error) { if err := environment.SetupEnv(); err != nil { return nil, err } fileName := configFile if TestFlags.TestClusterConfigFile != "" { fileName = TestFlags.TestClusterConfigFile } confContent, err := ioutil.ReadFile(fileName) if err != nil { return nil, fmt.Errorf("failed to read test cluster config file %v: %v", fileName, err) } confContent = []byte(os.ExpandEnv(string(confContent))) var clusterConfigs []*TestClusterConfig if err := yaml.Unmarshal(confContent, &clusterConfigs); err != nil { return nil, fmt.Errorf("failed to decode test cluster config %v", tag.Error(err)) } return clusterConfigs, nil } func (s *IntegrationBase) tearDownSuite() { if s.testCluster != nil { s.testCluster.TearDownCluster() s.testCluster = nil s.engine = nil s.adminClient = nil } } func (s *IntegrationBase) registerDomain( domain string, retentionDays int, historyArchivalStatus types.ArchivalStatus, historyArchivalURI string, visibilityArchivalStatus types.ArchivalStatus, visibilityArchivalURI string, ) error { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() return s.engine.RegisterDomain(ctx, &types.RegisterDomainRequest{ Name: domain, Description: domain, WorkflowExecutionRetentionPeriodInDays: int32(retentionDays), HistoryArchivalStatus: &historyArchivalStatus, HistoryArchivalURI: historyArchivalURI, VisibilityArchivalStatus: &visibilityArchivalStatus, VisibilityArchivalURI: visibilityArchivalURI, }) } func (s *IntegrationBase) domainCacheRefresh() { s.testClusterConfig.TimeSource.Advance(cache.DomainCacheRefreshInterval + time.Second) // this sleep is necessary to yield execution to other goroutines. not 100% guaranteed to work time.Sleep(2 * time.Second) } func (s *IntegrationBase) randomizeStr(id string) string { return fmt.Sprintf("%v-%v", id, uuid.New()) } func (s *IntegrationBase) printWorkflowHistory(domain string, execution *types.WorkflowExecution) { events := s.getHistory(domain, execution) history := &types.History{} history.Events = events common.PrettyPrintHistory(history, s.Logger) } func (s *IntegrationBase) getHistory(domain string, execution *types.WorkflowExecution) []*types.HistoryEvent { ctx, cancel := createContext() defer cancel() historyResponse, err := s.engine.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{ Domain: domain, Execution: execution, MaximumPageSize: 5, // Use small page size to force pagination code path }) s.Require().NoError(err) events := historyResponse.History.Events for historyResponse.NextPageToken != nil { ctx, cancel := createContext() historyResponse, err = s.engine.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{ Domain: domain, Execution: execution, NextPageToken: historyResponse.NextPageToken, }) cancel() s.Require().NoError(err) events = append(events, historyResponse.History.Events...) } return events } // To register archival domain we can't use frontend API as the retention period is set to 0 for testing, // and request will be rejected by frontend. Here we make a call directly to persistence to register // the domain. func (s *IntegrationBase) registerArchivalDomain() error { ctx, cancel := context.WithTimeout(context.Background(), defaultTestPersistenceTimeout) defer cancel() s.archivalDomainName = s.randomizeStr("integration-archival-enabled-domain") currentClusterName := s.testCluster.testBase.ClusterMetadata.GetCurrentClusterName() domainRequest := &persistence.CreateDomainRequest{ Info: &persistence.DomainInfo{ ID: uuid.New(), Name: s.archivalDomainName, Status: persistence.DomainStatusRegistered, }, Config: &persistence.DomainConfig{ Retention: 0, HistoryArchivalStatus: types.ArchivalStatusEnabled, HistoryArchivalURI: s.testCluster.archiverBase.historyURI, VisibilityArchivalStatus: types.ArchivalStatusEnabled, VisibilityArchivalURI: s.testCluster.archiverBase.visibilityURI, BadBinaries: types.BadBinaries{Binaries: map[string]*types.BadBinaryInfo{}}, }, ReplicationConfig: &persistence.DomainReplicationConfig{ ActiveClusterName: currentClusterName, Clusters: []*persistence.ClusterReplicationConfig{ {ClusterName: currentClusterName}, }, }, IsGlobalDomain: false, FailoverVersion: common.EmptyVersion, } response, err := s.testCluster.testBase.DomainManager.CreateDomain(ctx, domainRequest) if err == nil { s.Logger.Info("Register domain succeeded", tag.WorkflowDomainName(s.archivalDomainName), tag.WorkflowDomainID(response.ID), ) } return err }