func()

in pkg/datasource/sql/conn_xa.go [178:226]


func (c *XAConn) createNewTxOnExecIfNeed(ctx context.Context, f func() (types.ExecResult, error)) (types.ExecResult, error) {
	var (
		tx  driver.Tx
		err error
	)

	defer func() {
		recoverErr := recover()
		if err != nil || recoverErr != nil {
			log.Errorf("conn at rollback  error:%v or recoverErr:%v", err, recoverErr)
			if c.tx != nil {
				rollbackErr := c.tx.Rollback()
				if rollbackErr != nil {
					log.Errorf("conn at rollback error:%v", rollbackErr)
				}
			}
		}
	}()

	currentAutoCommit := c.autoCommit
	if c.txCtx.TransactionMode != types.Local && tm.IsGlobalTx(ctx) && c.autoCommit {
		tx, err = c.BeginTx(ctx, driver.TxOptions{Isolation: driver.IsolationLevel(gosql.LevelDefault)})
		if err != nil {
			return nil, err
		}
	}

	// execute SQL
	ret, err := f()
	if err != nil {
		// XA End & Rollback
		if rollbackErr := c.Rollback(ctx); rollbackErr != nil {
			log.Errorf("failed to rollback xa branch of :%s, err:%w", c.txCtx.XID, rollbackErr)
		}
		return nil, err
	}

	if tx != nil && currentAutoCommit {
		if err = c.Commit(ctx); err != nil {
			log.Errorf("xa connection proxy commit failure xid:%s, err:%v", c.txCtx.XID, err)
			// XA End & Rollback
			if err := c.Rollback(ctx); err != nil {
				log.Errorf("xa connection proxy rollback failure xid:%s, err:%v", c.txCtx.XID, err)
			}
		}
	}

	return ret, nil
}