pkg/datasource/sql/xa/mysql_xa_connection.go (156 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xa
import (
"context"
"database/sql/driver"
"errors"
"fmt"
"io"
"strings"
"time"
"seata.apache.org/seata-go/pkg/util/log"
)
type MysqlXAConn struct {
driver.Conn
}
func NewMysqlXaConn(conn driver.Conn) *MysqlXAConn {
return &MysqlXAConn{Conn: conn}
}
func (c *MysqlXAConn) Commit(ctx context.Context, xid string, onePhase bool) error {
log.Infof("xa branch commit, xid %s", xid)
var sb strings.Builder
sb.WriteString("XA COMMIT ")
sb.WriteString("'")
sb.WriteString(xid)
sb.WriteString("'")
if onePhase {
sb.WriteString(" ONE PHASE")
}
conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(ctx, sb.String(), nil)
if err != nil {
log.Errorf("xa branch commit failed, xid %s, err %v", xid, err)
}
return err
}
func (c *MysqlXAConn) End(ctx context.Context, xid string, flags int) error {
log.Infof("xa branch end, xid %s", xid)
var sb strings.Builder
sb.WriteString("XA END ")
sb.WriteString("'")
sb.WriteString(xid)
sb.WriteString("'")
switch flags {
case TMSuccess:
break
case TMSuspend:
sb.WriteString(" SUSPEND")
break
case TMFail:
break
default:
return errors.New("invalid arguments")
}
conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(ctx, sb.String(), nil)
if err != nil {
log.Errorf("xa branch end failed, xid %s, err %v", xid, err)
}
return err
}
func (c *MysqlXAConn) Forget(ctx context.Context, xid string) error {
// mysql doesn't support this
return errors.New("mysql doesn't support this")
}
func (c *MysqlXAConn) GetTransactionTimeout() time.Duration {
return 0
}
// IsSameRM is called to determine if the resource manager instance represented by the target object
// is the same as the resource manager instance represented by the parameter xares.
func (c *MysqlXAConn) IsSameRM(ctx context.Context, xares XAResource) bool {
// todo: the fn depends on the driver.Conn, but it doesn't support
return false
}
func (c *MysqlXAConn) XAPrepare(ctx context.Context, xid string) error {
log.Infof("xa branch prepare, xid %s", xid)
var sb strings.Builder
sb.WriteString("XA PREPARE ")
sb.WriteString("'")
sb.WriteString(xid)
sb.WriteString("'")
conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(ctx, sb.String(), nil)
if err != nil {
log.Errorf("xa branch prepare failed, xid %s, err %v", xid, err)
}
return err
}
// Recover Obtains a list of prepared transaction branches from a resource manager.
// The transaction manager calls this method during recovery to obtain the list of transaction branches
// that are currently in prepared or heuristically completed states.
func (c *MysqlXAConn) Recover(ctx context.Context, flag int) (xids []string, err error) {
startRscan := (flag & TMStartRScan) > 0
endRscan := (flag & TMEndRScan) > 0
if !startRscan && !endRscan && flag != TMNoFlags {
return nil, errors.New("invalid arguments")
}
if !startRscan {
return nil, nil
}
conn := c.Conn.(driver.QueryerContext)
res, err := conn.QueryContext(ctx, "XA RECOVER", nil)
if err != nil {
return nil, err
}
dest := make([]driver.Value, 4)
for true {
if err = res.Next(dest); err != nil {
if err == io.EOF {
return xids, nil
}
return nil, err
}
gtridAndbqual, ok := dest[3].(string)
if !ok {
return nil, errors.New("the protocol of XA RECOVER statement is error")
}
fmt.Printf("gtr: %v", gtridAndbqual)
xids = append(xids, string(gtridAndbqual))
}
return xids, err
}
func (c *MysqlXAConn) Rollback(ctx context.Context, xid string) error {
log.Infof("xa branch rollback, xid %s", xid)
var sb strings.Builder
sb.WriteString("XA ROLLBACK ")
sb.WriteString("'")
sb.WriteString(xid)
sb.WriteString("'")
conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(ctx, sb.String(), nil)
if err != nil {
log.Errorf("xa branch rollback failed, xid %s, err %v", xid, err)
}
return err
}
func (c *MysqlXAConn) SetTransactionTimeout(duration time.Duration) bool {
return false
}
func (c *MysqlXAConn) Start(ctx context.Context, xid string, flags int) error {
log.Infof("xa branch start, xid %s", xid)
var sb strings.Builder
sb.WriteString("XA START ")
sb.WriteString("'")
sb.WriteString(xid)
sb.WriteString("'")
switch flags {
case TMJoin:
sb.WriteString(" JOIN")
break
case TMResume:
sb.WriteString(" RESUME")
break
case TMNoFlags:
break
default:
return errors.New("invalid arguments")
}
conn, _ := c.Conn.(driver.ExecerContext)
_, err := conn.ExecContext(ctx, sb.String(), nil)
if err != nil {
log.Errorf("xa branch start failed, xid %s, err %v", xid, err)
}
return err
}