pkg/datasource/sql/undo/parser/parser_protobuf.go (186 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package parser
import (
"encoding/json"
"github.com/golang/protobuf/ptypes/any"
"github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
"seata.apache.org/seata-go/pkg/datasource/sql/types"
"seata.apache.org/seata-go/pkg/datasource/sql/undo"
)
type ProtobufParser struct {
}
// GetName get the name of parser
func (p *ProtobufParser) GetName() string {
return "protobuf"
}
// GetDefaultContent get default content of this parser
func (p *ProtobufParser) GetDefaultContent() []byte {
return []byte{}
}
// Encode branch undo log to byte array
func (p *ProtobufParser) Encode(branchUndoLog *undo.BranchUndoLog) ([]byte, error) {
protoLog := ConvertToProto(branchUndoLog)
return proto.Marshal(protoLog)
}
// Decode byte array to branch undo log
func (p *ProtobufParser) Decode(data []byte) (*undo.BranchUndoLog, error) {
branchUndoLog := &BranchUndoLog{}
err := proto.Unmarshal(data, branchUndoLog)
if err != nil {
return nil, err
}
return ConvertToIntree(branchUndoLog), nil
}
func ConvertToProto(intreeLog *undo.BranchUndoLog) *BranchUndoLog {
protoLog := &BranchUndoLog{
Xid: intreeLog.Xid,
BranchID: intreeLog.BranchID,
Logs: []*SQLUndoLog{},
}
for _, undolog := range intreeLog.Logs {
protolog := &SQLUndoLog{
SQLType: SQLType(undolog.SQLType),
TableName: undolog.TableName,
}
if undolog.BeforeImage != nil {
protolog.BeforeImage = &RecordImage{
TableName: undolog.BeforeImage.TableName,
SQLType: SQLType(undolog.BeforeImage.SQLType),
Rows: []*RowImage{},
}
for _, row := range undolog.BeforeImage.Rows {
protoRow := &RowImage{
Columns: []*ColumnImage{},
}
for _, col := range row.Columns {
anyValue, err := convertInterfaceToAny(col.GetActualValue())
if err != nil {
continue
}
protoCol := &ColumnImage{
KeyType: IndexType(col.KeyType),
ColumnName: col.ColumnName,
ColumnType: JDBCType(col.ColumnType),
Value: anyValue,
}
protoRow.Columns = append(protoRow.Columns, protoCol)
}
protolog.BeforeImage.Rows = append(protolog.BeforeImage.Rows, protoRow)
}
}
if undolog.AfterImage != nil {
protolog.AfterImage = &RecordImage{
TableName: undolog.AfterImage.TableName,
SQLType: SQLType(undolog.AfterImage.SQLType),
Rows: []*RowImage{},
}
for _, row := range undolog.AfterImage.Rows {
protoRow := &RowImage{
Columns: []*ColumnImage{},
}
for _, col := range row.Columns {
anyValue, err := convertInterfaceToAny(col.Value)
if err != nil {
continue
}
protoCol := &ColumnImage{
KeyType: IndexType(col.KeyType),
ColumnName: col.ColumnName,
ColumnType: JDBCType(col.ColumnType),
Value: anyValue,
}
protoRow.Columns = append(protoRow.Columns, protoCol)
}
protolog.AfterImage.Rows = append(protolog.AfterImage.Rows, protoRow)
}
}
protoLog.Logs = append(protoLog.Logs, protolog)
}
return protoLog
}
func ConvertToIntree(protoLog *BranchUndoLog) *undo.BranchUndoLog {
intreeLog := &undo.BranchUndoLog{
Xid: protoLog.Xid,
BranchID: protoLog.BranchID,
Logs: []undo.SQLUndoLog{},
}
for _, pbSqlLog := range protoLog.Logs {
undoSqlLog := undo.SQLUndoLog{
SQLType: types.SQLType(pbSqlLog.SQLType),
TableName: pbSqlLog.TableName,
}
if pbSqlLog.BeforeImage != nil {
undoSqlLog.BeforeImage = &types.RecordImage{
TableName: pbSqlLog.BeforeImage.TableName,
SQLType: types.SQLType(pbSqlLog.BeforeImage.SQLType),
Rows: []types.RowImage{},
}
for _, pbRow := range pbSqlLog.BeforeImage.Rows {
undoRow := types.RowImage{
Columns: []types.ColumnImage{},
}
for _, pbCol := range pbRow.Columns {
anyValue, err := convertAnyToInterface(pbCol.Value)
if err != nil {
continue
}
undoCol := types.ColumnImage{
KeyType: types.IndexType(pbCol.KeyType),
ColumnName: pbCol.ColumnName,
ColumnType: types.JDBCType(pbCol.ColumnType),
Value: anyValue,
}
undoRow.Columns = append(undoRow.Columns, undoCol)
}
undoSqlLog.BeforeImage.Rows = append(undoSqlLog.BeforeImage.Rows, undoRow)
}
}
if pbSqlLog.AfterImage != nil {
undoSqlLog.AfterImage = &types.RecordImage{
TableName: pbSqlLog.AfterImage.TableName,
SQLType: types.SQLType(pbSqlLog.AfterImage.SQLType),
Rows: []types.RowImage{},
}
for _, pbRow := range pbSqlLog.AfterImage.Rows {
undoRow := types.RowImage{
Columns: []types.ColumnImage{},
}
for _, pbCol := range pbRow.Columns {
anyValue, err := convertAnyToInterface(pbCol.Value)
if err != nil {
continue
}
undoCol := types.ColumnImage{
KeyType: types.IndexType(pbCol.KeyType),
ColumnName: pbCol.ColumnName,
ColumnType: types.JDBCType(pbCol.ColumnType),
Value: anyValue,
}
undoRow.Columns = append(undoRow.Columns, undoCol)
}
undoSqlLog.AfterImage.Rows = append(undoSqlLog.AfterImage.Rows, undoRow)
}
}
intreeLog.Logs = append(intreeLog.Logs, undoSqlLog)
}
return intreeLog
}
func convertAnyToInterface(anyValue *any.Any) (interface{}, error) {
var value interface{}
bytesValue := &wrappers.BytesValue{}
err := anypb.UnmarshalTo(anyValue, bytesValue, proto.UnmarshalOptions{})
if err != nil {
return value, err
}
uErr := json.Unmarshal(bytesValue.Value, &value)
if uErr != nil {
return value, uErr
}
return value, nil
}
func convertInterfaceToAny(v interface{}) (*any.Any, error) {
anyValue := &any.Any{}
bytes, _ := json.Marshal(v)
bytesValue := &wrappers.BytesValue{
Value: bytes,
}
err := anypb.MarshalFrom(anyValue, bytesValue, proto.MarshalOptions{})
return anyValue, err
}