encoder/json.go (319 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package encoder import ( "bytes" "encoding/base64" "encoding/json" "fmt" "strings" "time" "github.com/uber/storagetapper/log" "github.com/uber/storagetapper/state" "github.com/uber/storagetapper/types" ) func init() { registerPlugin("json", initJSONEncoder) } //jsonEncoder implements Encoder interface into common format type jsonEncoder struct { Service string DB string Table string Input string Output string Version int inSchema *types.TableSchema filter []int //Contains indexes of fields which are not in output schema outSchema *types.CommonFormatEvent } //GenTimeFunc created to be able to return deterministic timestamp in test /*TODO: Come up with better day of doing this*/ type GenTimeFunc func() int64 func genTime() int64 { return time.Now().UnixNano() } //GenTime is polymorphic function to be able to replace common format timestamp //generator var GenTime GenTimeFunc = genTime //ZeroTime in UTC var ZeroTime = time.Time{}.In(time.UTC) func initJSONEncoder(service, db, table, input string, output string, version int) (Encoder, error) { return &jsonEncoder{Service: service, DB: db, Table: table, Input: input, Output: output, Version: version}, nil } //Type returns this encoder type func (e *jsonEncoder) Type() string { return "json" } //Schema returns table schema func (e *jsonEncoder) Schema() *types.TableSchema { return e.inSchema } //EncodeSchema encodes current output schema func (e *jsonEncoder) EncodeSchema(seqno uint64) ([]byte, error) { return e.Row(types.Schema, nil, seqno, time.Time{}) } //Row encodes row into CommonFormat func (e *jsonEncoder) Row(tp int, row *[]interface{}, seqno uint64, _ time.Time) ([]byte, error) { cf := e.convertRowToCommonFormat(tp, row, e.inSchema, seqno, e.filter) return e.CommonFormatEncode(cf) } func filterCommonFormat(filter []int, cf *types.CommonFormatEvent) *types.CommonFormatEvent { if len(filter) == 0 || cf.Fields == nil || len(*cf.Fields) == 0 { return cf } c := &types.CommonFormatEvent{SeqNo: cf.SeqNo, Type: cf.Type, Timestamp: cf.Timestamp, Key: cf.Key, Fields: new([]types.CommonFormatField)} for i, j := 0, 0; i < len(*cf.Fields); i++ { if filteredField(filter, i, &j) { continue } *c.Fields = append(*c.Fields, (*cf.Fields)[i]) } return c } //CommonFormat encodes common format event into byte array func (e *jsonEncoder) CommonFormat(cf *types.CommonFormatEvent) ([]byte, error) { if cf.Type == "schema" { err := e.UpdateCodec() if err != nil { return nil, err } } cf = filterCommonFormat(e.filter, cf) return e.CommonFormatEncode(cf) } /*UpdateCodec refreshes the schema from state DB */ func (e *jsonEncoder) UpdateCodec() error { schema, err := state.GetSchema(e.Service, e.DB, e.Table, e.Input, e.Output, e.Version) if err != nil { return err } e.inSchema = schema n, err := GetOutputSchemaName(e.Service, e.DB, e.Table, e.Input, e.Output, e.Version) if err != nil { return err } s := state.GetOutputSchema(n, "json") if s != "" { c, err := e.schemaDecode([]byte(s)) if err != nil { return err } if c.Type != "schema" { return nil //return fmt.Errorf("Broken schema in state for %v,%v,%v", e.Service, e.DB, e.Table) } e.outSchema = c if e.outSchema.Fields != nil && len(e.inSchema.Columns)-len(*e.outSchema.Fields) < 0 { err = fmt.Errorf("input schema has less fields than output schema") log.E(err) return err } } e.prepareFilter() return err } func fixFieldType(f interface{}, dt string) (interface{}, error) { var err error switch v := f.(type) { case float64: switch dt { case "bigint", "enum", "set": f = int64(v) case "int", "integer", "tinyint", "smallint", "mediumint", "year": f = int32(v) case "float": f = float32(v) } case string: switch dt { case "blob", "tinyblob", "mediumblob", "longblob", "binary", "varbinary": f, err = base64.StdEncoding.DecodeString(v) if err != nil { return nil, err } case "timestamp", "datetime": t := ZeroTime //MySQL binlog reader library return string intread of time.Time for //"zero" time. if !strings.HasPrefix(v, "0000-00-00 00:00:00") { //t.UnmashalJSON can't be used, because it uses RFC3339 to parse while //MarshalJSON uses RFC3339Nano format. t, err = time.Parse(time.RFC3339Nano, v) if err != nil { return nil, err } if dt == "timestamp" && !t.Equal(time.Time{}) { //If the local time is UTC while marshalling, then timezone info is //not included in result, in this case when parsing back, timezone is //not matched to local timezone. We forcibly set zone to Local if //local zone is UTC and parsed time in UTC. name, _ := time.Now().Zone() //TODO: cache this if t.Location().String() == "UTC" && name == "UTC" { t = t.In(time.Local) } } else { t = t.In(time.UTC) } } f = t } } return f, nil } func (e *jsonEncoder) fixFieldTypes(res *types.CommonFormatEvent) (err error) { k := 0 //Restore field types according to schema if e.inSchema != nil && res.Type != "schema" { var j int for i := 0; i < len(e.inSchema.Columns); i++ { if filteredField(e.filter, i, &j) { continue } if res.Fields != nil && i-j < len(*res.Fields) { f := &(*res.Fields)[i-j] f.Value, err = fixFieldType(f.Value, e.inSchema.Columns[i].DataType) if err != nil { return err } } if e.inSchema.Columns[i].Key == "PRI" && k < len(res.Key) { f := &res.Key[k] *f, err = fixFieldType(*f, e.inSchema.Columns[i].DataType) if err != nil { return err } k++ } } } return nil } func (e *jsonEncoder) jsonDecode(b []byte) (*types.CommonFormatEvent, error) { //FIXME: to properly handle integer need to use decoder and json.UseNumber res := &types.CommonFormatEvent{} err := json.Unmarshal(b, res) if err != nil { return nil, err } return res, e.fixFieldTypes(res) } func (e *jsonEncoder) schemaDecode(b []byte) (*types.CommonFormatEvent, error) { return e.jsonDecode(b) } //CommonFormatEncode encodes CommonFormatEvent into byte array func (e *jsonEncoder) CommonFormatEncode(c *types.CommonFormatEvent) ([]byte, error) { return json.Marshal(c) } func fillCommonFormatKey(e *types.CommonFormatEvent, row *[]interface{}, s *types.TableSchema) { e.Key = make([]interface{}, 0) for i := 0; i < len(s.Columns); i++ { if s.Columns[i].Key == "PRI" { if row == nil { e.Key = append(e.Key, s.Columns[i].Name) } else { e.Key = append(e.Key, (*row)[i]) //FIXME: Fix datatypes same as in fields } } } } func fillCommonFormatFields(c *types.CommonFormatEvent, row *[]interface{}, schema *types.TableSchema, filter []int) { f := make([]types.CommonFormatField, 0, len(schema.Columns)) for i, j := 0, 0; i < len(schema.Columns); i++ { if filteredField(filter, i, &j) { continue } var v interface{} if row == nil { v = schema.Columns[i].Type } else { s := schema.Columns[i].DataType if schema.Columns[i].Type == types.MySQLBoolean { b, ok := (*row)[i].(int8) if ok { if b != 0 { v = true } else { v = false } } else { v = (*row)[i] } } else if s == "text" || s == "tinytext" || s == "mediumtext" || s == "longtext" || s == "json" { b, ok := (*row)[i].([]byte) if ok { v = string(b) } else { v = (*row)[i] } } else if s == "binary" || s == "varbinary" { b, ok := (*row)[i].(string) if ok { v = []byte(b) } else { v = (*row)[i] } } else { v = (*row)[i] } } f = append(f, types.CommonFormatField{Name: schema.Columns[i].Name, Value: v}) } c.Fields = &f } func (e *jsonEncoder) convertRowToCommonFormat(tp int, row *[]interface{}, schema *types.TableSchema, seqNo uint64, filter []int) *types.CommonFormatEvent { var c types.CommonFormatEvent //log.Debugf("cf %+v %+v %+v %+v", tp, row, s, seqNo) c.SeqNo = seqNo c.Timestamp = GenTime() switch tp { case types.Insert: c.Type = "insert" fillCommonFormatKey(&c, row, schema) fillCommonFormatFields(&c, row, schema, filter) case types.Delete: c.Type = "delete" fillCommonFormatKey(&c, row, schema) case types.Schema: c.Type = "schema" fillCommonFormatKey(&c, nil, schema) fillCommonFormatFields(&c, nil, schema, filter) default: panic("unknown event type") } return &c } func (e *jsonEncoder) prepareFilter() { if e.outSchema == nil { return } nfiltered := len(e.inSchema.Columns) if e.outSchema.Fields != nil { nfiltered = nfiltered - len(*e.outSchema.Fields) } if nfiltered == 0 { return } f := e.outSchema.Fields e.filter = make([]int, 0) var j int for i := 0; i < len(e.inSchema.Columns); i++ { //Primary key cannot be filtered if f == nil || len(*f) == 0 || (i-j) >= len(*f) || e.inSchema.Columns[i].Name != (*f)[i-j].Name { if e.inSchema.Columns[i].Key != "PRI" { log.Debugf("Field %v will be filtered", e.inSchema.Columns[i].Name) e.filter = append(e.filter, i) } j++ } } log.Debugf("len=%v, filtered fields (%v)", len(e.filter), e.filter) } // UnwrapEvent splits the event header and payload // cfEvent is populated with the 'header' information aka the first decoding. // Data after the header returned in the payload parameter func (e *jsonEncoder) UnwrapEvent(data []byte, cfEvent *types.CommonFormatEvent) (payload []byte, err error) { /* cfEvent prepends the payload, decode it here */ buf := bytes.NewBuffer(data) dec := json.NewDecoder(buf) err = dec.Decode(cfEvent) if err != nil { return } s, ok := cfEvent.Key[0].(string) if len(cfEvent.Key) > 1 || !ok || cfEvent.Type == "insert" || cfEvent.Type == "delete" { if err = e.fixFieldTypes(cfEvent); err != nil { return } } else if b, err := base64.StdEncoding.DecodeString(s); err == nil { cfEvent.Key[0] = string(b) } else { cfEvent.Key[0] = s } if e.inSchema != nil && cfEvent.Type == "schema" { if err = e.UpdateCodec(); err != nil { return } } /* Return everything after cfEvent as a payload */ /* Append cached in json decoder */ var buf1 bytes.Buffer _, err = buf1.ReadFrom(dec.Buffered()) if err != nil { return } /* Append remainder of the original buffer not read by json decoder */ _, err = buf1.ReadFrom(buf) if err != nil { return } return buf1.Bytes(), nil } //DecodeEvent decodes JSON encoded array into CommonFormatEvent struct func (e *jsonEncoder) DecodeEvent(b []byte) (*types.CommonFormatEvent, error) { return e.jsonDecode(b) }