in streamer/streamer.go [183:222]
func (s *Streamer) setupChangelogConsumer(cfg *config.AppConfig) (pipe.Consumer, bool) {
var err error
var consumer pipe.Consumer
if cfg.ChangelogBuffer {
var tn string
tn, err = config.Get().GetChangelogTopicName(s.row.Service, s.row.DB, s.row.Table, s.row.Input, s.row.Output, s.row.Version, s.row.SnapshottedAt)
if log.EL(s.log, err) {
return nil, false
}
s.log.Debugf("Setting up consumer for buffer topic: %v", tn)
//Consumer MUST be created before snapshotting the table.
//Creating it after may leave a gap in events stream.
consumer, err = s.inPipe.NewConsumer(tn)
if log.EL(s.log, err) {
return nil, false
}
}
var gtid string
gtid, _, err = s.ensureChangelogReaderStart()
if log.E(err) {
if consumer != nil {
log.E(consumer.CloseOnFailure())
}
return nil, false
}
if s.row.NeedSnapshot && !s.row.Params.NoSnapshot && !s.waitForGtid(s.row.Service, s.row.Cluster, s.row.DB, s.row.Input, gtid) {
if consumer != nil {
log.E(consumer.CloseOnFailure())
}
return nil, false
}
return consumer, true
}