pkg/remoting/getty/getty_remoting.go (102 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package getty import ( "fmt" "sync" "time" getty "github.com/apache/dubbo-getty" "seata.apache.org/seata-go/pkg/protocol/message" "seata.apache.org/seata-go/pkg/remoting/rpc" "seata.apache.org/seata-go/pkg/util/log" ) const ( RpcRequestTimeout = 20 * time.Second ) type ( callbackMethod func(reqMsg message.RpcMessage, respMsg *message.MessageFuture) (interface{}, error) GettyRemoting struct { futures *sync.Map mergeMsgMap *sync.Map } ) func newGettyRemoting() *GettyRemoting { return &GettyRemoting{ futures: &sync.Map{}, mergeMsgMap: &sync.Map{}, } } func (g *GettyRemoting) SendSync(msg message.RpcMessage, s getty.Session, callback callbackMethod) (interface{}, error) { if s == nil { s = sessionManager.selectSession(msg) } rpc.BeginCount(s.RemoteAddr()) result, err := g.sendAsync(s, msg, callback) rpc.EndCount(s.RemoteAddr()) if err != nil { log.Errorf("send message: %#v, session: %s", msg, s.Stat()) return nil, err } return result, err } func (g *GettyRemoting) SendAsync(msg message.RpcMessage, s getty.Session, callback callbackMethod) error { if s == nil { s = sessionManager.selectSession(msg) } rpc.BeginCount(s.RemoteAddr()) _, err := g.sendAsync(s, msg, callback) rpc.EndCount(s.RemoteAddr()) if err != nil { log.Errorf("send message: %#v, session: %s", msg, s.Stat()) } return err } func (g *GettyRemoting) sendAsync(session getty.Session, msg message.RpcMessage, callback callbackMethod) (interface{}, error) { if _, ok := msg.Body.(message.HeartBeatMessage); ok { log.Debug("send async message: {%#v}", msg) } else { log.Infof("send async message: {%#v}", msg) } var err error if session == nil || session.IsClosed() { log.Warn("sendAsyncRequestWithResponse nothing, caused by null channel.") return nil, fmt.Errorf("session is closed") } resp := message.NewMessageFuture(msg) g.futures.Store(msg.ID, resp) _, _, err = session.WritePkg(msg, time.Duration(0)) if err != nil { g.futures.Delete(msg.ID) log.Errorf("send message: %#v, session: %s", msg, session.Stat()) return nil, err } if callback != nil { return callback(msg, resp) } return nil, nil } func (g *GettyRemoting) GetMessageFuture(msgID int32) *message.MessageFuture { if msg, ok := g.futures.Load(msgID); ok { return msg.(*message.MessageFuture) } return nil } func (g *GettyRemoting) RemoveMessageFuture(msgID int32) { g.futures.Delete(msgID) } func (g *GettyRemoting) RemoveMergedMessageFuture(msgID int32) { g.mergeMsgMap.Delete(msgID) } func (g *GettyRemoting) GetMergedMessage(msgID int32) *message.MergedWarpMessage { if msg, ok := g.mergeMsgMap.Load(msgID); ok { return msg.(*message.MergedWarpMessage) } return nil } func (g *GettyRemoting) NotifyRpcMessageResponse(rpcMessage message.RpcMessage) { messageFuture := g.GetMessageFuture(rpcMessage.ID) if messageFuture != nil { messageFuture.Response = rpcMessage.Body // todo add messageFuture.Err // messageFuture.Err = rpcMessage.Err messageFuture.Done <- struct{}{} // client.msgFutures.Delete(rpcMessage.RequestID) } else { log.Infof("msg: {} is not found in msgFutures.", rpcMessage.ID) } }