encoder/msgpack.go (109 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package encoder import ( "bytes" "time" "github.com/tinylib/msgp/msgp" "github.com/uber/storagetapper/types" ) func init() { registerPlugin("msgpack", initMsgPackEncoder) } // msgPackEncoder implements Encoder interface into message pack format. // It inherits the methods from jsonEncoder. type msgPackEncoder struct { jsonEncoder } func initMsgPackEncoder(service, db, table, input string, output string, version int) (Encoder, error) { return &msgPackEncoder{jsonEncoder{Service: service, DB: db, Table: table, Input: input, Output: output, Version: version}}, nil } //Row encodes row into CommonFormat func (e *msgPackEncoder) Row(tp int, row *[]interface{}, seqno uint64, _ time.Time) ([]byte, error) { cf := e.convertRowToCommonFormat(tp, row, e.inSchema, seqno, e.filter) return cf.MarshalMsg(nil) } //EncodeSchema encodes current output schema func (e *msgPackEncoder) EncodeSchema(seqno uint64) ([]byte, error) { return e.Row(types.Schema, nil, seqno, time.Time{}) } //CommonFormat encodes common format event into byte array func (e *msgPackEncoder) CommonFormat(cf *types.CommonFormatEvent) ([]byte, error) { if cf.Type == "schema" { err := e.UpdateCodec() if err != nil { return nil, err } } cf = filterCommonFormat(e.filter, cf) return cf.MarshalMsg(nil) } //Type returns this encoder type func (e *msgPackEncoder) Type() string { return "msgpack" } func (e *msgPackEncoder) fixFieldType(dtype string, f interface{}) interface{} { switch v := f.(type) { case int64: switch dtype { case "int", "integer", "tinyint", "smallint", "mediumint", "year": return int32(v) } case time.Time: switch dtype { case "datetime": return v.In(time.UTC) case "timestamp": if v.Equal(time.Time{}) { return v.In(time.UTC) } return v.In(time.Local) } //There is one corner case when time can be encoded as string it's 0000-00-00 00:00:00 case string: switch dtype { case "datetime": return ZeroTime case "timestamp": return ZeroTime } } return f } func (e *msgPackEncoder) fixFieldTypes(cf *types.CommonFormatEvent) (err error) { k := 0 //Restore field types according to schema //MsgPack doesn't preserve int type size, so fix it if e.inSchema != nil && cf.Type != "schema" { for i, j := 0, 0; i < len(e.inSchema.Columns); i++ { if filteredField(e.filter, i, &j) { continue } if cf.Fields != nil && i-j < len(*cf.Fields) { f := &(*cf.Fields)[i-j] f.Value = e.fixFieldType(e.inSchema.Columns[i].DataType, f.Value) } if e.inSchema.Columns[i].Key == "PRI" && k < len(cf.Key) { cf.Key[k] = e.fixFieldType(e.inSchema.Columns[i].DataType, cf.Key[k]) k++ } } } return err } func (e *msgPackEncoder) msgPackDecode(b []byte) (*types.CommonFormatEvent, []byte, error) { cf := &types.CommonFormatEvent{} rem, err := cf.UnmarshalMsg(b) if err != nil { return nil, nil, err } return cf, rem, e.fixFieldTypes(cf) } // UnwrapEvent splits the event header and payload // cfEvent is populated with the 'header' information aka the first decoding. // Data after the header returned in the payload parameter func (e *msgPackEncoder) UnwrapEvent(data []byte, cfEvent *types.CommonFormatEvent) (payload []byte, err error) { err = cfEvent.DecodeMsg(msgp.NewReader(bytes.NewBuffer(data))) if err != nil { return } b, ok := cfEvent.Key[0].([]uint8) if len(cfEvent.Key) > 1 || !ok || cfEvent.Type == "insert" || cfEvent.Type == "delete" { if err = e.fixFieldTypes(cfEvent); err != nil { return } } else { cfEvent.Key[0] = string(b) } return msgp.Skip(data) } //DecodeEvent decodes MsgPack encoded array into CommonFormatEvent struct func (e *msgPackEncoder) DecodeEvent(b []byte) (*types.CommonFormatEvent, error) { cf, _, err := e.msgPackDecode(b) return cf, err }