func insertNewSchema()

in state/state.go [485:547]


func insertNewSchema(tx *sql.Tx, newGTID string, t *types.TableLoc, regid int64, format, params, rawSchema string, s *types.TableSchema) (int64, error) {
	log.Debugf("Inserting schema for table %+v", t)

	if params == "" {
		params = "{}"
	}

	var cnt int64

	//Guarantee that row exists for given cluster
	if err := util.ExecSQL(mgr.conn, "INSERT IGNORE INTO cluster_state(cluster,gtid) VALUES (?,'')", t.Cluster); log.E(err) {
		return 0, err
	}

	//Lock the cluster row
	var unused string
	if err := tx.QueryRow("SELECT gtid FROM cluster_state WHERE cluster=? FOR UPDATE", t.Cluster).Scan(&unused); log.E(err) {
		return 0, err
	}

	if err := tx.QueryRow("SELECT COUNT(*) FROM state WHERE cluster=? AND deleted_at IS NULL", t.Cluster).Scan(&cnt); log.E(err) {
		return 0, err
	}

	//First table re-registered for the cluster clear old state
	if cnt == 0 {
		if err := clearClusterState(tx, t.Cluster); err != nil {
			return 0, err
		}
	}

	insRes, err := tx.Exec("INSERT INTO state (reg_id,service,cluster,db,table_name,input,output,version,output_format,params) VALUES (?,?,?,?,?,?,?,?,?,?)", regid, t.Service, t.Cluster, t.DB, t.Table, t.Input, t.Output, t.Version, format, params)
	if err != nil {
		if !isDuplicateKeyErr(err) {
			log.E(err)
			return 0, err
		}

		//This is only safe todo when changelog reader is catched up
		id, err := replaceRawSchema(tx, "", newGTID, t, rawSchema, s)
		if err != nil {
			return 0, err
		}

		if err = undeleteStateRow(tx, id, format, params); log.E(err) {
			return 0, err
		}

		return id, nil
	}

	id, err := insRes.LastInsertId()
	if log.E(err) {
		return 0, err
	}

	if _, err = tx.Exec("INSERT INTO raw_schema(state_id,schema_gtid,raw_schema) VALUES (?,?,?)",
		id, newGTID, rawSchema); log.E(err) {
		return 0, err
	}

	return id, nil
}