in pkg/listener/tcp/server_handler.go [121:221]
// OnMessage handles one decoded package received on session.
//
// The call is counted as active work on the listener's graceful-shutdown
// config for its whole duration, and the request counter of the session is
// bumped when the session is present in h.sessionMap. The package is then
// dispatched by kind:
//   - anything that is not a *remoting.DecodeResult is logged and dropped;
//   - a response is only accepted when it is an event (heartbeat) response,
//     in which case res.Handle() is invoked;
//   - an event (heartbeat) request is answered immediately;
//   - any other request is handed to the listener's filter chain and, for
//     two-way requests, the outcome is sent back exactly once via reply.
//
// Panics raised while processing a non-heartbeat request are recovered,
// logged, and (for two-way requests) reported to the peer with status
// hessian.Response_SERVER_ERROR.
func (h *ServerHandler) OnMessage(session getty.Session, pkg any) {
	h.ls.gShutdownConfig.AddActiveCount(1)
	defer h.ls.gShutdownConfig.AddActiveCount(-1)

	// Single lookup: only sessions already tracked get their counter bumped.
	h.rwlock.Lock()
	if tracked, found := h.sessionMap[session]; found {
		tracked.AddReqNum(1)
	}
	h.rwlock.Unlock()

	decodeResult, drOK := pkg.(*remoting.DecodeResult)
	if !drOK || decodeResult == nil {
		logger.Errorf("illegal package{%#v}", pkg)
		return
	}

	if !decodeResult.IsRequest {
		// The recover handler below is not installed yet, so the assertion
		// must be checked instead of being allowed to panic.
		res, isResp := decodeResult.Result.(*remoting.Response)
		if !isResp {
			logger.Errorf("illegal package, response expected. {%#v}", pkg)
			return
		}
		if res.Event {
			logger.Debugf("get rpc heartbeat response{%#v}", res)
			if res.Error != nil {
				logger.Errorf("rpc heartbeat response{error: %#v}", res.Error)
			}
			res.Handle()
			return
		}
		logger.Errorf("illegal package but not heartbeat. {%#v}", pkg)
		return
	}

	req, isReq := decodeResult.Result.(*remoting.Request)
	if !isReq {
		logger.Errorf("illegal package, request expected. {%#v}", pkg)
		return
	}

	resp := remoting.NewResponse(req.ID, req.Version)
	resp.Status = hessian.Response_OK
	resp.Event = req.Event
	resp.SerialID = req.SerialID
	resp.Version = "2.0.2"

	// heartbeat
	if req.Event {
		logger.Debugf("get rpc heartbeat request{%#v}", resp)
		reply(session, resp)
		return
	}

	// Convert any panic from the code below into a SERVER_ERROR response.
	defer func() {
		e := recover()
		if e == nil {
			return
		}
		resp.Status = hessian.Response_SERVER_ERROR
		if err, ok := e.(error); ok {
			logger.Errorf("OnMessage panic: %+v", perrors.WithStack(err))
			resp.Error = perrors.WithStack(err)
		} else if err, ok := e.(string); ok {
			logger.Errorf("OnMessage panic: %+v", perrors.New(err))
			resp.Error = perrors.New(err)
		} else {
			logger.Errorf("OnMessage panic: %+v, this is impossible.", e)
			resp.Error = fmt.Errorf("OnMessage panic unknow exception. %+v", e)
		}
		// One-way requests have nobody waiting for an answer.
		if req.TwoWay {
			reply(session, resp)
		}
	}()

	// During shutdown the rejection is replied regardless of req.TwoWay.
	if h.ls.gShutdownConfig.RejectRequest {
		err := perrors.Errorf("Pixiu is preparing to close, reject all new requests")
		resp.Result = protocol.RPCResult{
			Err: err,
		}
		reply(session, resp)
		return
	}

	invoc, ok := req.Data.(*invocation.RPCInvocation)
	if !ok {
		panic("create invocation occur some exception for the type is not suitable one.")
	}

	// Expose both connection endpoints to the filter chain.
	attachments := invoc.Attachments()
	attachments["local-addr"] = session.LocalAddr()
	attachments["remote-addr"] = session.RemoteAddr()

	result, err := h.ls.FilterChain.OnData(invoc)
	if err != nil {
		// NOTE(review): resp.Status is left as Response_OK here, unlike the
		// panic path which uses Response_SERVER_ERROR — confirm this is intended.
		resp.Error = fmt.Errorf("OnData panic unknow exception. %+v", err)
		if req.TwoWay {
			reply(session, resp)
		}
		// Stop here so the same request is never answered a second time.
		return
	}

	if !req.TwoWay {
		return
	}

	if result != nil {
		rpcResult, isRPCResult := result.(*protocol.RPCResult)
		if !isRPCResult || rpcResult == nil {
			// Report an unexpected result explicitly instead of relying on a
			// nil dereference being caught by the recover handler.
			resp.Status = hessian.Response_SERVER_ERROR
			resp.Error = perrors.Errorf("unexpected filter chain result type %T", result)
			logger.Errorf("OnMessage: %+v", resp.Error)
			reply(session, resp)
			return
		}
		resp.Result = *rpcResult
	} else {
		resp.Result = nil
	}
	reply(session, resp)
}