func()

in streamer/streamer.go [105:157]


func (s *Streamer) waitForGtid(svc string, cluster string, sdb string, inputType string, gtid string) bool {
	var current mysql.GTIDSet

	s.log.Debugf("Waiting for snapshot server to catch up to: %v", gtid)

	target, err := mysql.ParseGTIDSet("mysql", gtid)
	if log.EL(s.log, err) {
		return false
	}

	conn, err := db.OpenService(&db.Loc{Service: svc, Cluster: cluster, Name: sdb}, "", inputType)
	if log.EL(s.log, err) {
		return false
	}
	defer func() { log.EL(s.log, conn.Close()) }()

	tickCheck := time.NewTicker(5 * time.Second)
	defer tickCheck.Stop()
	tickLock := time.NewTicker(s.stateUpdateInterval)
	defer tickLock.Stop()
	toCh := time.After(waitForGtidTimeout)
	for {
		err = conn.QueryRow("SELECT @@global.gtid_executed").Scan(&gtid)
		if log.EL(s.log, err) {
			return false
		}
		current, err = mysql.ParseGTIDSet("mysql", gtid)
		if log.EL(s.log, err) {
			return false
		}
		if current.Contain(target) {
			break
		}
		select {
		case <-toCh:
			s.log.WithFields(log.Fields{"server has": current, "we need": target.String()}).Errorf("Timeout waiting snapshot server to catch up")
			return false
		case <-tickLock.C:
			if !state.RefreshTableLock(s.row.ID, s.workerID) || !s.clusterLock.Refresh() {
				s.log.Debugf("Lost the lock while waiting for gtid")
				return false
			}
			s.log.WithFields(log.Fields{"server has": current, "we need": target.String()}).Errorf("Still waiting snapshot server to catch up")
		case <-shutdown.InitiatedCh():
			return false
		case <-tickCheck.C:
		}
	}

	s.log.Debugf("Snapshot server at: %v", current)

	return true
}