in state/state.go [485:547]
// insertNewSchema registers table t in the state table and stores its raw
// schema under newGTID, using transaction tx for everything except the initial
// cluster_state seeding statement.
//
// An empty params is stored as "{}". If the cluster currently has no
// non-deleted state rows, clearClusterState is called before the insert.
//
// When the state insert hits a duplicate key, the existing registration is
// reused instead: replaceRawSchema is called with an empty old GTID and the id
// it returns is passed to undeleteStateRow together with format and params.
//
// Returns the id of the newly inserted state row, or the id returned by
// replaceRawSchema on the duplicate-key path. On any failure it returns 0 and
// the error.
func insertNewSchema(tx *sql.Tx, newGTID string, t *types.TableLoc, regid int64, format, params, rawSchema string, s *types.TableSchema) (int64, error) {
	log.Debugf("Inserting schema for table %+v", t)

	if params == "" {
		params = "{}"
	}

	// Seed a cluster_state row for this cluster so that the locking read below
	// always has a row to select.
	// NOTE(review): this statement goes through mgr.conn, not tx, so it is not
	// part of the caller's transaction — presumably intentional; confirm.
	if err := util.ExecSQL(mgr.conn, "INSERT IGNORE INTO cluster_state(cluster,gtid) VALUES (?,'')", t.Cluster); log.E(err) {
		return 0, err
	}

	// Take a row lock on the cluster; the selected value itself is not used.
	var lockedGTID string
	lockRow := tx.QueryRow("SELECT gtid FROM cluster_state WHERE cluster=? FOR UPDATE", t.Cluster)
	if err := lockRow.Scan(&lockedGTID); log.E(err) {
		return 0, err
	}

	// Count the tables still registered (not deleted) for this cluster.
	var liveTables int64
	countRow := tx.QueryRow("SELECT COUNT(*) FROM state WHERE cluster=? AND deleted_at IS NULL", t.Cluster)
	if err := countRow.Scan(&liveTables); log.E(err) {
		return 0, err
	}

	// This is the first table being (re-)registered for the cluster, so clear
	// the old cluster state.
	if liveTables == 0 {
		if err := clearClusterState(tx, t.Cluster); err != nil {
			return 0, err
		}
	}

	res, insErr := tx.Exec("INSERT INTO state (reg_id,service,cluster,db,table_name,input,output,version,output_format,params) VALUES (?,?,?,?,?,?,?,?,?,?)", regid, t.Service, t.Cluster, t.DB, t.Table, t.Input, t.Output, t.Version, format, params)
	if insErr != nil {
		if isDuplicateKeyErr(insErr) {
			// A state row already exists for this table: replace its raw schema
			// and undelete it rather than inserting a new row.
			// This is only safe to do when the changelog reader has caught up.
			existingID, err := replaceRawSchema(tx, "", newGTID, t, rawSchema, s)
			if err != nil {
				return 0, err
			}
			if err = undeleteStateRow(tx, existingID, format, params); log.E(err) {
				return 0, err
			}
			return existingID, nil
		}
		log.E(insErr)
		return 0, insErr
	}

	// Fresh registration: attach the raw schema to the newly created state row.
	stateID, err := res.LastInsertId()
	if log.E(err) {
		return 0, err
	}
	_, err = tx.Exec("INSERT INTO raw_schema(state_id,schema_gtid,raw_schema) VALUES (?,?,?)",
		stateID, newGTID, rawSchema)
	if log.E(err) {
		return 0, err
	}
	return stateID, nil
}