func()

in pkg/listener/tcp/server_handler.go [121:221]


// OnMessage handles one decoded package received on session.
//
// pkg must be a *remoting.DecodeResult; anything else is logged and dropped.
//   - A response is accepted only when it is an event (heartbeat) response, in
//     which case its Handle method is invoked. Other responses are logged as
//     illegal.
//   - An event (heartbeat) request is answered immediately.
//   - Any other request is rejected with an error result when the shutdown
//     config has RejectRequest set; otherwise its *invocation.RPCInvocation is
//     passed to the listener's FilterChain and, for two-way requests, exactly
//     one response is written back to session.
//
// The shutdown config's active count is incremented on entry and decremented
// on return. A panic raised while processing a non-heartbeat request is
// recovered and converted into a Response_SERVER_ERROR reply (two-way only).
func (h *ServerHandler) OnMessage(session getty.Session, pkg any) {
	h.ls.gShutdownConfig.AddActiveCount(1)
	defer h.ls.gShutdownConfig.AddActiveCount(-1)

	// Bump the per-session request counter if this session is tracked.
	h.rwlock.Lock()
	if rs, tracked := h.sessionMap[session]; tracked {
		rs.AddReqNum(1)
	}
	h.rwlock.Unlock()

	decodeResult, drOK := pkg.(*remoting.DecodeResult)
	if !drOK {
		logger.Errorf("illegal package{%#v}", pkg)
		return
	}

	if !decodeResult.IsRequest {
		// Checked assertion: this runs before the recover handler below is
		// installed, so a malformed payload must not panic here.
		res, isResp := decodeResult.Result.(*remoting.Response)
		if isResp && res.Event {
			logger.Debugf("get rpc heartbeat response{%#v}", res)
			if res.Error != nil {
				logger.Errorf("rpc heartbeat response{error: %#v}", res.Error)
			}
			res.Handle()
			return
		}
		logger.Errorf("illegal package but not heartbeat. {%#v}", pkg)
		return
	}

	req, isReq := decodeResult.Result.(*remoting.Request)
	if !isReq {
		logger.Errorf("illegal package{%#v}", pkg)
		return
	}

	resp := remoting.NewResponse(req.ID, req.Version)
	resp.Status = hessian.Response_OK
	resp.Event = req.Event
	resp.SerialID = req.SerialID
	// The response version is pinned to this literal regardless of req.Version.
	resp.Version = "2.0.2"

	// heartbeat
	if req.Event {
		logger.Debugf("get rpc heartbeat request{%#v}", resp)
		reply(session, resp)
		return
	}

	// Convert any panic below into a server-error response so the caller of a
	// two-way request is not left waiting.
	defer func() {
		e := recover()
		if e == nil {
			return
		}
		resp.Status = hessian.Response_SERVER_ERROR
		switch v := e.(type) {
		case error:
			resp.Error = perrors.WithStack(v)
			logger.Errorf("OnMessage panic: %+v", resp.Error)
		case string:
			resp.Error = perrors.New(v)
			logger.Errorf("OnMessage panic: %+v", resp.Error)
		default:
			logger.Errorf("OnMessage panic: %+v, this is impossible.", e)
			resp.Error = fmt.Errorf("OnMessage panic unknow exception. %+v", e)
		}
		if req.TwoWay {
			reply(session, resp)
		}
	}()

	if h.ls.gShutdownConfig.RejectRequest {
		err := perrors.Errorf("Pixiu is preparing to close, reject all new requests")
		resp.Result = protocol.RPCResult{
			Err: err,
		}
		reply(session, resp)
		return
	}

	invoc, ok := req.Data.(*invocation.RPCInvocation)
	if !ok {
		// Recovered by the deferred handler above and reported as a server error.
		panic("create invocation occur some exception for the type is not suitable one.")
	}
	attachments := invoc.Attachments()
	attachments["local-addr"] = session.LocalAddr()
	attachments["remote-addr"] = session.RemoteAddr()

	result, err := h.ls.FilterChain.OnData(invoc)
	if err != nil {
		resp.Error = fmt.Errorf("OnData panic unknow exception. %+v", err)
		if req.TwoWay {
			reply(session, resp)
		}
		// Stop here: falling through would send a second response for the
		// same request.
		return
	}

	if !req.TwoWay {
		return
	}

	if result == nil {
		resp.Result = nil
		reply(session, resp)
		return
	}

	rpcResult, isRPCResult := result.(*protocol.RPCResult)
	if !isRPCResult || rpcResult == nil {
		// Previously this surfaced as a recovered nil-pointer dereference;
		// report it explicitly with the same server-error status.
		resp.Status = hessian.Response_SERVER_ERROR
		resp.Error = fmt.Errorf("OnData returned unexpected result %T, want non-nil *protocol.RPCResult", result)
		logger.Errorf("OnMessage: %+v", resp.Error)
		reply(session, resp)
		return
	}
	resp.Result = *rpcResult
	reply(session, resp)
}