func()

in streamer/buffer.go [49:105]


func (s *Streamer) encodeCommonFormat(outProducer pipe.Producer, data []byte) (key string, outMsg []byte, err error) {
	cfEvent := &types.CommonFormatEvent{}
	payload, err := s.envEncoder.UnwrapEvent(data, cfEvent)
	if log.EL(s.log, err) {
		log.Errorf("broken event: %v %v", data, len(data))
		return
	}

	s.metrics.TimeInBuffer.Record(time.Since(time.Unix(0, cfEvent.Timestamp)))

	//  log.Debugf("commont format received %v %v", cfEvent, cfEvent.Fields)

	if cfEvent.Type == "insert" || cfEvent.Type == "delete" || cfEvent.Type == "schema" {
		outMsg, err = s.outEncoder.CommonFormat(cfEvent)
		if log.EL(s.log, err) {
			return
		}

		key = encoder.GetCommonFormatKey(cfEvent)

		if cfEvent.Type == "schema" && outMsg != nil {
			key = outProducer.PartitionKey("log", key)

			err = outProducer.PushSchema(key, outMsg)
			log.EL(s.log, err)

			outMsg = nil
			return
		}
	} else if cfEvent.Type == s.row.OutputFormat {
		outMsg = payload
		key = cfEvent.Key[0].(string)
		if key == "" {
			err = s.outEncoder.UpdateCodec()
			if log.EL(s.log, err) {
				return
			}
		}
		//    log.Debugf("Data in final format already. Forwarding. Key=%v, SeqNo=%v", key, cfEvent.SeqNo)
	} else if cfEvent.Type == s.envEncoder.Type() {
		var ev *types.CommonFormatEvent
		ev, err = s.envEncoder.DecodeEvent(payload)
		if log.EL(s.log, err) {
			return
		}
		outMsg, err = s.outEncoder.CommonFormat(ev)
		if log.EL(s.log, err) {
			return
		}

		key = encoder.GetCommonFormatKey(ev)
	} else {
		err = fmt.Errorf("unsupported conversion from: %v to %v", cfEvent.Type, s.row.OutputFormat)
	}

	return
}