subscriber/common/sink/sink.go (80 lines of code) (raw):

// Copyright (c) 2017-2018 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package sink import ( "github.com/uber/aresdb/client" memCom "github.com/uber/aresdb/memstore/common" "github.com/uber/aresdb/subscriber/common/rules" "github.com/uber/aresdb/utils" "math" "strings" "unsafe" ) // Sink is abstraction for interactions with downstream storage layer type Sink interface { // Cluster returns the DB cluster name Cluster() string // Save will save the rows into underlying database Save(destination Destination, rows []client.Row) error // Shutdown will close the connections to the database Shutdown() } // Destination contains the table and columns that each job is storing data into // also records the behavior when encountering key errors type Destination struct { // Table is table name Table string // ColumnNames are the list of column names after sorted ColumnNames []string // PrimaryKeys maps primary key columnName to its columnID after sorted PrimaryKeys map[string]int // PrimaryKeysInSchema maps primary key columnName to its columnID defined in schema PrimaryKeysInSchema map[string]int // AresUpdateModes defines update modes AresUpdateModes []memCom.ColumnUpdateMode // NumShards is the number of shards in the aresDB cluster NumShards uint32 } func Shard(rows []client.Row, destination Destination, jobConfig *rules.JobConfig) (map[uint32][]client.Row, int) { rowsIgnored := 0 if destination.NumShards == 0 || destination.NumShards == 1 { // in this case, there is no sharding in this aresDB cluster return nil, rowsIgnored } shards := make(map[uint32][]client.Row) for i := uint32(0); i < destination.NumShards; i++ { shards[i] = make([]client.Row, 0, len(rows)) } for _, row := range rows { // convert primaryKey to byte array pk, err := getPrimaryKeyBytes(row, destination, jobConfig, jobConfig.GetPrimaryKeyBytes()) if err != nil { rowsIgnored++ continue } // calculate shard shardID := shardFn(pk, destination.NumShards) shards[shardID] = append(shards[shardID], row) } return shards, rowsIgnored } func shardFn(key []byte, numShards uint32) uint32 { return utils.Murmur3Sum32(unsafe.Pointer(&key[0]), len(key), 0) / (math.MaxUint32 / numShards) } func getPrimaryKeyBytes(row client.Row, destination Destination, jobConfig *rules.JobConfig, keyLength int) ([]byte, error) { primaryKeyValues := make([]memCom.DataValue, len(destination.PrimaryKeys)) var err error // create empty key with keyLength capacity key := make([]byte, 0, keyLength) var strBytes []byte i := 0 for columnName, columnID := range destination.PrimaryKeys { columnIDInSchema := destination.PrimaryKeysInSchema[columnName] if jobConfig.AresTableConfig.Table.Columns[columnIDInSchema].IsEnumColumn() { // convert the string to bytes if primaryKey value is string str := row[columnID].(string) if strBytes == nil { strBytes = make([]byte, 0, len(str)) } if !jobConfig.AresTableConfig.Table.Columns[columnIDInSchema].CaseInsensitive { str = strings.ToLower(row[columnID].(string)) } strBytes = append(strBytes, []byte(str)...) } else { primaryKeyValues[i], err = memCom.GetDataValue(row[columnID], columnIDInSchema, jobConfig.AresTableConfig.Table.Columns[columnIDInSchema].Type) if err != nil { return key, utils.StackError(err, "Failed to read primary key at row %d, col %d", row, columnID) } i++ } } if key, err = memCom.AppendPrimaryKeyBytes(key, memCom.NewSliceDataValueIterator(primaryKeyValues)); err != nil { return key, err } if strBytes != nil { key = append(key, strBytes...) } return key, err }