in encoder/json.go [378:416]
func (e *jsonEncoder) UnwrapEvent(data []byte, cfEvent *types.CommonFormatEvent) (payload []byte, err error) {
/* cfEvent prepends the payload, decode it here */
buf := bytes.NewBuffer(data)
dec := json.NewDecoder(buf)
err = dec.Decode(cfEvent)
if err != nil {
return
}
s, ok := cfEvent.Key[0].(string)
if len(cfEvent.Key) > 1 || !ok || cfEvent.Type == "insert" || cfEvent.Type == "delete" {
if err = e.fixFieldTypes(cfEvent); err != nil {
return
}
} else if b, err := base64.StdEncoding.DecodeString(s); err == nil {
cfEvent.Key[0] = string(b)
} else {
cfEvent.Key[0] = s
}
if e.inSchema != nil && cfEvent.Type == "schema" {
if err = e.UpdateCodec(); err != nil {
return
}
}
/* Return everything after cfEvent as a payload */
/* Append cached in json decoder */
var buf1 bytes.Buffer
_, err = buf1.ReadFrom(dec.Buffered())
if err != nil {
return
}
/* Append remainder of the original buffer not read by json decoder */
_, err = buf1.ReadFrom(buf)
if err != nil {
return
}
return buf1.Bytes(), nil
}