func()

in pkg/datasource/sql/undo/base/undo.go [169:243]


// FlushUndoLog builds a branch undo log from the before/after images held in
// tranCtx.RoundImages and hands it to InsertUndoLog together with conn.
//
// Before and after images are paired by index. For each index the table name
// and SQL type are taken from the before image, falling back to the after
// image when no before image is present there; an index where neither image
// is present is skipped.
//
// It returns nil without inserting anything when RoundImages reports itself
// empty, or when both image lists report IsEmptyImage. Otherwise it returns
// the serialization error, if any, or the result of InsertUndoLog.
func (m *BaseUndoLogManager) FlushUndoLog(tranCtx *types.TransactionContext, conn driver.Conn) error {
	if tranCtx.RoundImages.IsEmpty() {
		return nil
	}

	beforeImages := tranCtx.RoundImages.BeofreImages()
	afterImages := tranCtx.RoundImages.AfterImages()
	if beforeImages.IsEmptyImage() && afterImages.IsEmptyImage() {
		return nil
	}

	// The two lists may differ in length, so iterate up to the longer one.
	size := len(beforeImages)
	if size < len(afterImages) {
		size = len(afterImages)
	}

	// Each index yields at most one entry, so size is an upper bound on the
	// result length; reserving it up front avoids regrowth during append.
	sqlUndoLogs := make([]undo.SQLUndoLog, 0, size)
	for i := 0; i < size; i++ {
		var beforeImage, afterImage *types.RecordImage
		if i < len(beforeImages) {
			beforeImage = beforeImages[i]
		}
		if i < len(afterImages) {
			afterImage = afterImages[i]
		}

		// The before image is the preferred source of table name and SQL
		// type; the after image is used only when the before image is absent.
		meta := beforeImage
		if meta == nil {
			meta = afterImage
		}
		if meta == nil {
			// Nothing was recorded at this index on either side.
			continue
		}

		sqlUndoLogs = append(sqlUndoLogs, undo.SQLUndoLog{
			SQLType:     meta.SQLType,
			TableName:   meta.TableName,
			BeforeImage: beforeImage,
			AfterImage:  afterImage,
		})
	}

	branchUndoLog := undo.BranchUndoLog{
		Xid:      tranCtx.XID,
		BranchID: tranCtx.BranchID,
		Logs:     sqlUndoLogs,
	}

	// The context stored next to the rollback info records which serializer
	// and compressor type are configured in undo.UndoConfig.
	parseContext := make(map[string]string, 2)
	parseContext[serializerKey] = undo.UndoConfig.LogSerialization
	parseContext[compressorTypeKey] = undo.UndoConfig.CompressConfig.Type
	undoLogContent := m.encodeUndoLogCtx(parseContext)

	rollbackInfo, err := m.serializeBranchUndoLog(&branchUndoLog, parseContext[serializerKey])
	if err != nil {
		return err
	}

	return m.InsertUndoLog(undo.UndologRecord{
		BranchID:     tranCtx.BranchID,
		XID:          tranCtx.XID,
		Context:      undoLogContent,
		RollbackInfo: rollbackInfo,
		LogStatus:    undo.UndoLogStatueNormnal,
	}, conn)
}