func()

in pkg/datasource/sql/undo/base/undo.go [251:381]


// Undo rolls back the branch transaction identified by xid and branchID by
// replaying its undo log on a dedicated connection obtained from db.
//
// All work happens inside a single local transaction on that connection:
//  1. the undo log rows selected by getSelectUndoLogSql for (branchID, xid)
//     are loaded;
//  2. every record is decoded and its SQL undo logs are executed in reversed
//     order through the undo executor for dbType;
//  3. when at least one record was found the undo log is removed with
//     DeleteUndoLog, otherwise insertUndoLogWithGlobalFinished is called.
//
// The transaction is committed only if every step succeeds. On every other
// exit (an error, a record that cannot be undone, or a record without SQL
// undo logs) the transaction is rolled back. The statement, the result set
// and the connection are always released; failures while releasing them are
// logged and never replace the error returned to the caller.
//
// dbName is the schema name used to look up table metadata. The returned
// error is the first failure encountered, including a failed commit.
func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid string, branchID int64, db *sql.DB, dbName string) (err error) {
	conn, err := db.Conn(ctx)
	if err != nil {
		return err
	}
	// Registered first so that it runs last, once the transaction below has
	// been committed or rolled back; this hands the connection back to the pool.
	defer func() {
		if closeErr := conn.Close(); closeErr != nil {
			log.Errorf("conn close fail, xid: %s, branchID:%d err:%v", xid, branchID, closeErr)
		}
	}()

	tx, err := conn.BeginTx(ctx, &sql.TxOptions{})
	if err != nil {
		return err
	}

	// committed flips to true only after a successful Commit. Any other exit
	// path rolls the transaction back. Cleanup errors use their own variables
	// so the named result err is never overwritten by a successful cleanup.
	committed := false
	defer func() {
		if committed {
			return
		}
		// sql.ErrTxDone means the transaction was already finished (for
		// instance by a failed Commit), so there is nothing left to roll back.
		if rbErr := tx.Rollback(); rbErr != nil && rbErr != sql.ErrTxDone {
			log.Errorf("rollback fail, xid: %s, branchID:%d err:%v", xid, branchID, rbErr)
		}
	}()

	stmt, err := conn.PrepareContext(ctx, getSelectUndoLogSql())
	if err != nil {
		log.Errorf("prepare sql fail, err: %v", err)
		return err
	}
	defer func() {
		if closeErr := stmt.Close(); closeErr != nil {
			log.Errorf("stmt close fail, xid: %s, branchID:%d err:%v", xid, branchID, closeErr)
		}
	}()

	rows, err := stmt.QueryContext(ctx, branchID, xid)
	if err != nil {
		log.Errorf("query sql fail, err: %v", err)
		return err
	}
	defer func() {
		if closeErr := rows.Close(); closeErr != nil {
			log.Errorf("rows close fail, xid: %s, branchID:%d err:%v", xid, branchID, closeErr)
		}
	}()

	// Read every record up front so the result set is fully consumed before
	// further statements are executed on the same connection.
	var undoLogRecords []undo.UndologRecord
	for rows.Next() {
		var record undo.UndologRecord
		if err = rows.Scan(&record.BranchID, &record.XID, &record.Context, &record.RollbackInfo, &record.LogStatus); err != nil {
			return err
		}
		undoLogRecords = append(undoLogRecords, record)
	}
	if err = rows.Err(); err != nil {
		log.Errorf("read rows next fail, xid: %s, branchID:%d err:%v", xid, branchID, err)
		return err
	}

	for _, record := range undoLogRecords {
		if !record.CanUndo() {
			// Nothing is committed in this case; the deferred rollback ends
			// the transaction.
			log.Infof("xid %v branch %v, ignore %v undo_log", record.XID, record.BranchID, record.LogStatus)
			return nil
		}

		var logCtx map[string]string
		if record.Context != nil && string(record.Context) != "" {
			logCtx = m.decodeUndoLogCtx(record.Context)
		}
		if logCtx == nil {
			return fmt.Errorf("undo log context not exist in record %+v", record)
		}

		rollbackInfo, err := m.getRollbackInfo(record.RollbackInfo, logCtx)
		if err != nil {
			return err
		}

		branchUndoLog, err := m.deserializeBranchUndoLog(rollbackInfo, logCtx)
		if err != nil {
			return err
		}

		if len(branchUndoLog.Logs) == 0 {
			return nil
		}
		branchUndoLog.Reverse()

		// Logs is read after Reverse so the iteration order is the reversed
		// one whether Reverse works in place or replaces the slice.
		for _, undoLog := range branchUndoLog.Logs {
			// NOTE(review): the table cache is looked up with the hard-coded
			// MySQL type although dbType is available; confirm whether other
			// dialects register a cache before switching to dbType.
			tableMeta, err := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, dbName, undoLog.TableName)
			if err != nil {
				log.Errorf("get table meta fail, err: %v", err)
				return err
			}

			undoLog.SetTableMeta(tableMeta)

			undoExecutor, err := factor.GetUndoExecutor(dbType, undoLog)
			if err != nil {
				log.Errorf("get undo executor, err: %v", err)
				return err
			}

			if err = undoExecutor.ExecuteOn(ctx, dbType, conn); err != nil {
				log.Errorf("execute on fail, err: %v", err)
				return err
			}
		}
	}

	if len(undoLogRecords) > 0 {
		if err = m.DeleteUndoLog(ctx, xid, branchID, conn); err != nil {
			log.Errorf("[Undo] delete undo fail, err: %v", err)
			return err
		}
		log.Infof("xid %v branch %v, undo_log deleted with %v", xid, branchID, undo.UndoLogStatueGlobalFinished)
	} else {
		if err = m.insertUndoLogWithGlobalFinished(ctx, xid, uint64(branchID), conn); err != nil {
			log.Errorf("[Undo] insert undo with global finished fail, err: %v", err)
			return err
		}
		log.Infof("xid %v branch %v, undo_log added with %v", xid, branchID, undo.UndoLogStatueGlobalFinished)
	}

	if err = tx.Commit(); err != nil {
		// A failed commit means the undo did not take effect; report it
		// instead of pretending success.
		log.Errorf("[Undo] commit fail, err: %v", err)
		return err
	}
	committed = true
	return nil
}