in pkg/datasource/sql/undo/base/undo.go [169:243]
func (m *BaseUndoLogManager) FlushUndoLog(tranCtx *types.TransactionContext, conn driver.Conn) error {
if tranCtx.RoundImages.IsEmpty() {
return nil
}
sqlUndoLogs := make([]undo.SQLUndoLog, 0)
beforeImages := tranCtx.RoundImages.BeofreImages()
afterImages := tranCtx.RoundImages.AfterImages()
if beforeImages.IsEmptyImage() && afterImages.IsEmptyImage() {
return nil
}
size := len(beforeImages)
if size < len(afterImages) {
size = len(afterImages)
}
for i := 0; i < size; i++ {
var (
tableName string
sqlType types.SQLType
beforeImage *types.RecordImage
afterImage *types.RecordImage
)
if i < len(beforeImages) && beforeImages[i] != nil {
tableName = beforeImages[i].TableName
sqlType = beforeImages[i].SQLType
} else if i < len(afterImages) && afterImages[i] != nil {
tableName = afterImages[i].TableName
sqlType = afterImages[i].SQLType
} else {
continue
}
if i < len(beforeImages) {
beforeImage = beforeImages[i]
}
if i < len(afterImages) {
afterImage = afterImages[i]
}
undoLog := undo.SQLUndoLog{
SQLType: sqlType,
TableName: tableName,
BeforeImage: beforeImage,
AfterImage: afterImage,
}
sqlUndoLogs = append(sqlUndoLogs, undoLog)
}
branchUndoLog := undo.BranchUndoLog{
Xid: tranCtx.XID,
BranchID: tranCtx.BranchID,
Logs: sqlUndoLogs,
}
parseContext := make(map[string]string, 0)
parseContext[serializerKey] = undo.UndoConfig.LogSerialization
parseContext[compressorTypeKey] = undo.UndoConfig.CompressConfig.Type
undoLogContent := m.encodeUndoLogCtx(parseContext)
rollbackInfo, err := m.serializeBranchUndoLog(&branchUndoLog, parseContext[serializerKey])
if err != nil {
return err
}
return m.InsertUndoLog(undo.UndologRecord{
BranchID: tranCtx.BranchID,
XID: tranCtx.XID,
Context: undoLogContent,
RollbackInfo: rollbackInfo,
LogStatus: undo.UndoLogStatueNormnal,
}, conn)
}