pkg/storage/connectors/cassandra/cassandra.go (476 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cassandra
import (
"context"
"reflect"
"time"
"github.com/uber/peloton/pkg/storage/objects/base"
"github.com/uber/peloton/pkg/storage/orm"
"github.com/gocql/gocql"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/uber-go/tally"
"go.uber.org/yarpc/yarpcerrors"
)
// Retry tuning defaults (not referenced in this file; presumably consumed
// elsewhere in the package) and the switch handed to create() to request a
// CAS (IfNotExist + MapScanCAS) insert.
const (
	_defaultRetryTimeout  = 50 * time.Millisecond
	_defaultRetryAttempts = 5

	// useCasWrite, when passed as create's casWrite argument, selects the
	// IfNotExist insert executed through MapScanCAS.
	useCasWrite = true
)

// Values of the "operation" tag attached to latency and counter metrics.
const (
	create  = "create"
	cas     = "cas"
	get     = "get"
	getAll  = "get_all"
	getIter = "get_iter"
	update  = "update"
	del     = "delete"
)

// Row limits passed to buildSelectQuery.
const (
	// _defaultQueryLimit restricts single-row reads (Get) to one row.
	_defaultQueryLimit = 1
	// _ignoredQueryLimit means "no LIMIT": every matching row is fetched.
	_ignoredQueryLimit = 0
)
// cassandraConnector is the Cassandra-backed orm.Connector. It owns the gocql
// session used for every query and the tally scopes that receive latency
// timers and success/failure counters.
type cassandraConnector struct {
	// implements orm.Connector interface.
	// NOTE(review): NewCassandraConnector leaves this embedded value nil, so
	// any orm.Connector method not defined on cassandraConnector itself would
	// be promoted from a nil interface and panic if called.
	orm.Connector
	// Session is the gocql session created for this connector
	Session *gocql.Session
	// scope is the storage scope for metrics; latency timers are recorded here
	scope tally.Scope
	// executeSuccessScope is the storage scope for success metrics
	// (scope tagged with result=success)
	executeSuccessScope tally.Scope
	// executeFailScope is the storage scope for failure metrics
	// (scope tagged with result=fail)
	executeFailScope tally.Scope
	// Conf is the Cassandra connector config for this cluster
	Conf *Config
}
// NewCassandraConnector opens a gocql session for config.StoreName (via
// CreateStoreSession) and wraps it in an orm.Connector.
//
// Metrics are emitted under scope's "cql" sub-scope tagged store=<StoreName>;
// the success and failure counter scopes additionally carry result=success and
// result=fail. A session-creation error is returned unchanged.
func NewCassandraConnector(
	config *Config,
	scope tally.Scope,
) (orm.Connector, error) {
	session, err := CreateStoreSession(config.CassandraConn, config.StoreName)
	if err != nil {
		return nil, err
	}

	cqlScope := scope.SubScope("cql").Tagged(map[string]string{
		"store": config.StoreName,
	})
	withResult := func(result string) tally.Scope {
		return cqlScope.Tagged(map[string]string{"result": result})
	}

	connector := &cassandraConnector{
		Session:             session,
		scope:               cqlScope,
		executeSuccessScope: withResult("success"),
		executeFailScope:    withResult("fail"),
		Conf:                config,
	}
	return connector, nil
}
// ensure that implementation (cassandraConnector) satisfies the interface.
// This is a compile-time check only; the nil pointer is never used.
var _ orm.Connector = (*cassandraConnector)(nil)
// getGocqlErrorTag maps err to a short, metric-safe tag value. err.Error()
// cannot be used directly because it contains characters such as '=' and ':'
// that M3 rejects as tag values.
//
// yarpc AlreadyExists/NotFound statuses are checked first. Then the concrete
// gocql request-error types are matched. Anything else is tagged "unknown".
func getGocqlErrorTag(err error) string {
	switch {
	case yarpcerrors.IsAlreadyExists(err):
		return "already_exists"
	case yarpcerrors.IsNotFound(err):
		return "not_found"
	}

	var tag string
	switch err.(type) {
	case *gocql.RequestErrAlreadyExists:
		tag = "already_exists"
	case *gocql.RequestErrUnavailable:
		tag = "unavailable"
	case *gocql.RequestErrUnprepared:
		tag = "unprepared"
	case *gocql.RequestErrFunctionFailure:
		tag = "function_failure"
	case *gocql.RequestErrReadFailure:
		tag = "read_failure"
	case *gocql.RequestErrReadTimeout:
		tag = "read_timeout"
	case *gocql.RequestErrWriteFailure:
		tag = "write_failure"
	case *gocql.RequestErrWriteTimeout:
		tag = "write_timeout"
	default:
		tag = "unknown"
	}
	return tag
}
// buildResultRow allocates one scan destination per entry of columns so that
// a gocql Scan can populate a row. The destination for each column is chosen
// from the kind of e.ColumnToType[column]:
//
//   - String kind, or *base.OptionalString   -> **string
//   - Int, Int32, Uint32 kinds                -> **int
//   - Int64, Uint64 kinds, or *base.OptionalUInt64 -> **int64
//   - Bool kind                               -> **bool
//   - Slice kind                              -> **[]byte
//   - Struct kind (time.Time)                 -> **time.Time
//   - Array kind (gocql.UUID)                 -> **gocql.UUID
//
// NOTE(review): destinations are pointer-to-pointer, presumably so a NULL
// cell can scan as a nil pointer; confirm against gocql's unmarshal rules.
//
// Columns that are missing from ColumnToType, or whose type is not listed
// above, are logged and left as a nil entry in the returned slice.
func buildResultRow(e *base.Definition, columns []string) []interface{} {
	results := make([]interface{}, len(columns))
	for i, column := range columns {
		// Look up the Go type of the object field backing this column so the
		// matching destination can be allocated.
		typ := e.ColumnToType[column]
		if typ == nil {
			// Unmapped column: calling Kind() on a nil reflect.Type panics.
			log.WithField("column", column).Infof("type not found")
			continue
		}
		switch typ.Kind() {
		case reflect.String:
			var value *string
			results[i] = &value
		case reflect.Int32, reflect.Uint32, reflect.Int:
			// C* internally uses int and int64
			var value *int
			results[i] = &value
		case reflect.Int64, reflect.Uint64:
			// C* internally uses int and int64
			var value *int64
			results[i] = &value
		case reflect.Bool:
			var value *bool
			results[i] = &value
		case reflect.Slice:
			var value *[]byte
			results[i] = &value
		case reflect.Struct:
			// time.Time has Struct kind. This constant replaces the per-call
			// reflect.ValueOf(time.Now()).Kind(); matching is still by kind.
			var value *time.Time
			results[i] = &value
		case reflect.Array:
			// gocql.UUID is a [16]byte array, so its kind is Array.
			var value *gocql.UUID
			results[i] = &value
		case reflect.Ptr:
			// Custom optional types are stored as plain C* types (string /
			// int64) and converted to/from the custom type in the ORM layer.
			if typ == reflect.TypeOf(&base.OptionalString{}) {
				var value *string
				results[i] = &value
				break
			}
			if typ == reflect.TypeOf(&base.OptionalUInt64{}) {
				var value *int64
				results[i] = &value
				break
			}
			// for unrecognized pointer types, fall back to default logging
			fallthrough
		default:
			// This should only happen if we start using a new cassandra type
			// without adding to the translation layer
			log.WithFields(log.Fields{"type": typ.Kind(), "column": column}).
				Infof("type not found")
		}
	}
	return results
}
// getRowFromResult turns the scan destinations produced by buildResultRow
// (after a successful Scan) into base.Column values. columnNames[i] is paired
// with columnVals[i], and each Value is the dereferenced destination, i.e. the
// *T pointer (such as *string or *int64) that gocql filled in. A destination
// of any other type is logged and yields a column with a nil Value.
// e is currently unused.
func getRowFromResult(
	e *base.Definition, columnNames []string, columnVals []interface{},
) []base.Column {
	row := make([]base.Column, 0, len(columnNames))
	for i := range columnNames {
		name, dest := columnNames[i], columnVals[i]

		var value interface{}
		switch ptr := dest.(type) {
		case **string:
			value = *ptr
		case **int:
			value = *ptr
		case **int64:
			value = *ptr
		case **bool:
			value = *ptr
		case **[]byte:
			value = *ptr
		case **time.Time:
			value = *ptr
		case **gocql.UUID:
			value = *ptr
		default:
			// Reached only if a new cassandra type is introduced without
			// extending this translation layer.
			log.WithFields(log.Fields{
				"data":   dest,
				"column": name}).Infof("type not found")
		}

		row = append(row, base.Column{Name: name, Value: value})
	}
	return row
}
// splitColumnNameValue splits row into two parallel slices. For every index i,
// colNames[i] and colValues[i] describe row[i]. Callers depend on this
// positional pairing: the names are rendered into the CQL statement and the
// values are bound as query arguments in the same order. An empty row yields
// two nil slices.
func splitColumnNameValue(row []base.Column) (
	colNames []string, colValues []interface{}) {
	for i := range row {
		col := &row[i]
		colNames = append(colNames, col.Name)
		colValues = append(colValues, col.Value)
	}
	return colNames, colValues
}
// CreateIfNotExists inserts row into e's table only when it does not already
// exist. It delegates to create with useCasWrite, which issues an IfNotExist
// insert through MapScanCAS. If the write is not applied, a yarpc
// AlreadyExists error is returned.
func (c *cassandraConnector) CreateIfNotExists(
	ctx context.Context,
	e *base.Definition,
	row []base.Column,
) error {
	casWrite := useCasWrite
	return c.create(ctx, e, row, casWrite)
}
// Create inserts row into e's table using a plain (non-CAS) insert executed
// with Exec. This call performs no existence check.
func (c *cassandraConnector) Create(
	ctx context.Context,
	e *base.Definition,
	row []base.Column,
) error {
	casWrite := !useCasWrite
	return c.create(ctx, e, row, casWrite)
}
// create inserts row into e's table. Column names and values are split
// positionally so the INSERT statement and its bound arguments stay aligned.
//
// When casWrite is true, the statement is built with IfNotExist and executed
// with MapScanCAS. If the write is not applied, a yarpc AlreadyExists error is
// counted as a failure and returned. When casWrite is false, the statement is
// executed with Exec.
//
// Metrics use the operation tag "cas" for CAS writes and "create" otherwise:
//   - statement-building errors are returned without metrics;
//   - query errors and rejected CAS writes increment the failure counter;
//   - successful writes record latency and increment the success counter.
func (c *cassandraConnector) create(
	ctx context.Context,
	e *base.Definition,
	row []base.Column,
	casWrite bool,
) error {
	// The order of names and values must be preserved: names compose the
	// statement, values are bound in the session query call.
	colNames, colValues := splitColumnNameValue(row)

	stmt, err := InsertStmt(
		Table(e.Name),
		Columns(colNames),
		Values(colValues),
		IfNotExist(casWrite),
	)
	if err != nil {
		return err
	}

	operation := create
	if casWrite {
		operation = cas
	}

	q := c.Session.Query(stmt, colValues...).WithContext(ctx)
	if casWrite {
		applied, casErr := q.MapScanCAS(map[string]interface{}{})
		if casErr != nil {
			sendCounters(c.executeFailScope, e.Name, operation, casErr)
			return casErr
		}
		if !applied {
			// Count the rejected write too; getGocqlErrorTag tags yarpc
			// AlreadyExists errors as "already_exists".
			existsErr := yarpcerrors.AlreadyExistsErrorf("item already exists")
			sendCounters(c.executeFailScope, e.Name, operation, existsErr)
			return existsErr
		}
	} else if execErr := q.Exec(); execErr != nil {
		sendCounters(c.executeFailScope, e.Name, operation, execErr)
		return execErr
	}

	sendLatency(c.scope, e.Name, operation, time.Duration(q.Latency()))
	sendCounters(c.executeSuccessScope, e.Name, operation, nil)
	return nil
}
// buildSelectQuery prepares a SELECT of colNamesToRead from e's table. It adds
// one condition per key column and binds the key values as query arguments in
// the same order. A non-zero limit is enforced in the statement; a limit of 0
// fetches every matching row. The returned query is bound to ctx.
func (c *cassandraConnector) buildSelectQuery(
	ctx context.Context,
	e *base.Definition,
	keyCols []base.Column,
	colNamesToRead []string,
	limit int,
) (*gocql.Query, error) {
	names, values := splitColumnNameValue(keyCols)

	stmt, err := SelectStmt(
		Table(e.Name),
		Columns(colNamesToRead),
		Conditions(names),
		Limit(limit),
	)
	if err != nil {
		return nil, err
	}

	query := c.Session.Query(stmt, values...)
	return query.WithContext(ctx), nil
}
// Get fetches a single row of e's table that matches keyCols. The row is
// returned as a map from column name to value.
//
// colNamesToRead restricts which columns are read; when it is empty,
// e.GetColumnsToRead() is used. Before returning, processDBData converts
// gocql.UUID values to strings and int/int64 values to uint32/uint64.
//
// Get returns (nil, nil) when no row matches. It also returns (nil, nil),
// after logging and counting a failure, if more than one row comes back.
// Query errors are wrapped with "SliceMap failed".
func (c *cassandraConnector) Get(
	ctx context.Context,
	e *base.Definition,
	keyCols []base.Column,
	colNamesToRead ...string,
) (map[string]interface{}, error) {
	if len(colNamesToRead) == 0 {
		colNamesToRead = e.GetColumnsToRead()
	}

	q, err := c.buildSelectQuery(
		ctx,
		e,
		keyCols,
		colNamesToRead,
		_defaultQueryLimit)
	if err != nil {
		sendCounters(c.executeFailScope, e.Name, get, err)
		return nil, err
	}

	// Execute the query and close the iterator on every return path, as GetAll
	// does. SliceMap itself does not close it.
	cqlIter := q.Iter()
	defer cqlIter.Close()

	result, err := cqlIter.SliceMap()
	if err != nil {
		sendCounters(c.executeFailScope, e.Name, get, err)
		return nil, errors.Wrap(err, "SliceMap failed")
	}
	if len(result) > 1 {
		log.WithField("rows len",
			len(result)).Info("Get SliceMap returns more than 1 row")
		sendCounters(c.executeFailScope, e.Name, get, nil)
		return nil, nil
	}
	if len(result) == 0 {
		return nil, nil
	}

	// at this stage, we know result should be an array size of 1
	c.processDBData(result)
	sendLatency(c.scope, e.Name, get, time.Duration(q.Latency()))
	sendCounters(c.executeSuccessScope, e.Name, get, nil)
	return result[0], nil
}
// GetAll fetches every row of e's table that matches keyCols (documented as
// the partition keys). It reads e.GetColumnsToRead() with no LIMIT. Each row
// is a map from column name to value, after processDBData has converted
// gocql.UUID values to strings and int/int64 values to uint32/uint64.
//
// Statement-building and query errors increment the failure counter; query
// errors are wrapped with "SliceMap failed". On success the latency timer and
// the success counter are recorded, consistent with Get.
func (c *cassandraConnector) GetAll(
	ctx context.Context,
	e *base.Definition,
	keyCols []base.Column,
) ([]map[string]interface{}, error) {
	colNamesToRead := e.GetColumnsToRead()
	q, err := c.buildSelectQuery(
		ctx,
		e,
		keyCols,
		colNamesToRead,
		_ignoredQueryLimit)
	if err != nil {
		sendCounters(c.executeFailScope, e.Name, getAll, err)
		return nil, err
	}

	// execute query and get iterator
	cqlIter := q.Iter()
	defer cqlIter.Close()

	result, err := cqlIter.SliceMap()
	if err != nil {
		sendCounters(c.executeFailScope, e.Name, getAll, err)
		return nil, errors.Wrap(err, "SliceMap failed")
	}

	// err is nil from here on, so the success path is unconditional.
	c.processDBData(result)
	sendLatency(c.scope, e.Name, getAll, time.Duration(q.Latency()))
	sendCounters(c.executeSuccessScope, e.Name, getAll, nil)
	return result, nil
}
// processDBData rewrites, in place, the values of every row map in result into
// the types ORM objects use:
//   - gocql.UUID becomes its String() form;
//   - int becomes uint32 and int64 becomes uint64, since C* internally uses
//     int and int64 while ORM objects use uint32 and uint64.
//
// Every other value is left unchanged.
func (c *cassandraConnector) processDBData(
	result []map[string]interface{}) {
	for _, row := range result {
		for col, raw := range row {
			// Overwriting keys that already exist is safe during range.
			switch val := raw.(type) {
			case gocql.UUID:
				row[col] = val.String()
			case int:
				row[col] = uint32(val)
			case int64:
				row[col] = uint64(val)
			}
		}
	}
}
// GetAllIter returns an orm.Iterator over every row of e's table that matches
// keyCols. It reads e.GetColumnsToRead() with no LIMIT.
//
// The latency timer is recorded right after q.Iter() is called. Success and
// failure counters for the read are emitted later by the iterator's Next, once
// iteration ends. A statement-building error is counted as a failure and
// returned, consistent with Get and GetAll.
func (c *cassandraConnector) GetAllIter(
	ctx context.Context,
	e *base.Definition,
	keyCols []base.Column,
) (iter orm.Iterator, err error) {
	colNamesToRead := e.GetColumnsToRead()
	q, err := c.buildSelectQuery(
		ctx,
		e,
		keyCols,
		colNamesToRead,
		_ignoredQueryLimit)
	if err != nil {
		sendCounters(c.executeFailScope, e.Name, getIter, err)
		return nil, err
	}

	// execute query and get iterator
	cqlIter := q.Iter()
	sendLatency(c.scope, e.Name, getIter, time.Duration(q.Latency()))
	return newIterator(
		e,
		colNamesToRead,
		c.executeSuccessScope,
		c.executeFailScope,
		cqlIter,
	), nil
}
// Delete removes the row(s) of e's table that match keyCols. Key names and
// values are split positionally so the statement's conditions line up with
// the bound arguments.
//
// A statement-building error is returned without metrics. An execution error
// increments the failure counter. A successful delete records latency and
// increments the success counter.
func (c *cassandraConnector) Delete(
	ctx context.Context,
	e *base.Definition,
	keyCols []base.Column,
) error {
	names, values := splitColumnNameValue(keyCols)

	stmt, err := DeleteStmt(Table(e.Name), Conditions(names))
	if err != nil {
		return err
	}

	q := c.Session.Query(stmt, values...).WithContext(ctx)
	err = q.Exec()
	if err != nil {
		sendCounters(c.executeFailScope, e.Name, del, err)
		return err
	}

	sendLatency(c.scope, e.Name, del, time.Duration(q.Latency()))
	sendCounters(c.executeSuccessScope, e.Name, del, nil)
	return nil
}
// Update sets the columns in row on the row(s) of e's table that match
// keyCols.
//
// A statement-building error is returned without metrics. An execution error
// increments the failure counter. Success records latency and increments the
// success counter.
func (c *cassandraConnector) Update(
	ctx context.Context,
	e *base.Definition,
	row []base.Column,
	keyCols []base.Column,
) error {
	// Names compose the statement and values are bound separately, so each
	// pair of slices must keep the same order.
	keyNames, keyValues := splitColumnNameValue(keyCols)
	setNames, setValues := splitColumnNameValue(row)

	stmt, err := UpdateStmt(
		Table(e.Name),
		Updates(setNames),
		Conditions(keyNames),
	)
	if err != nil {
		return err
	}

	// Bind the updated values first, then the key values.
	// NOTE(review): this assumes UpdateStmt renders SET before WHERE, as CQL
	// requires.
	args := make([]interface{}, 0, len(setValues)+len(keyValues))
	args = append(args, setValues...)
	args = append(args, keyValues...)

	q := c.Session.Query(stmt, args...).WithContext(ctx)
	err = q.Exec()
	if err != nil {
		sendCounters(c.executeFailScope, e.Name, update, err)
		return err
	}

	sendLatency(c.scope, e.Name, update, time.Duration(q.Latency()))
	sendCounters(c.executeSuccessScope, e.Name, update, nil)
	return nil
}
// cassandraIterator implements interface Iterator for Cassandra. It wraps a
// gocql iterator and converts each scanned row into []base.Column, using the
// table definition and the list of columns the query selected.
type cassandraIterator struct {
	// cqlIter is the underlying gocql iterator that Next scans and closes
	cqlIter *gocql.Iter
	// tableDef provides ColumnToType (used to allocate scan destinations)
	// and Name (used as the metrics "table" tag)
	tableDef *base.Definition
	// colNamesToRead lists the selected columns, in the order passed to the
	// select query
	colNamesToRead []string
	// successScope receives the counter emitted when iteration ends cleanly
	successScope tally.Scope
	// failScope receives the counter emitted when iteration ends with an error
	failScope tally.Scope
}

// ensure that implementation (cassandraIterator) satisfies the interface.
// This is a compile-time check only.
var _ orm.Iterator = (*cassandraIterator)(nil)
// newIterator wraps cqlIter in a cassandraIterator. The iterator scans the
// columns in cols using e's type mapping, and reports its end-of-iteration
// counters to successScope or failScope.
func newIterator(
	e *base.Definition,
	cols []string,
	successScope tally.Scope,
	failScope tally.Scope,
	cqlIter *gocql.Iter,
) *cassandraIterator {
	it := new(cassandraIterator)
	it.cqlIter = cqlIter
	it.tableDef = e
	it.colNamesToRead = cols
	it.successScope = successScope
	it.failScope = failScope
	return it
}
// Close closes the underlying gocql iterator. This method has no error
// return, so any error reported by the gocql Close is discarded here.
func (iter *cassandraIterator) Close() {
	_ = iter.cqlIter.Close()
}
// Next scans the next row and returns it as []base.Column.
//
// When no further row can be scanned, Next closes the gocql iterator:
//   - if Close reports an error, the failure counter is incremented and the
//     error is returned;
//   - otherwise the success counter is incremented and (nil, nil) is
//     returned, which signals the end of the results.
func (iter *cassandraIterator) Next() ([]base.Column, error) {
	result := buildResultRow(iter.tableDef, iter.colNamesToRead)
	if iter.cqlIter.Scan(result...) {
		return getRowFromResult(iter.tableDef, iter.colNamesToRead, result), nil
	}

	// Scan returned false: either the results are exhausted or an error
	// occurred, and Close tells which. The local is named err so it does not
	// shadow the imported errors package.
	if err := iter.cqlIter.Close(); err != nil {
		sendCounters(iter.failScope, iter.tableDef.Name, getIter, err)
		return nil, err
	}
	sendCounters(iter.successScope, iter.tableDef.Name, getIter, nil)
	return nil, nil
}
// sendLatency records d on scope's "execute_latency" timer. The timer is
// tagged with the table name and the operation.
func sendLatency(
	scope tally.Scope,
	table, operation string,
	d time.Duration,
) {
	tags := map[string]string{
		"table":     table,
		"operation": operation,
	}
	timer := scope.Tagged(tags).Timer("execute_latency")
	timer.Record(d)
}
// sendCounters increments scope's "execute" counter by one. The counter is
// tagged with the table name, the operation, and an "error" tag. The error tag
// is "none" when err is nil; otherwise it is the metric-safe value returned by
// getGocqlErrorTag.
func sendCounters(
	scope tally.Scope,
	table, operation string,
	err error,
) {
	tags := map[string]string{
		"table":     table,
		"operation": operation,
		"error":     "none",
	}
	if err != nil {
		tags["error"] = getGocqlErrorTag(err)
	}
	scope.Tagged(tags).Counter("execute").Inc(1)
}