func serveRequest()

in protocol/jsonrpc/server.go [274:381]


// serveRequest decodes a single JSON-RPC request carried in header/body,
// dispatches it to the invoker of the exporter registered under
// header["Path"], and writes the encoded outcome to conn as an HTTP/1.1
// response.
//
// The response status is 200 when the invocation succeeds and 500 when the
// invocation result carries an error. A failure while writing the response to
// conn is only logged, not returned.
//
// A non-nil error is returned, and nothing is written to conn by this
// function, when the request cannot be decoded, lacks a path or method name,
// targets a path with no usable exporter/invoker, or when the response cannot
// be encoded.
func serveRequest(ctx context.Context, header map[string]string, body []byte, conn net.Conn) error {
	// writeResp serializes payload as an HTTP/1.1 response with the given
	// status code and writes it to conn. Every request header is echoed back
	// on the response; the body length is carried by ContentLength.
	writeResp := func(statusCode int, payload []byte) error {
		rsp := &http.Response{
			Header:        make(http.Header, len(header)),
			StatusCode:    statusCode,
			ProtoMajor:    1,
			ProtoMinor:    1,
			ContentLength: int64(len(payload)),
			Body:          io.NopCloser(bytes.NewReader(payload)),
		}
		for k, v := range header {
			rsp.Header.Set(k, v)
		}

		// Zero length, pre-sized capacity: avoids regrowth for typical responses.
		rspBuf := bytes.NewBuffer(make([]byte, 0, DefaultHTTPRspBufferSize))
		if err := rsp.Write(rspBuf); err != nil {
			return perrors.WithStack(err)
		}
		_, err := rspBuf.WriteTo(conn)
		return perrors.WithStack(err)
	}

	// read request header
	codec := newServerCodec()
	if err := codec.ReadHeader(header, body); err != nil {
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return perrors.WithStack(err)
		}
		return perrors.New("server cannot decode request: " + err.Error())
	}

	path := header["Path"]
	methodName := codec.req.Method
	if len(path) == 0 || len(methodName) == 0 {
		return perrors.New("service/method request ill-formed: " + path + "/" + methodName)
	}

	// read body
	var args []any
	if err := codec.ReadBody(&args); err != nil {
		return perrors.WithStack(err)
	}
	logger.Debugf("args: %v", args)

	// Look up the exporter. A request for a path that was never exported must
	// produce an error instead of panicking on a nil type assertion.
	value, found := jsonrpcProtocol.ExporterMap().Load(path)
	if !found {
		return perrors.New("no exporter registered for service: " + path)
	}
	exporter, isExporter := value.(*JsonrpcExporter)
	if !isExporter || exporter == nil {
		return perrors.New("unexpected exporter type for service: " + path)
	}
	invoker := exporter.GetInvoker()
	if invoker == nil {
		// Report the problem rather than returning nil without any response.
		return perrors.New("no invoker available for service: " + path)
	}

	// exporter invoke
	result := invoker.Invoke(ctx, invocation.NewRPCInvocation(methodName, args, map[string]any{
		constant.PathKey:    path,
		constant.VersionKey: codec.req.Version,
	}))

	if invokeErr := result.Error(); invokeErr != nil {
		rspStream, codecErr := codec.Write(invokeErr.Error(), invalidRequest)
		if codecErr != nil {
			return perrors.WithStack(codecErr)
		}
		if errRsp := writeResp(http.StatusInternalServerError, rspStream); errRsp != nil {
			logger.Warnf("Exporter: sendErrorResp(header:%#v, error:%v) = error:%s",
				header, invokeErr, errRsp)
		}
		return nil
	}

	rspStream, err := codec.Write("", result.Result())
	if err != nil {
		return perrors.WithStack(err)
	}
	if errRsp := writeResp(http.StatusOK, rspStream); errRsp != nil {
		logger.Warnf("Exporter: sendResp(header:%#v) = error:%s", header, errRsp)
	}

	return nil
}