encoder/avro.go (372 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package encoder import ( "bytes" "fmt" "strings" "time" "github.com/linkedin/goavro" "github.com/uber/storagetapper/log" "github.com/uber/storagetapper/state" "github.com/uber/storagetapper/types" ) const numMetadataFields = 3 func init() { registerPlugin("avro", initAvroEncoder) } //avroEncoder implements Encoder interface for Avro format type avroEncoder struct { Service string DB string Table string Input string Output string Version int codec goavro.Codec setter *goavro.RecordSetter inSchema *types.TableSchema filter []int outSchema *types.AvroSchema } func initAvroEncoder(service, db, table, input string, output string, version int) (Encoder, error) { return &avroEncoder{Service: service, DB: db, Table: table, Input: input, Output: output, Version: version}, nil } //Type returns type of the encoder interface (faster then type assertion?) func (e *avroEncoder) Type() string { return "avro" } //Schema return structured schema of the table func (e *avroEncoder) Schema() *types.TableSchema { return e.inSchema } //EncodeSchema - encodes schema //Avro format doesn't support schema in the stream func (e *avroEncoder) EncodeSchema(seqno uint64) ([]byte, error) { return nil, nil } //Row convert raw binary log event into Avro record func (e *avroEncoder) Row(tp int, row *[]interface{}, seqno uint64, _ time.Time) ([]byte, error) { r, err := goavro.NewRecord(*e.setter) if err != nil { return nil, err } err = convertRowToAvroFormat(tp, row, e.inSchema, seqno, r, e.filter) if err != nil { return nil, err } return encodeAvroRecord(e.codec, r) } //CommonFormat encodes CommonFormat event into Avro record func (e *avroEncoder) CommonFormat(cf *types.CommonFormatEvent) ([]byte, error) { if cf.Type == "schema" { err := e.UpdateCodec() return nil, err } //TODO: Explore using reader/writer interface r, err := goavro.NewRecord(*e.setter) if err != nil { return nil, err } err = convertCommonFormatToAvroRecord(*e.setter, cf, r, e.filter) if err != nil { return nil, err } return encodeAvroRecord(e.codec, r) } // convertCommonFormatToAvroRecord creates a new Avro record from the common format event, adding the necessary // metadata of row_key, ref_key and is_deleted. func convertCommonFormatToAvroRecord(rs goavro.RecordSetter, cfEvent *types.CommonFormatEvent, rec *goavro.Record, filter []int) error { err := rec.Set("row_key", []byte(GetCommonFormatKey(cfEvent))) //TODO: Revisit row_key from primary_key if err != nil { return err } err = rec.Set("ref_key", int64(cfEvent.SeqNo)) if err != nil { return err } err = rec.Set("is_deleted", strings.EqualFold(cfEvent.Type, "delete")) if err != nil { return err } if cfEvent.Fields == nil { return nil } for i, j := 0, 0; i < len(*cfEvent.Fields); i++ { if filteredField(filter, i, &j) { continue } field := (*cfEvent.Fields)[i] /* If the field is integer convert it from JSON's float number */ switch r := field.Value.(type) { case float64: s, err := rec.GetFieldSchema(field.Name) if err != nil { return err } for _, t := range s.(map[string]interface{})["type"].([]interface{}) { switch t.(string) { case "int": field.Value = int32(r) case "long": field.Value = int64(r) } } case time.Time: if !r.Equal(time.Time{}) { field.Value = r.UnixNano() / 1000000 } else { field.Value = nil } } err = rec.Set(field.Name, field.Value) if err != nil { return err } } return nil } //UpdateCodec updates encoder schema func (e *avroEncoder) UpdateCodec() error { var err error log.Debugf("Schema codec (%v) updating", e.Type()) //Schema from state is used to encode from row format, whether in //binlog reader or when pipe type is local, so schema is always //corresponds to the message schema e.inSchema, err = state.GetSchema(e.Service, e.DB, e.Table, e.Input, e.Output, e.Version) if log.E(err) { return err } n, err := GetOutputSchemaName(e.Service, e.DB, e.Table, e.Input, e.Output, e.Version) if log.E(err) { return err } e.outSchema, err = GetLatestSchema("production", n, "avro") if log.E(err) { return err } if len(e.inSchema.Columns)-(len(e.outSchema.Fields)-numMetadataFields) < 0 { err = fmt.Errorf("input schema has less fields than output schema") log.E(err) return err } e.codec, e.setter, err = SchemaCodecHelper(e.outSchema) if log.E(err) { return err } e.filter = prepareFilter(e.inSchema, e.outSchema, numMetadataFields) log.Debugf("Schema codec (%v) updated", e.Type()) return err } //encodeAvroRecord serializes(encodes) Avro record into byte array func encodeAvroRecord(codec goavro.Codec, r *goavro.Record) ([]byte, error) { w := new(bytes.Buffer) err := codec.Encode(w, r) if err != nil { return nil, err } return w.Bytes(), nil } //fillAvroKey fills Avro records row_key from primary key of the row func fillAvroKey(e *goavro.Record, row *[]interface{}, s *types.TableSchema) error { var rowKey string // rowKey := make([]interface{}, 0) for i := 0; i < len(s.Columns); i++ { if s.Columns[i].Key == "PRI" { /* if row == nil { //rowKey = append(rowKey, s.Columns[i].Name) k := fmt.Sprintf("%v", s.Columns[i].Name) rowKey += fmt.Sprintf("%v%v", len(k), k) } else { */ //rowKey = append(rowKey, (*row)[i]) k := fmt.Sprintf("%v", (*row)[i]) rowKey += fmt.Sprintf("%v%v", len(k), k) //} } } return e.Set("row_key", []byte(rowKey)) } func convertTime(b string, dtype string) (interface{}, error) { var v interface{} //mysql binlog reader library returns string for binary type if dtype == "timestamp" || dtype == "datetime" { if strings.HasPrefix(b, "0000-00-00 00:00:00") { return nil, nil } t, err := time.Parse(time.RFC3339Nano, b) if err != nil { return nil, err } v = t.UnixNano() / 1000000 } else if dtype == "binary" || dtype == "varbinary" { v = []byte(b) } else { v = b } return v, nil } func convertText(b []byte, d string) interface{} { //mysql binlog reader library returns []byte for text type if d == "text" || d == "tinytext" || d == "mediumtext" || d == "longtext" || d == "json" { return string(b) } return b } func fixAvroFieldType(i interface{}, dtype string, ftype string) (interface{}, error) { var err error var v interface{} /*if row == nil { v = s.Columns[i].Type } else { */ switch b := i.(type) { case int8: if ftype == types.MySQLBoolean { v = false if b != 0 { v = true } } else { v = int32(b) } case uint8: v = int32(b) case int16: v = int32(b) case uint16: v = int32(b) case time.Time: v = b.UnixNano() / 1000000 //milliseconds case string: v, err = convertTime(b, dtype) if err != nil { return nil, err } case []byte: v = convertText(b, dtype) default: v = b } return v, nil } //fillAvroFields fill fields of the Avro record from the row //TODO: Remove ability to encode schema, so as receiver should have schema to decode //the record, so no point in pushing schema into stream func fillAvroFields(r *goavro.Record, row *[]interface{}, s *types.TableSchema, filter []int) error { for i, j := 0, 0; i < len(s.Columns); i++ { //Skip fields which are not present in output schema if filteredField(filter, i, &j) { continue } v, err := fixAvroFieldType((*row)[i], s.Columns[i].DataType, s.Columns[i].Type) if err != nil { return err } //} //TODO: Consider passing primary key as fields in delete event, instead //of separate row_key field //if keyOnly && s.Columns[i].Key != "PRI" { // continue //} err = r.Set(s.Columns[i].Name, v) if err != nil { return err } } return nil } //convertRowToAvroFormat uses fillAvroKey and fillAvroFields to convert //the complete Avro record from row func convertRowToAvroFormat(tp int, row *[]interface{}, s *types.TableSchema, seqNo uint64, r *goavro.Record, filter []int) error { err := r.Set("ref_key", int64(seqNo)) if err != nil { return err } switch tp { case types.Insert: err = fillAvroKey(r, row, s) if err != nil { return err } err = fillAvroFields(r, row, s, filter) if err != nil { return err } err = r.Set("is_deleted", false) if err != nil { return err } case types.Delete: err = fillAvroKey(r, row, s) if err != nil { return err } // fillAvroFields(r, row, s, filter, true) err = r.Set("is_deleted", true) if err != nil { return err } default: return fmt.Errorf("unknown event type: %v", tp) } return nil } func prepareFilter(in *types.TableSchema, out *types.AvroSchema, numMetaFields int) []int { if out == nil { return nil } nfiltered := len(in.Columns) if out.Fields != nil { nfiltered = nfiltered - (len(out.Fields) - numMetaFields) } if nfiltered == 0 { return nil } f := out.Fields filter := make([]int, 0) var j int for i := 0; i < len(in.Columns); i++ { //Primary key cannot be filtered if (i-j) >= len(f) || in.Columns[i].Name != f[i-j].Name { if in.Columns[i].Key != "PRI" { log.Debugf("Field %v will be filtered", in.Columns[i].Name) filter = append(filter, i) } j++ } } log.Debugf("len=%v, filtered fields (%v)", len(filter), filter) return filter } func (e *avroEncoder) UnwrapEvent(data []byte, cfEvent *types.CommonFormatEvent) (payload []byte, err error) { return nil, fmt.Errorf("avro encoder doesn't support wrapping") } func (e *avroEncoder) decodeEventFields(c *types.CommonFormatEvent, r *goavro.Record) error { hasNonNil := false c.Fields = new([]types.CommonFormatField) for i, j := 0, 0; i < len(e.inSchema.Columns); i++ { if filteredField(e.filter, i, &j) { continue } n := e.inSchema.Columns[i].Name v, err := r.Get(n) if err != nil && !strings.Contains(err.Error(), "no such field") { return err } if v != nil { hasNonNil = true dt := e.inSchema.Columns[i].DataType if dt == "timestamp" || dt == "datetime" { ms, ok := v.(int64) if !ok { return fmt.Errorf("timestamp and datetime formats expected to be int64") } t := time.Unix(ms/1000, (ms%1000)*1000000) if dt == "datetime" { t = t.In(time.UTC) } v = t } } *c.Fields = append(*c.Fields, types.CommonFormatField{Name: n, Value: v}) if e.inSchema.Columns[i].Key == "PRI" && v != nil { c.Key = append(c.Key, v) } } if !hasNonNil { c.Fields = nil } return nil } func (e *avroEncoder) DecodeEvent(b []byte) (*types.CommonFormatEvent, error) { var c types.CommonFormatEvent rec, err := e.codec.Decode(bytes.NewReader(b)) if err != nil { return nil, err } r := rec.(*goavro.Record) c.Type = "insert" c.Key = make([]interface{}, 0) del, err := r.Get("is_deleted") if err != nil { return nil, err } if v, ok := del.(bool); ok && v { c.Type = "delete" // row key is needed by delete only rowKey, err := r.Get("row_key") if err != nil { return nil, err } if v, ok := rowKey.([]uint8); ok { c.Key = append(c.Key, string(v)) } else { return nil, fmt.Errorf("type of row_key field should be []uint8") } } else if !ok { return nil, fmt.Errorf("type of is_deleted field should be bool") } seqno, err := r.Get("ref_key") if err != nil { return nil, err } if v, ok := seqno.(int64); ok { c.SeqNo = uint64(v) } else { return nil, fmt.Errorf("type of ref_key field should be int64") } c.Timestamp = 0 return &c, e.decodeEventFields(&c, r) }