func()

in streamer/streamer.go [183:222]


// setupChangelogConsumer prepares the streamer's changelog input.
//
// When cfg.ChangelogBuffer is set, it resolves the changelog buffer topic name
// for s.row and opens a consumer for that topic on s.inPipe. It then calls
// s.ensureChangelogReaderStart and, if a snapshot is needed (s.row.NeedSnapshot
// is set and s.row.Params.NoSnapshot is not), calls s.waitForGtid with the
// GTID returned by that call.
//
// It returns the consumer and true on success; the consumer is nil when
// cfg.ChangelogBuffer is false. On any failure it returns (nil, false), and if
// a consumer had already been opened, CloseOnFailure is called on it first.
// All errors are logged through the streamer's own logger (s.log).
func (s *Streamer) setupChangelogConsumer(cfg *config.AppConfig) (pipe.Consumer, bool) {
	var consumer pipe.Consumer

	if cfg.ChangelogBuffer {
		tn, err := config.Get().GetChangelogTopicName(s.row.Service, s.row.DB, s.row.Table, s.row.Input, s.row.Output, s.row.Version, s.row.SnapshottedAt)
		if log.EL(s.log, err) {
			return nil, false
		}

		s.log.Debugf("Setting up consumer for buffer topic: %v", tn)

		//Consumer MUST be created before snapshotting the table.
		//Creating it after may leave a gap in events stream.
		consumer, err = s.inPipe.NewConsumer(tn)
		if log.EL(s.log, err) {
			return nil, false
		}
	}

	// abort closes the consumer, if one was opened above, and reports failure.
	// Keeping this in one place guarantees every late failure path cleans up
	// the same way.
	abort := func() (pipe.Consumer, bool) {
		if consumer != nil {
			log.EL(s.log, consumer.CloseOnFailure())
		}
		return nil, false
	}

	// The second return value is not needed here; only the GTID is used below.
	gtid, _, err := s.ensureChangelogReaderStart()
	if log.EL(s.log, err) {
		return abort()
	}

	// waitForGtid is only consulted when a snapshot is actually going to happen.
	needSnapshot := s.row.NeedSnapshot && !s.row.Params.NoSnapshot
	if needSnapshot && !s.waitForGtid(s.row.Service, s.row.Cluster, s.row.DB, s.row.Input, gtid) {
		return abort()
	}

	return consumer, true
}