in changelog/mysql.go [313:392]
// reloadState re-reads the state rows for this reader's cluster (MySQL input
// only) and reconciles them with the in-memory b.tables registry, which is
// keyed by database, table and service and holds one entry per output/version.
//
// For every row it:
//   - creates any missing level of b.tables[db][table][service];
//   - seeds b.seqNo from the row while b.seqNo is still zero;
//   - calls addNewTable when findInVersionArray reports no matching entry;
//   - closes the producer and drops the entry when the entry's id differs
//     from the row's ID;
//   - otherwise clears the entry's dead flag and, when the row reports a
//     changed snapshot time, replaces the producer with one created for the
//     freshly computed changelog topic name.
//
// After the rows are processed it calls removeDeletedTables, publishes the
// returned count to the NumTablesIngesting metric, resizes the thread pool
// to count+1 when the buffer pipe type is "local" and a pool is set, and
// stores the count in b.numTables.
//
// It returns false when the state cannot be read, a topic name or producer
// cannot be obtained, addNewTable fails, or no tables remain; true otherwise.
func (b *mysqlReader) reloadState() bool {
	st, err := state.GetCond("cluster=? AND input='mysql'", b.dbl.Cluster)
	if err != nil {
		b.log.Errorf("Failed to read state, Error: %v", err.Error())
		return false
	}

	b.log.Debugf("reloadState")

	for i := range st {
		// Take the address of the element so the row is not copied.
		t := &st[i]

		if b.tables[t.DB] == nil {
			b.tables[t.DB] = make(map[string]map[string][]*table)
		}
		// Only the first row seen with an unset b.seqNo seeds it.
		if b.seqNo == 0 {
			b.seqNo = t.SeqNo
		}
		if b.tables[t.DB][t.Table] == nil {
			b.tables[t.DB][t.Table] = make(map[string][]*table)
		}
		if b.tables[t.DB][t.Table][t.Service] == nil {
			b.tables[t.DB][t.Table][t.Service] = make([]*table, 0)
		}

		tver := b.tables[t.DB][t.Table][t.Service]
		j := findInVersionArray(tver, t.Output, t.Version)

		// An index past the end of tver is treated as "no entry for this
		// output/version yet".
		if j >= len(tver) {
			if !b.addNewTable(t) {
				return false
			}
			continue
		}

		if tver[j].id != t.ID {
			/* Table was deleted and inserted again. Reinitialize */
			// NOTE(review): the replacement row is not registered in this
			// pass; presumably a later reload adds it via addNewTable.
			// Confirm this delay is intended.
			err = tver[j].producer.Close()
			log.EL(b.log, err)
			// Unordered removal: move the last entry into slot j, clear the
			// vacated tail slot and shrink the stored slice by one.
			tver[j] = tver[len(tver)-1]
			tver[len(tver)-1] = nil
			b.tables[t.DB][t.Table][t.Service] = tver[:len(tver)-1]
			continue
		}

		tver[j].dead = false
		if t.SnapshotTimeChanged(tver[j].snapshottedAt) {
			// Resolve the new topic name before closing the old producer so
			// that a naming failure leaves the existing producer untouched.
			pn, err := config.Get().GetChangelogTopicName(t.Service, t.DB, t.Table, t.Input, t.Output, t.Version, t.SnapshottedAt)
			if log.EL(b.log, err) {
				return false
			}
			// A failed Close is logged but does not stop the swap.
			err = tver[j].producer.Close()
			log.EL(b.log, err)
			tver[j].producer, err = b.createProducer(pn, t)
			if log.EL(b.log, err) {
				return false
			}
			tver[j].snapshottedAt = t.SnapshottedAt
		}
	}

	c := b.removeDeletedTables()

	b.metrics.NumTablesIngesting.Set(int64(c))

	if b.bufPipe.Type() == "local" && b.tpool != nil {
		b.tpool.Adjust(c + 1)
	}

	if c == 0 {
		// Use the reader's own logger, like every other message in this
		// method, instead of the package-level logger.
		b.log.Debugf("No tables remaining. Finish binlog reader")
		return false
	}

	b.numTables = int(c)

	return true
}