in protocol/jsonrpc/server.go [270:377]
// serveRequest decodes one JSON-RPC request, dispatches it to the invoker
// exported under the request path and writes the HTTP response to conn.
//
// header carries the request headers: its "Path" entry selects the exporter,
// and every entry is copied onto the response. body is the raw request payload.
//
// A non-nil error is returned when the request cannot be decoded, is
// ill-formed, targets a path without a usable exporter, or when the response
// payload cannot be encoded. Failures while writing the response to conn are
// only logged.
func serveRequest(ctx context.Context, header map[string]string, body []byte, conn net.Conn) error {
	// writeResp renders body as an HTTP/1.1 response with the given status
	// code into a buffer and then writes that buffer to conn.
	writeResp := func(statusCode int, header map[string]string, body []byte) error {
		rsp := &http.Response{
			Header:        make(http.Header, len(header)),
			StatusCode:    statusCode,
			ProtoMajor:    1,
			ProtoMinor:    1,
			ContentLength: int64(len(body)),
			Body:          ioutil.NopCloser(bytes.NewReader(body)),
		}
		// The request headers are echoed back unchanged.
		for k, v := range header {
			rsp.Header.Set(k, v)
		}

		rspBuf := bytes.NewBuffer(make([]byte, 0, DefaultHTTPRspBufferSize))
		if err := rsp.Write(rspBuf); err != nil {
			return perrors.WithStack(err)
		}
		_, err := rspBuf.WriteTo(conn)
		return perrors.WithStack(err)
	}

	// read request header
	codec := newServerCodec()
	err := codec.ReadHeader(header, body)
	if err != nil {
		if err == io.EOF || err == io.ErrUnexpectedEOF {
			return perrors.WithStack(err)
		}
		return perrors.New("server cannot decode request: " + err.Error())
	}

	path := header["Path"]
	methodName := codec.req.Method
	if len(path) == 0 || len(methodName) == 0 {
		return perrors.New("service/method request ill-formed: " + path + "/" + methodName)
	}

	// read body
	var args []interface{}
	if err = codec.ReadBody(&args); err != nil {
		return perrors.WithStack(err)
	}
	logger.Debugf("args: %v", args)

	// exporter invoke
	//
	// The comma-ok assertion rejects both a value of an unexpected type and a
	// nil value, so a request for a path with no usable exporter becomes an
	// error instead of a panic. The lookup's second result is therefore not
	// needed (presumably a missing entry yields a nil value — confirm against
	// ExporterMap's type).
	exporter, _ := jsonrpcProtocol.ExporterMap().Load(path)
	jsonrpcExporter, ok := exporter.(*JsonrpcExporter)
	if !ok || jsonrpcExporter == nil {
		return perrors.New("no exporter found for path: " + path)
	}

	invoker := jsonrpcExporter.GetInvoker()
	if invoker == nil {
		// NOTE(review): no response is written in this case, matching the
		// previous behaviour; confirm whether the client should get an error.
		return nil
	}

	result := invoker.Invoke(ctx, invocation.NewRPCInvocation(methodName, args, map[string]interface{}{
		constant.PathKey:    path,
		constant.VersionKey: codec.req.Version,
	}))

	if invokeErr := result.Error(); invokeErr != nil {
		rspStream, codecErr := codec.Write(invokeErr.Error(), invalidRequest)
		if codecErr != nil {
			return perrors.WithStack(codecErr)
		}
		if errRsp := writeResp(http.StatusInternalServerError, header, rspStream); errRsp != nil {
			logger.Warnf("Exporter: sendErrorResp(header:%#v, error:%v) = error:%s",
				header, invokeErr, errRsp)
		}
		return nil
	}

	rspStream, err := codec.Write("", result.Result())
	if err != nil {
		return perrors.WithStack(err)
	}
	if errRsp := writeResp(http.StatusOK, header, rspStream); errRsp != nil {
		// err is necessarily nil here; it is still passed so the log line
		// keeps its established shape.
		logger.Warnf("Exporter: sendResp(header:%#v, error:%v) = error:%s",
			header, err, errRsp)
	}
	return nil
}