func handleRenameTable()

in changelog/mysql.go [625:683]


func handleRenameTable(b *mysqlReader, qe *replication.QueryEvent, m [][]string) bool {
	s := util.BytesToString(qe.Query)
	renameRE := regexp.MustCompile(`(?i)(^\s*(?:/\*[^\*]*\*/)?\s*rename\s+table\s+)`)
	r := renameRE.FindAllStringSubmatch(s, -1)
	if len(r) == 0 || len(m) < 2 {
		b.log.WithFields(log.Fields{"query": util.BytesToString(qe.Query), "db": util.BytesToString(qe.Schema)}).Debugf("Unhandled query (rename). cluster: " + b.dbl.Cluster)
		return true
	}
	for _, t := range m {
		dbname := t[4]
		table := t[5]
		if dbname == "" {
			dbname = util.BytesToString(qe.Schema)
		}
		d := b.tables[dbname]
		if d != nil && len(d[table]) > 0 {
			svc := d[table]
			b.log.WithFields(log.Fields{"db": dbname, "table": table, "rename": t[0]}).Debugf("detected rename statement of being ingested table")
			b.metrics.ChangelogAlterTableEvents.Inc(1)

			for _, tver := range svc {
				newGtid := util.SortedGTIDString(b.gtidSet)
				ts, rs := state.PullCurrentSchema(&db.Loc{Service: tver[0].service, Cluster: b.dbl.Cluster, Name: dbname}, table, types.InputMySQL)
				if ts == nil {
					return false
				}

				for i := range tver {
					t := tver[i]
					t.rawSchema = rs
					ets := t.encoder.Schema()
					ets.DBName = ts.DBName
					ets.TableName = ts.TableName
					ets.Columns = ts.Columns
					if !state.ReplaceSchema(t.service, b.dbl.Cluster, t.encoder.Schema(), t.rawSchema, t.schemaGtid, newGtid, b.inputType, t.output, t.version, t.outputFormat, t.params) {
						return false
					}

					t.schemaGtid = newGtid

					err := t.encoder.UpdateCodec()
					if log.EL(b.log, err) {
						return false
					}
					log.Debugf("Updated codec. id=%v", t.id)
				}

				if !b.pushSchema(tver) {
					return false
				}

				b.metrics.ChangelogQueryEventsWritten.Inc(int64(len(tver)))
			}

			return true
		}
	}
	return true
}