encoder/encoder.go (105 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package encoder import ( "bytes" "fmt" "strings" "time" "github.com/uber/storagetapper/types" ) //encoderConstructor initializes encoder plugin type encoderConstructor func(service, db, table, input string, output string, version int) (Encoder, error) //plugins insert their constructors into this map var encoders map[string]encoderConstructor //Internal is encoder for intermediate buffer messages and message wrappers. Initialized in z.go var Internal Encoder //registerPlugin should be called from plugin's init func registerPlugin(name string, init encoderConstructor) { if encoders == nil { encoders = make(map[string]encoderConstructor) } encoders[name] = init } //Encoder is unified interface to encode data from transit formats(row, common) type Encoder interface { Row(tp int, row *[]interface{}, seqNo uint64, ts time.Time) ([]byte, error) CommonFormat(cf *types.CommonFormatEvent) ([]byte, error) EncodeSchema(seqNo uint64) ([]byte, error) UpdateCodec() error Type() string Schema() *types.TableSchema UnwrapEvent(data []byte, cfEvent *types.CommonFormatEvent) (payload []byte, err error) DecodeEvent(b []byte) (*types.CommonFormatEvent, error) } //Encoders return the list of encoders names func Encoders() []string { r := make([]string, len(encoders)) i := 0 for k := range encoders { r[i] = k i++ } return r } //InitEncoder constructs encoder without updating schema func InitEncoder(encType, svc, sdb, tbl, input string, output string, version int) (Encoder, error) { init := encoders[strings.ToLower(encType)] if init == nil { return nil, fmt.Errorf("unsupported encoder: %s", strings.ToLower(encType)) } enc, err := init(svc, sdb, tbl, input, output, version) if err != nil { return nil, err } return enc, nil } //Create is a factory which create encoder of given type for given service, db, //table, input, output, version func Create(encType, svc, sdb, tbl, input string, output string, version int) (Encoder, error) { enc, err := InitEncoder(encType, svc, sdb, tbl, input, output, version) if err != nil { return nil, err } return enc, enc.UpdateCodec() } //GetRowKey concatenates row primary key fields into string //TODO: Should we encode into byte array instead? func GetRowKey(s *types.TableSchema, row *[]interface{}) string { var key string for i := 0; i < len(s.Columns); i++ { if s.Columns[i].Key == "PRI" { if row == nil { k := fmt.Sprintf("%v", s.Columns[i].Name) key += fmt.Sprintf("%v%v", len(k), k) } else { k := fmt.Sprintf("%v", (*row)[i]) key += fmt.Sprintf("%v%v", len(k), k) } } } return key } //GetCommonFormatKey concatenates common format key into string func GetCommonFormatKey(cf *types.CommonFormatEvent) string { var key string for _, v := range cf.Key { s := fmt.Sprintf("%v", v) key += fmt.Sprintf("%v%v", len(s), s) } return key } func filteredField(filter []int, i int, j *int) bool { if *j < len(filter) && filter[*j] == i { (*j)++ return true } return false } //WrapEvent prepend provided payload with CommonFormat like event func WrapEvent(outputFormat string, key string, bd []byte, seqno uint64) ([]byte, error) { akey := make([]interface{}, 1) akey[0] = []byte(key) cfw := types.CommonFormatEvent{ Type: outputFormat, Key: akey, SeqNo: seqno, Timestamp: time.Now().UnixNano(), Fields: nil, } cfb, err := Internal.CommonFormat(&cfw) if err != nil { return nil, err } buf := bytes.NewBuffer(cfb) _, err = buf.Write(bd) if err != nil { return nil, err } return buf.Bytes(), nil }