in changelog/mysql.go [566:623]
// handleAlterTable applies an ALTER TABLE statement found in the replication
// stream to every registered version of the affected table.
//
// m carries the pieces of the parsed statement (presumably regexp submatches
// produced by the caller - confirm there): m[0][1] is the database name,
// m[0][2] the table name and m[0][3] the alter clause. When the database name
// is empty the schema of the query event qe is used instead.
//
// It returns false when state refresh, schema mutation/persistence, codec
// update or schema push fails (or when failure injection is enabled), and
// true otherwise, including when the table is not being ingested or the
// statement is a foreign key alter, both of which are skipped.
func handleAlterTable(b *mysqlReader, qe *replication.QueryEvent, m [][]string) bool {
	// Make sure that we have up to date state before deciding whether this
	// alter table is for a table being ingested or not.
	if !b.updateState(false) {
		return false
	}

	dbname, table, alter := m[0][1], m[0][2], m[0][3]
	if dbname == "" {
		dbname = util.BytesToString(qe.Schema)
	}

	d := b.tables[dbname]
	if d == nil || d[table] == nil {
		// Not a table we ingest: nothing to do, but not an error either.
		// The cluster name is passed as an argument rather than concatenated
		// into the format string, so a '%' in it cannot be misread as a verb.
		b.log.WithFields(log.Fields{"query": util.BytesToString(qe.Query), "db": util.BytesToString(qe.Schema)}).Debugf("Unhandled query (alter). cluster: %v", b.dbl.Cluster)
		return true
	}
	svc := d[table]

	fields := log.Fields{"db": dbname, "table": table, "alter": alter}
	b.log.WithFields(fields).Debugf("detected alter statement of being ingested table")
	b.metrics.ChangelogAlterTableEvents.Inc(1)

	// Foreign key alters are counted in the metric above but otherwise ignored.
	if strings.Contains(strings.ToLower(alter), " foreign key ") {
		log.Debugf("Skipping foreign key alter")
		return true
	}

	if injectAlterFailure {
		log.Errorf("Exited due to failure injection")
		return false
	}

	for _, tver := range svc {
		for i := range tver {
			t := tver[i]
			// NOTE(review): kept inside the loop as in the original; it could be
			// hoisted if nothing below modifies b.gtidSet - confirm before moving.
			newGtid := util.SortedGTIDString(b.gtidSet)

			// Mutate the in-memory schema first, then persist it; the second
			// step is skipped when the first one fails.
			if !schema.MutateTable(state.GetNoDB(), t.service, dbname, table, alter, t.encoder.Schema(), &t.rawSchema) ||
				!state.ReplaceSchema(t.service, b.dbl.Cluster, t.encoder.Schema(), t.rawSchema, t.schemaGtid, newGtid, b.inputType, t.output, t.version, t.outputFormat, t.params) {
				b.log.WithFields(fields).Warnf("error executing alter table")
				return false
			}
			t.schemaGtid = newGtid

			if err := t.encoder.UpdateCodec(); log.EL(b.log, err) {
				return false
			}
			log.Debugf("Updated codec. id=%v", t.id)
		}

		if !b.pushSchema(tver) {
			return false
		}
		b.metrics.ChangelogQueryEventsWritten.Inc(int64(len(tver)))
	}

	return true
}