func()

in changelog/mysql.go [970:1050]


// start acquires a MySQL cluster task for this worker and, if one is obtained,
// reads that cluster's binlog events until readEvents returns.
//
// Steps, in order: build the worker ID, claim a cluster task through
// state.GetClusterTask, register metrics, resolve the connection info, verify
// that the binlog format is ROW, compile the query handlers, determine the
// starting GTID set (the one saved in the state, or the server's current one
// when nothing is saved) and hand over to readEvents.
//
// It returns false only when no task was acquired (GetClusterTask failed or
// returned an empty service). Once a task has been acquired it returns true
// whether or not reading succeeded; failures are logged, and those that leave
// err set on return are counted in b.metrics.Errors by the deferred handler.
// NOTE(review): the caller presumably uses the result to decide whether to
// keep polling for tasks - confirm against the call site.
func (b *mysqlReader) start(cfg *config.AppConfig) bool {
	var err error

	// The hostname is only a prefix of the worker ID, so a lookup failure is
	// tolerated: h is then empty and the ID consists of the generated part.
	h, _ := os.Hostname()
	w := log.GenWorkerID()
	b.workerID = h + "." + w

	b.dbl.Service, b.dbl.Cluster, b.dbl.Name, err = state.GetClusterTask(types.InputMySQL, b.workerID, cfg.LockExpireTimeout)
	if err != nil {
		// b.log is created further down (it needs the cluster name), so report
		// the failure through a logger tagged with the worker ID only.
		log.EL(log.WithFields(log.Fields{"worker_id": w}), err)
		return false
	}
	if b.dbl.Service == "" {
		// No task available for this worker.
		return false
	}

	b.metrics = metrics.NewChangelogReaderMetrics(getTags(b.dbl.Cluster, b.inputType))

	b.metrics.NumWorkers.Inc()
	defer func() {
		// err is captured by reference: whatever value it holds when start
		// returns decides whether this run is counted as an error.
		if err != nil {
			b.metrics.Errors.Inc(1)
		}
		b.metrics.NumWorkers.Dec()
	}()

	thisInstanceCluster = b.dbl.Cluster

	b.log = log.WithFields(log.Fields{"worker_id": w, "cluster": b.dbl.Cluster})

	b.log.Infof("Starting MySQL binlog reader")

	// Get slave's connection info (stored in masterCI despite the field name).
	// Connecting to slave guarantees that we will connect to DC local slave.
	b.masterCI, err = db.GetConnInfo(&b.dbl, db.Slave, b.inputType)
	if log.EL(b.log, err) {
		return true
	}

	rf := b.binlogFormat()
	if rf != "ROW" {
		// An empty format is not reported here.
		// NOTE(review): presumably binlogFormat logs its own failure - confirm.
		if rf != "" {
			b.log.Errorf("binlog format is %s. row binlog format required. skipping", rf)
		}
		return true
	}

	b.tables = make(map[string]map[string]map[string][]*table)

	// queryHandlers is package-level state shared by all readers; compile each
	// pattern only once instead of recompiling on every start.
	// NOTE(review): if start can run concurrently in several goroutines, the
	// first compilation is still unsynchronized - consider moving it to
	// package initialization.
	for i := range queryHandlers {
		if queryHandlers[i].compiled == nil {
			queryHandlers[i].compiled = regexp.MustCompile(queryHandlers[i].regexp)
		}
	}

	// Start reading binlogs from the gtid set saved in the state
	var gtid string
	gtid, _, err = state.GetGTID(b.dbl.Cluster)
	if log.EL(b.log, err) {
		return true
	}

	// If no gtid in the state, get the current gtid set from the server we
	// are connected to and try to persist it.
	if gtid == "" {
		gtid, err = db.GetCurrentGTID(b.masterCI)
		if log.EL(b.log, err) {
			return true
		}
		err = state.SetGTID(b.dbl.Cluster, gtid)
		if err != nil {
			// Best effort: reading proceeds even if the position could not be
			// saved. err is overwritten by the parse below, so count the
			// failure here or the deferred handler would never see it.
			b.log.Errorf("Error saving gtid. gtid %v. Error: %v", gtid, err.Error())
			b.metrics.Errors.Inc(1)
		}
	}

	var s mysql.GTIDSet
	s, err = mysql.ParseMysqlGTIDSet(gtid)
	if err != nil {
		b.log.Errorf("Invalid gtid: '%v' Error: %v", gtid, err.Error())
		return true
	}

	// Checked assertion: an unexpected implementation is reported instead of
	// panicking the worker.
	gs, ok := s.(*mysql.MysqlGTIDSet)
	if !ok {
		b.log.Errorf("Unexpected gtid set type %T for gtid: '%v'", s, gtid)
		return true
	}
	b.gtidSet = gs

	b.readEvents(b.masterCI, cfg.StateUpdateInterval)

	b.log.Infof("MySQL Binlog reader finished")

	return true
}