in changelog/mysql.go [625:683]
func handleRenameTable(b *mysqlReader, qe *replication.QueryEvent, m [][]string) bool {
s := util.BytesToString(qe.Query)
renameRE := regexp.MustCompile(`(?i)(^\s*(?:/\*[^\*]*\*/)?\s*rename\s+table\s+)`)
r := renameRE.FindAllStringSubmatch(s, -1)
if len(r) == 0 || len(m) < 2 {
b.log.WithFields(log.Fields{"query": util.BytesToString(qe.Query), "db": util.BytesToString(qe.Schema)}).Debugf("Unhandled query (rename). cluster: " + b.dbl.Cluster)
return true
}
for _, t := range m {
dbname := t[4]
table := t[5]
if dbname == "" {
dbname = util.BytesToString(qe.Schema)
}
d := b.tables[dbname]
if d != nil && len(d[table]) > 0 {
svc := d[table]
b.log.WithFields(log.Fields{"db": dbname, "table": table, "rename": t[0]}).Debugf("detected rename statement of being ingested table")
b.metrics.ChangelogAlterTableEvents.Inc(1)
for _, tver := range svc {
newGtid := util.SortedGTIDString(b.gtidSet)
ts, rs := state.PullCurrentSchema(&db.Loc{Service: tver[0].service, Cluster: b.dbl.Cluster, Name: dbname}, table, types.InputMySQL)
if ts == nil {
return false
}
for i := range tver {
t := tver[i]
t.rawSchema = rs
ets := t.encoder.Schema()
ets.DBName = ts.DBName
ets.TableName = ts.TableName
ets.Columns = ts.Columns
if !state.ReplaceSchema(t.service, b.dbl.Cluster, t.encoder.Schema(), t.rawSchema, t.schemaGtid, newGtid, b.inputType, t.output, t.version, t.outputFormat, t.params) {
return false
}
t.schemaGtid = newGtid
err := t.encoder.UpdateCodec()
if log.EL(b.log, err) {
return false
}
log.Debugf("Updated codec. id=%v", t.id)
}
if !b.pushSchema(tver) {
return false
}
b.metrics.ChangelogQueryEventsWritten.Inc(int64(len(tver)))
}
return true
}
}
return true
}