func handleAlterTable()

in changelog/mysql.go [566:623]


func handleAlterTable(b *mysqlReader, qe *replication.QueryEvent, m [][]string) bool {
	//Make sure that we have up to date state before deciding whether this
	// alter table is for being ingested table or not
	if !b.updateState(false) {
		return false
	}
	dbname := m[0][1]
	table := m[0][2]
	if dbname == "" {
		dbname = util.BytesToString(qe.Schema)
	}
	d := b.tables[dbname]
	if d != nil && d[table] != nil {
		svc := d[table]
		b.log.WithFields(log.Fields{"db": dbname, "table": table, "alter": m[0][3]}).Debugf("detected alter statement of being ingested table")
		b.metrics.ChangelogAlterTableEvents.Inc(1)

		if strings.Contains(strings.ToLower(m[0][3]), " foreign key ") {
			log.Debugf("Skipping foreign key alter")
			return true
		}

		if injectAlterFailure {
			log.Errorf("Exited due to failure injection")
			return false
		}

		for _, tver := range svc {
			for i := range tver {
				t := tver[i]
				newGtid := util.SortedGTIDString(b.gtidSet)
				if !schema.MutateTable(state.GetNoDB(), t.service, dbname, table, m[0][3], t.encoder.Schema(), &t.rawSchema) ||
					!state.ReplaceSchema(t.service, b.dbl.Cluster, t.encoder.Schema(), t.rawSchema, t.schemaGtid, newGtid, b.inputType, t.output, t.version, t.outputFormat, t.params) {
					b.log.WithFields(log.Fields{"db": dbname, "table": table, "alter": m[0][3]}).Warnf("error executing alter table")
					return false
				}

				t.schemaGtid = newGtid

				err := t.encoder.UpdateCodec()
				if log.EL(b.log, err) {
					return false
				}
				log.Debugf("Updated codec. id=%v", t.id)
			}

			if !b.pushSchema(tver) {
				return false
			}

			b.metrics.ChangelogQueryEventsWritten.Inc(int64(len(tver)))
		}

		return true
	}
	b.log.WithFields(log.Fields{"query": util.BytesToString(qe.Query), "db": util.BytesToString(qe.Schema)}).Debugf("Unhandled query (alter). cluster: " + b.dbl.Cluster)
	return true
}