func()

in changelog/mysql.go [462:506]


func (b *mysqlReader) produceRow(tp int, t *table, ts time.Time, row *[]interface{}) error {
	var err error
	buffered := config.Get().ChangelogBuffer
	seqno := b.nextSeqNo()
	if seqno == 0 {
		return fmt.Errorf("failed to generate next seqno. Current seqno: %+v", b.seqNo)
	}
	key := encoder.GetRowKey(t.encoder.Schema(), row)
	if buffered && b.bufPipe.Type() == "local" {
		err = t.producer.PushBatch(key, &types.RowMessage{Type: tp, Key: key, Data: row, SeqNo: seqno, Timestamp: ts})
	} else {
		var bd []byte
		bd, err = t.encoder.Row(tp, row, seqno, ts)
		if err != nil {
			var masterGTID string
			if strings.Contains(err.Error(), "column count mismatch") {
				var err1 error
				masterGTID, err1 = db.GetCurrentGTID(b.masterCI)
				log.E(err1)
			}
			b.log.WithFields(log.Fields{"state gtid": util.SortedGTIDString(b.gtidSet), "event gtid": gtidToString(&b.curGTID), "master gtid": masterGTID, "table": t.id}).Errorf(err.Error())
			return err
		}
		if !buffered {
			key = t.producer.PartitionKey("log", key)
		} else if t.encoder.Type() != encoder.Internal.Type() {
			bd, err = encoder.WrapEvent(t.outputFormat, key, bd, seqno)
			if log.EL(b.log, err) {
				return err
			}
		}
		err = t.producer.PushBatch(key, bd)
		b.metrics.BytesWritten.Inc(int64(len(bd)))
	}
	//log.Debugf("Pushed to buffer. seqno=%v, table=%v", seqno, t.id)
	if shutdown.Initiated() {
		return nil
	}
	if err != nil {
		b.log.Errorf("Type: %v, Error: %v", tp, err.Error())
		return err
	}
	b.metrics.ChangelogRowEventsWritten.Inc(1)
	return nil
}