in streamer/buffer.go [49:105]
func (s *Streamer) encodeCommonFormat(outProducer pipe.Producer, data []byte) (key string, outMsg []byte, err error) {
cfEvent := &types.CommonFormatEvent{}
payload, err := s.envEncoder.UnwrapEvent(data, cfEvent)
if log.EL(s.log, err) {
log.Errorf("broken event: %v %v", data, len(data))
return
}
s.metrics.TimeInBuffer.Record(time.Since(time.Unix(0, cfEvent.Timestamp)))
// log.Debugf("commont format received %v %v", cfEvent, cfEvent.Fields)
if cfEvent.Type == "insert" || cfEvent.Type == "delete" || cfEvent.Type == "schema" {
outMsg, err = s.outEncoder.CommonFormat(cfEvent)
if log.EL(s.log, err) {
return
}
key = encoder.GetCommonFormatKey(cfEvent)
if cfEvent.Type == "schema" && outMsg != nil {
key = outProducer.PartitionKey("log", key)
err = outProducer.PushSchema(key, outMsg)
log.EL(s.log, err)
outMsg = nil
return
}
} else if cfEvent.Type == s.row.OutputFormat {
outMsg = payload
key = cfEvent.Key[0].(string)
if key == "" {
err = s.outEncoder.UpdateCodec()
if log.EL(s.log, err) {
return
}
}
// log.Debugf("Data in final format already. Forwarding. Key=%v, SeqNo=%v", key, cfEvent.SeqNo)
} else if cfEvent.Type == s.envEncoder.Type() {
var ev *types.CommonFormatEvent
ev, err = s.envEncoder.DecodeEvent(payload)
if log.EL(s.log, err) {
return
}
outMsg, err = s.outEncoder.CommonFormat(ev)
if log.EL(s.log, err) {
return
}
key = encoder.GetCommonFormatKey(ev)
} else {
err = fmt.Errorf("unsupported conversion from: %v to %v", cfEvent.Type, s.row.OutputFormat)
}
return
}