func()

in encoder/json.go [378:416]


// UnwrapEvent decodes the JSON-encoded event envelope found at the start of
// data into cfEvent and returns every byte that follows the envelope as
// payload.
//
// After decoding, the event key is post-processed:
//   - If the event has exactly one key, that key is a string, and the event
//     type is neither "insert" nor "delete", the key is replaced by its
//     standard-base64-decoded form when it is valid base64; otherwise it is
//     left as decoded.
//   - In every other case (no key, several keys, a non-string key, or an
//     "insert"/"delete" event) cfEvent is handed to fixFieldTypes.
//
// If the encoder has an input schema and the event type is "schema",
// UpdateCodec is called before the payload is returned.
//
// A non-nil error is returned when the envelope cannot be decoded, when
// fixFieldTypes or UpdateCodec fails, or when the remaining bytes cannot be
// read; payload is nil in that case.
func (e *jsonEncoder) UnwrapEvent(data []byte, cfEvent *types.CommonFormatEvent) (payload []byte, err error) {
	// The envelope is prepended to the payload; decode it first.
	buf := bytes.NewBuffer(data)
	dec := json.NewDecoder(buf)
	if err = dec.Decode(cfEvent); err != nil {
		return nil, err
	}

	// Only look at Key[0] when it exists: an envelope with a missing or empty
	// key list would otherwise cause an index-out-of-range panic.
	var (
		key   string
		isStr bool
	)
	if len(cfEvent.Key) > 0 {
		key, isStr = cfEvent.Key[0].(string)
	}

	if len(cfEvent.Key) != 1 || !isStr || cfEvent.Type == "insert" || cfEvent.Type == "delete" {
		// NOTE(review): an empty key list is routed here, like a non-string
		// key; confirm fixFieldTypes tolerates len(cfEvent.Key) == 0.
		if err = e.fixFieldTypes(cfEvent); err != nil {
			return nil, err
		}
	} else if raw, decErr := base64.StdEncoding.DecodeString(key); decErr == nil {
		// A single string key that is valid base64 is stored decoded. When
		// decoding fails the key already holds the original string, so there
		// is nothing to do.
		cfEvent.Key[0] = string(raw)
	}

	if e.inSchema != nil && cfEvent.Type == "schema" {
		if err = e.UpdateCodec(); err != nil {
			return nil, err
		}
	}

	// The payload is everything after the envelope: first the bytes the JSON
	// decoder read ahead and still holds in its own buffer, then whatever is
	// left in the original buffer that the decoder never consumed.
	var rest bytes.Buffer
	if _, err = rest.ReadFrom(dec.Buffered()); err != nil {
		return nil, err
	}
	if _, err = rest.ReadFrom(buf); err != nil {
		return nil, err
	}
	return rest.Bytes(), nil
}