tools/cassandra/cqlclient.go (223 lines of code) (raw):
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package cassandra
import (
"fmt"
"log"
"time"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"github.com/uber/cadence/tools/common/schema"
)
type (
CqlClient interface {
CreateDatabase(name string) error
DropDatabase(name string) error
CreateKeyspace(name string) error
CreateNTSKeyspace(name string, datacenter string) error
DropKeyspace(name string) error
DropAllTables() error
CreateSchemaVersionTables() error
ReadSchemaVersion() (string, error)
UpdateSchemaVersion(newVersion string, minCompatibleVersion string) error
WriteSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, desc string) error
ExecDDLQuery(stmt string, args ...interface{}) error
Close()
ListTables() ([]string, error)
ListTypes() ([]string, error)
DropTable(name string) error
DropType(name string) error
DropAllTablesTypes() error
}
CqlClientImpl struct {
nReplicas int
session gocql.Session
cfg *CQLClientConfig
}
// CQLClientConfig contains the configuration for cql client
CQLClientConfig struct {
Hosts string
Port int
User string
Password string
AllowedAuthenticators []string
Keyspace string
Timeout int
ConnectTimeout int
NumReplicas int
ProtoVersion int
TLS *config.TLS
}
)
const (
DefaultTimeout = 30 // Timeout in seconds
DefaultConnectTimeout = 2 // Connect timeout in seconds
DefaultCassandraPort = 9042
SystemKeyspace = "system"
)
const (
readSchemaVersionCQL = `SELECT curr_version from schema_version where keyspace_name=?`
listTablesCQL = `SELECT table_name from system_schema.tables where keyspace_name=?`
listTypesCQL = `SELECT type_name from system_schema.types where keyspace_name=?`
writeSchemaVersionCQL = `INSERT into schema_version(keyspace_name, creation_time, curr_version, min_compatible_version) VALUES (?,?,?,?)`
writeSchemaUpdateHistoryCQL = `INSERT into schema_update_history(year, month, update_time, old_version, new_version, manifest_md5, description) VALUES(?,?,?,?,?,?,?)`
createSchemaVersionTableCQL = `CREATE TABLE schema_version(keyspace_name text PRIMARY KEY, ` +
`creation_time timestamp, ` +
`curr_version text, ` +
`min_compatible_version text);`
createSchemaUpdateHistoryTableCQL = `CREATE TABLE schema_update_history(` +
`year int, ` +
`month int, ` +
`update_time timestamp, ` +
`description text, ` +
`manifest_md5 text, ` +
`new_version text, ` +
`old_version text, ` +
`PRIMARY KEY ((year, month), update_time));`
createKeyspaceCQL = `CREATE KEYSPACE IF NOT EXISTS %v ` +
`WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : %v};`
createNTSKeyspaceCQL = `CREATE KEYSPACE IF NOT EXISTS %v ` +
`WITH replication = { 'class' : 'NetworkTopologyStrategy', '%v' : %v};`
)
var _ schema.SchemaClient = (*CqlClientImpl)(nil)
// NewCQLClient returns a new instance of CQLClient
func NewCQLClient(cfg *CQLClientConfig, expectedConsistency gocql.Consistency) (CqlClient, error) {
var err error
cqlClient := new(CqlClientImpl)
cqlClient.cfg = cfg
cqlClient.nReplicas = cfg.NumReplicas
cqlClient.session, err = gocql.GetRegisteredClient().CreateSession(gocql.ClusterConfig{
Hosts: cfg.Hosts,
Port: cfg.Port,
User: cfg.User,
Password: cfg.Password,
AllowedAuthenticators: cfg.AllowedAuthenticators,
Keyspace: cfg.Keyspace,
TLS: cfg.TLS,
Timeout: time.Duration(cfg.Timeout) * time.Second,
ConnectTimeout: time.Duration(cfg.ConnectTimeout) * time.Second,
ProtoVersion: cfg.ProtoVersion,
Consistency: expectedConsistency,
})
if err != nil {
return nil, err
}
return cqlClient, nil
}
func (client *CqlClientImpl) CreateDatabase(name string) error {
return client.CreateKeyspace(name)
}
func (client *CqlClientImpl) DropDatabase(name string) error {
return client.DropKeyspace(name)
}
// CreateNTSKeyspace creates a cassandra Keyspace if it doesn't exist using network topology strategy
func (client *CqlClientImpl) CreateKeyspace(name string) error {
return client.ExecDDLQuery(fmt.Sprintf(createKeyspaceCQL, name, client.nReplicas))
}
// CreateNTSKeyspace creates a cassandra Keyspace if it doesn't exist using network topology strategy
func (client *CqlClientImpl) CreateNTSKeyspace(name string, datacenter string) error {
return client.ExecDDLQuery(fmt.Sprintf(createNTSKeyspaceCQL, name, datacenter, client.nReplicas))
}
// DropKeyspace drops a Keyspace
func (client *CqlClientImpl) DropKeyspace(name string) error {
return client.ExecDDLQuery(fmt.Sprintf("DROP KEYSPACE %v", name))
}
func (client *CqlClientImpl) DropAllTables() error {
return client.DropAllTablesTypes()
}
// CreateSchemaVersionTables sets up the schema version tables
func (client *CqlClientImpl) CreateSchemaVersionTables() error {
if err := client.ExecDDLQuery(createSchemaVersionTableCQL); err != nil {
return err
}
return client.ExecDDLQuery(createSchemaUpdateHistoryTableCQL)
}
// ReadSchemaVersion returns the current schema version for the Keyspace
func (client *CqlClientImpl) ReadSchemaVersion() (string, error) {
query := client.session.Query(readSchemaVersionCQL, client.cfg.Keyspace)
iter := query.Iter()
var version string
if !iter.Scan(&version) {
err := iter.Close()
return "", fmt.Errorf("reading schema version: %w", err)
}
if err := iter.Close(); err != nil {
return "", err
}
return version, nil
}
// UpdateSchemaVersion updates the schema version for the Keyspace
func (client *CqlClientImpl) UpdateSchemaVersion(newVersion string, minCompatibleVersion string) error {
query := client.session.Query(writeSchemaVersionCQL, client.cfg.Keyspace, time.Now(), newVersion, minCompatibleVersion)
return query.Exec()
}
// WriteSchemaUpdateLog adds an entry to the schema update history table
func (client *CqlClientImpl) WriteSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, desc string) error {
now := time.Now().UTC()
query := client.session.Query(writeSchemaUpdateHistoryCQL)
query.Bind(now.Year(), int(now.Month()), now, oldVersion, newVersion, manifestMD5, desc)
return query.Exec()
}
// ExecDDLQuery executes a cql statement
func (client *CqlClientImpl) ExecDDLQuery(stmt string, args ...interface{}) error {
return client.session.Query(stmt, args...).Exec()
}
// Close closes the cql client
func (client *CqlClientImpl) Close() {
if client.session != nil {
client.session.Close()
}
}
// ListTables lists the table names in a Keyspace
func (client *CqlClientImpl) ListTables() ([]string, error) {
query := client.session.Query(listTablesCQL, client.cfg.Keyspace)
iter := query.Iter()
var names []string
var name string
for iter.Scan(&name) {
names = append(names, name)
}
if err := iter.Close(); err != nil {
return nil, err
}
return names, nil
}
// ListTypes lists the User defined types in a Keyspace.
func (client *CqlClientImpl) ListTypes() ([]string, error) {
qry := client.session.Query(listTypesCQL, client.cfg.Keyspace)
iter := qry.Iter()
var names []string
var name string
for iter.Scan(&name) {
names = append(names, name)
}
if err := iter.Close(); err != nil {
return nil, err
}
return names, nil
}
// DropTable drops a given table from the Keyspace
func (client *CqlClientImpl) DropTable(name string) error {
return client.ExecDDLQuery(fmt.Sprintf("DROP TABLE %v", name))
}
// DropType drops a given type from the Keyspace
func (client *CqlClientImpl) DropType(name string) error {
return client.ExecDDLQuery(fmt.Sprintf("DROP TYPE %v", name))
}
// DropAllTablesTypes deletes all tables/types in the
// Keyspace without deleting the Keyspace
func (client *CqlClientImpl) DropAllTablesTypes() error {
tables, err := client.ListTables()
if err != nil {
return err
}
log.Printf("Dropping following tables: %v\n", tables)
for _, table := range tables {
err1 := client.DropTable(table)
if err1 != nil {
log.Printf("Error dropping table %v, err=%v\n", table, err1)
}
}
types, err := client.ListTypes()
if err != nil {
return err
}
log.Printf("Dropping following types: %v\n", types)
numOfTypes := len(types)
for i := 0; i < numOfTypes && len(types) > 0; i++ {
var erroredTypes []string
for _, t := range types {
err = client.DropType(t)
if err != nil {
log.Printf("Error dropping type %v, err=%v\n", t, err)
erroredTypes = append(erroredTypes, t)
}
}
types = erroredTypes
}
if len(types) > 0 {
return err
}
return nil
}