in streamer/streamer.go [105:157]
func (s *Streamer) waitForGtid(svc string, cluster string, sdb string, inputType string, gtid string) bool {
var current mysql.GTIDSet
s.log.Debugf("Waiting for snapshot server to catch up to: %v", gtid)
target, err := mysql.ParseGTIDSet("mysql", gtid)
if log.EL(s.log, err) {
return false
}
conn, err := db.OpenService(&db.Loc{Service: svc, Cluster: cluster, Name: sdb}, "", inputType)
if log.EL(s.log, err) {
return false
}
defer func() { log.EL(s.log, conn.Close()) }()
tickCheck := time.NewTicker(5 * time.Second)
defer tickCheck.Stop()
tickLock := time.NewTicker(s.stateUpdateInterval)
defer tickLock.Stop()
toCh := time.After(waitForGtidTimeout)
for {
err = conn.QueryRow("SELECT @@global.gtid_executed").Scan(>id)
if log.EL(s.log, err) {
return false
}
current, err = mysql.ParseGTIDSet("mysql", gtid)
if log.EL(s.log, err) {
return false
}
if current.Contain(target) {
break
}
select {
case <-toCh:
s.log.WithFields(log.Fields{"server has": current, "we need": target.String()}).Errorf("Timeout waiting snapshot server to catch up")
return false
case <-tickLock.C:
if !state.RefreshTableLock(s.row.ID, s.workerID) || !s.clusterLock.Refresh() {
s.log.Debugf("Lost the lock while waiting for gtid")
return false
}
s.log.WithFields(log.Fields{"server has": current, "we need": target.String()}).Errorf("Still waiting snapshot server to catch up")
case <-shutdown.InitiatedCh():
return false
case <-tickCheck.C:
}
}
s.log.Debugf("Snapshot server at: %v", current)
return true
}