tools/cassandra/handler.go (183 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package cassandra
import (
"fmt"
"log"
"github.com/urfave/cli"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"github.com/uber/cadence/schema/cassandra"
"github.com/uber/cadence/tools/common/schema"
)
const defaultNumReplicas = 1
// SetupSchemaConfig contains the configuration params needed to setup schema tables
type SetupSchemaConfig struct {
CQLClientConfig
schema.SetupConfig
}
// VerifyCompatibleVersion ensures that the installed version of cadence and visibility keyspaces
// is greater than or equal to the expected version.
// In most cases, the versions should match. However if after a schema upgrade there is a code
// rollback, the code version (expected version) would fall lower than the actual version in
// cassandra.
func VerifyCompatibleVersion(
cfg config.Persistence, expectedConsistency gocql.Consistency,
) error {
if ds, ok := cfg.DataStores[cfg.DefaultStore]; ok {
if err := verifyCompatibleVersion(ds, cassandra.Version, expectedConsistency); err != nil {
return err
}
}
if ds, ok := cfg.DataStores[cfg.VisibilityStore]; ok {
if err := verifyCompatibleVersion(ds, cassandra.VisibilityVersion, expectedConsistency); err != nil {
return err
}
}
return nil
}
func verifyCompatibleVersion(
ds config.DataStore,
expectedCassandraVersion string, expectedConsistency gocql.Consistency,
) error {
if ds.NoSQL != nil {
return verifyPluginVersion(ds.NoSQL, expectedCassandraVersion, expectedConsistency)
}
if ds.ShardedNoSQL != nil {
for shardName, connection := range ds.ShardedNoSQL.Connections {
err := verifyPluginVersion(connection.NoSQLPlugin, expectedCassandraVersion, expectedConsistency)
if err != nil {
return fmt.Errorf("Failed to verify version for DB shard: %v. Error: %v", shardName, err.Error())
}
}
}
// not using nosql
return nil
}
func verifyPluginVersion(plugin *config.NoSQL, expectedCassandraVersion string, expectedConsistency gocql.Consistency) error {
// Use hardcoded instead of constant because of cycle dependency issue.
// However, this file will be refactor to support NoSQL soon. After the refactoring, cycle dependency issue
// should be gone and we can use constant at that time
if plugin.PluginName != "cassandra" {
return fmt.Errorf("unknown NoSQL plugin name: %q", plugin.PluginName)
}
return CheckCompatibleVersion(*plugin, expectedCassandraVersion, expectedConsistency)
}
// CheckCompatibleVersion check the version compatibility
func CheckCompatibleVersion(
cfg config.Cassandra,
expectedVersion string,
expectedConsistency gocql.Consistency,
) error {
client, err := NewCQLClient(&CQLClientConfig{
Hosts: cfg.Hosts,
Port: cfg.Port,
User: cfg.User,
Password: cfg.Password,
Keyspace: cfg.Keyspace,
AllowedAuthenticators: cfg.AllowedAuthenticators,
Timeout: DefaultTimeout,
ConnectTimeout: DefaultConnectTimeout,
TLS: cfg.TLS,
ProtoVersion: cfg.ProtoVersion,
}, expectedConsistency)
if err != nil {
return fmt.Errorf("creating CQL client: %w", err)
}
defer client.Close()
return schema.VerifyCompatibleVersion(client, cfg.Keyspace, expectedVersion)
}
// setupSchema executes the setupSchemaTask
// using the given command line arguments
// as input
func setupSchema(cli *cli.Context) error {
config, err := newCQLClientConfig(cli)
if err != nil {
return handleErr(schema.NewConfigError(err.Error()))
}
client, err := NewCQLClient(config, gocql.All)
if err != nil {
return handleErr(err)
}
defer client.Close()
if err := schema.Setup(cli, client); err != nil {
return handleErr(err)
}
return nil
}
// updateSchema executes the updateSchemaTask
// using the given command lien args as input
func updateSchema(cli *cli.Context) error {
config, err := newCQLClientConfig(cli)
if err != nil {
return handleErr(schema.NewConfigError(err.Error()))
}
client, err := NewCQLClient(config, gocql.All)
if err != nil {
return handleErr(err)
}
defer client.Close()
if err := schema.Update(cli, client); err != nil {
return handleErr(err)
}
return nil
}
// createKeyspace creates a cassandra Keyspace
func createKeyspace(cli *cli.Context) error {
config, err := newCQLClientConfig(cli)
if err != nil {
return handleErr(schema.NewConfigError(err.Error()))
}
keyspace := cli.String(schema.CLIOptKeyspace)
if keyspace == "" {
return handleErr(schema.NewConfigError("missing " + flag(schema.CLIOptKeyspace) + " argument "))
}
datacenter := cli.String(schema.CLIOptDatacenter)
err = doCreateKeyspace(*config, keyspace, datacenter)
if err != nil {
return handleErr(fmt.Errorf("error creating Keyspace:%v", err))
}
return nil
}
func doCreateKeyspace(cfg CQLClientConfig, name string, datacenter string) error {
cfg.Keyspace = SystemKeyspace
client, err := NewCQLClient(&cfg, gocql.All)
if err != nil {
return err
}
defer client.Close()
if datacenter != "" {
return client.CreateNTSKeyspace(name, datacenter)
}
return client.CreateKeyspace(name)
}
func newCQLClientConfig(cli *cli.Context) (*CQLClientConfig, error) {
cqlConfig := new(CQLClientConfig)
cqlConfig.Hosts = cli.GlobalString(schema.CLIOptEndpoint)
cqlConfig.Port = cli.GlobalInt(schema.CLIOptPort)
cqlConfig.User = cli.GlobalString(schema.CLIOptUser)
cqlConfig.Password = cli.GlobalString(schema.CLIOptPassword)
cqlConfig.AllowedAuthenticators = cli.GlobalStringSlice(schema.CLIOptAllowedAuthenticators)
cqlConfig.Timeout = cli.GlobalInt(schema.CLIOptTimeout)
cqlConfig.ConnectTimeout = cli.GlobalInt(schema.CLIOptConnectTimeout)
cqlConfig.Keyspace = cli.GlobalString(schema.CLIOptKeyspace)
cqlConfig.NumReplicas = cli.Int(schema.CLIOptReplicationFactor)
cqlConfig.ProtoVersion = cli.Int(schema.CLIOptProtoVersion)
if cli.GlobalBool(schema.CLIFlagEnableTLS) {
cqlConfig.TLS = &config.TLS{
Enabled: true,
CertFile: cli.GlobalString(schema.CLIFlagTLSCertFile),
KeyFile: cli.GlobalString(schema.CLIFlagTLSKeyFile),
CaFile: cli.GlobalString(schema.CLIFlagTLSCaFile),
EnableHostVerification: cli.GlobalBool(schema.CLIFlagTLSEnableHostVerification),
ServerName: cli.GlobalString(schema.CLIFlagTLSServerName),
}
}
if err := validateCQLClientConfig(cqlConfig); err != nil {
return nil, err
}
return cqlConfig, nil
}
func validateCQLClientConfig(config *CQLClientConfig) error {
if len(config.Hosts) == 0 {
return schema.NewConfigError("missing cassandra endpoint argument " + flag(schema.CLIOptEndpoint))
}
if config.Keyspace == "" {
return schema.NewConfigError("missing " + flag(schema.CLIOptKeyspace) + " argument ")
}
if config.Port == 0 {
config.Port = DefaultCassandraPort
}
if config.NumReplicas == 0 {
config.NumReplicas = defaultNumReplicas
}
return nil
}
func flag(opt string) string {
return "(-" + opt + ")"
}
func handleErr(err error) error {
log.Println(err)
return err
}