in pkg/datasource/sql/undo/base/undo.go [251:381]
// Undo rolls back the branch transaction identified by xid and branchID by
// replaying its undo_log records. All work happens inside one local
// transaction opened on a dedicated connection taken from db.
//
// Steps:
//  1. Load every undo_log row matching (branchID, xid).
//  2. For each row, decode its context and rollback info, then run the
//     contained SQL undo logs in reverse order on the connection.
//  3. If at least one row was found, delete the undo log; otherwise insert a
//     record through insertUndoLogWithGlobalFinished.
//  4. Commit the local transaction.
//
// Undo returns nil without committing when a record reports !CanUndo() or
// when a branch undo log holds no SQL undo logs; the local transaction is
// rolled back in those cases, as it is on every error path. The dedicated
// connection is always returned to the pool before Undo returns.
//
// Failures while closing rows, the statement, or the connection, and
// failures of the rollback itself, are logged only: they never replace the
// error that is returned to the caller.
func (m *BaseUndoLogManager) Undo(ctx context.Context, dbType types.DBType, xid string, branchID int64, db *sql.DB, dbName string) (err error) {
	conn, err := db.Conn(ctx)
	if err != nil {
		return err
	}
	// Registered first so it runs last: rows and the transaction must have
	// released the connection before Conn.Close can complete.
	defer func() {
		if closeErr := conn.Close(); closeErr != nil {
			log.Errorf("conn close fail, xid: %s, branchID:%d err:%v", xid, branchID, closeErr)
		}
	}()

	tx, err := conn.BeginTx(ctx, &sql.TxOptions{})
	if err != nil {
		return err
	}
	// txDone is set once Commit has been invoked; every other exit path
	// (errors and the early "nothing to undo" returns) must roll back so the
	// transaction never stays open.
	txDone := false
	defer func() {
		if txDone {
			return
		}
		if rbErr := tx.Rollback(); rbErr != nil {
			log.Errorf("rollback fail, xid: %s, branchID:%d err:%v", xid, branchID, rbErr)
		}
	}()

	stmt, err := conn.PrepareContext(ctx, getSelectUndoLogSql())
	if err != nil {
		log.Errorf("prepare sql fail, err: %v", err)
		return err
	}
	defer func() {
		if closeErr := stmt.Close(); closeErr != nil {
			log.Errorf("stmt close fail, xid: %s, branchID:%d err:%v", xid, branchID, closeErr)
		}
	}()

	rows, err := stmt.QueryContext(ctx, branchID, xid)
	if err != nil {
		log.Errorf("query sql fail, err: %v", err)
		return err
	}
	defer func() {
		if closeErr := rows.Close(); closeErr != nil {
			log.Errorf("rows close fail, xid: %s, branchID:%d err:%v", xid, branchID, closeErr)
		}
	}()

	// Read all records up front so the result set is exhausted before any
	// further statement is executed on the same connection.
	var undoLogRecords []undo.UndologRecord
	for rows.Next() {
		var record undo.UndologRecord
		if err = rows.Scan(&record.BranchID, &record.XID, &record.Context, &record.RollbackInfo, &record.LogStatus); err != nil {
			return err
		}
		undoLogRecords = append(undoLogRecords, record)
	}
	if err = rows.Err(); err != nil {
		log.Errorf("read rows next fail, xid: %s, branchID:%d err:%v", xid, branchID, err)
		return err
	}

	for _, record := range undoLogRecords {
		if !record.CanUndo() {
			log.Infof("xid %v branch %v, ignore %v undo_log", record.XID, record.BranchID, record.LogStatus)
			return nil
		}

		var logCtx map[string]string
		if len(record.Context) > 0 {
			logCtx = m.decodeUndoLogCtx(record.Context)
		}
		if logCtx == nil {
			return fmt.Errorf("undo log context not exist in record %+v", record)
		}

		rollbackInfo, infoErr := m.getRollbackInfo(record.RollbackInfo, logCtx)
		if infoErr != nil {
			return infoErr
		}

		branchUndoLog, decodeErr := m.deserializeBranchUndoLog(rollbackInfo, logCtx)
		if decodeErr != nil {
			return decodeErr
		}
		if len(branchUndoLog.Logs) == 0 {
			return nil
		}

		// Undo must be applied in the opposite order of the original writes.
		// Logs is read after Reverse so the loop sees the reversed order
		// regardless of how Reverse is implemented.
		branchUndoLog.Reverse()
		for _, undoLog := range branchUndoLog.Logs {
			// NOTE(review): the table cache is looked up with a hard-coded
			// types.DBTypeMySQL while the executor below uses dbType —
			// confirm whether this should follow dbType as well.
			tableMeta, metaErr := datasource.GetTableCache(types.DBTypeMySQL).GetTableMeta(ctx, dbName, undoLog.TableName)
			if metaErr != nil {
				log.Errorf("get table meta fail, err: %v", metaErr)
				return metaErr
			}
			undoLog.SetTableMeta(tableMeta)

			undoExecutor, execErr := factor.GetUndoExecutor(dbType, undoLog)
			if execErr != nil {
				log.Errorf("get undo executor, err: %v", execErr)
				return execErr
			}
			if err = undoExecutor.ExecuteOn(ctx, dbType, conn); err != nil {
				log.Errorf("execute on fail, err: %v", err)
				return err
			}
		}
	}

	if len(undoLogRecords) > 0 {
		if err = m.DeleteUndoLog(ctx, xid, branchID, conn); err != nil {
			log.Errorf("[Undo] delete undo fail, err: %v", err)
			return err
		}
		log.Infof("xid %v branch %v, undo_log deleted with %v", xid, branchID, undo.UndoLogStatueGlobalFinished)
	} else {
		if err = m.insertUndoLogWithGlobalFinished(ctx, xid, uint64(branchID), conn); err != nil {
			log.Errorf("[Undo] insert undo with global finished fail, err: %v", err)
			return err
		}
		log.Infof("xid %v branch %v, undo_log added with %v", xid, branchID, undo.UndoLogStatueGlobalFinished)
	}

	// Once Commit has been called the transaction is finished whether or not
	// it succeeded, so the deferred rollback must be skipped.
	txDone = true
	if err = tx.Commit(); err != nil {
		log.Errorf("[Undo] commit fail, err: %v", err)
		return err
	}
	return nil
}