in encoder/avro.go [440:485]
// DecodeEvent decodes the Avro-encoded payload b into a CommonFormatEvent.
//
// The payload is decoded with the encoder's codec and must yield a
// *goavro.Record. The record is interpreted as follows:
//   - "is_deleted" (bool) selects the event type: "delete" when true,
//     "insert" otherwise.
//   - "row_key" ([]uint8) is read for delete events only and becomes the
//     single element of Key, converted to a string. For inserts Key is left
//     as an empty, non-nil slice.
//   - "ref_key" (int64) is stored in SeqNo.
//
// Timestamp is set to 0 and the rest of the event is populated by
// decodeEventFields. On any failure a nil event is returned together with the
// error.
func (e *avroEncoder) DecodeEvent(b []byte) (*types.CommonFormatEvent, error) {
	rec, err := e.codec.Decode(bytes.NewReader(b))
	if err != nil {
		return nil, err
	}

	// Use the checked form of the assertion: a datum of any other type must be
	// reported as an error instead of panicking.
	r, ok := rec.(*goavro.Record)
	if !ok {
		return nil, fmt.Errorf("decoded avro datum has unexpected type %T, expected *goavro.Record", rec)
	}

	var c types.CommonFormatEvent
	c.Type = "insert"
	c.Key = make([]interface{}, 0)

	del, err := r.Get("is_deleted")
	if err != nil {
		return nil, err
	}
	deleted, ok := del.(bool)
	if !ok {
		return nil, fmt.Errorf("type of is_deleted field should be bool")
	}

	if deleted {
		c.Type = "delete"

		// row key is needed by delete only
		rowKey, err := r.Get("row_key")
		if err != nil {
			return nil, err
		}
		key, ok := rowKey.([]uint8)
		if !ok {
			return nil, fmt.Errorf("type of row_key field should be []uint8")
		}
		c.Key = append(c.Key, string(key))
	}

	seqno, err := r.Get("ref_key")
	if err != nil {
		return nil, err
	}
	seq, ok := seqno.(int64)
	if !ok {
		return nil, fmt.Errorf("type of ref_key field should be int64")
	}
	// NOTE(review): a negative ref_key wraps around in this conversion;
	// confirm that producers never emit negative values.
	c.SeqNo = uint64(seq)

	c.Timestamp = 0

	// Do not hand back a partially populated event when the remaining fields
	// fail to decode; this matches every other error path above.
	if err := e.decodeEventFields(&c, r); err != nil {
		return nil, err
	}
	return &c, nil
}