in protocol/jsonrpc/server.go [79:189]
func (s *Server) handlePkg(conn net.Conn) {
defer func() {
if r := recover(); r != nil {
logger.Warnf("connection{local:%v, remote:%v} panic error:%#v, debug stack:%s",
conn.LocalAddr(), conn.RemoteAddr(), r, string(debug.Stack()))
}
conn.Close()
}()
setTimeout := func(conn net.Conn, timeout time.Duration) {
t := time.Time{}
if timeout > time.Duration(0) {
t = time.Now().Add(timeout)
}
if err := conn.SetDeadline(t); err != nil {
logger.Error("connection.SetDeadline(t:%v) = error:%v", t, err)
}
}
sendErrorResp := func(header http.Header, body []byte) error {
rsp := &http.Response{
Header: header,
StatusCode: 500,
ProtoMajor: 1,
ProtoMinor: 1,
ContentLength: int64(len(body)),
Body: ioutil.NopCloser(bytes.NewReader(body)),
}
rsp.Header.Del("Content-Type")
rsp.Header.Del("Content-Length")
rsp.Header.Del("Timeout")
rspBuf := bytes.NewBuffer(make([]byte, DefaultHTTPRspBufferSize))
rspBuf.Reset()
err := rsp.Write(rspBuf)
if err != nil {
return perrors.WithStack(err)
}
_, err = rspBuf.WriteTo(conn)
return perrors.WithStack(err)
}
for {
bufReader := bufio.NewReader(conn)
r, err := http.ReadRequest(bufReader)
if err != nil {
logger.Warnf("[ReadRequest] error: %v", err)
return
}
reqBody, err := ioutil.ReadAll(r.Body)
r.Body.Close()
if err != nil {
return
}
reqHeader := make(map[string]string)
for k := range r.Header {
reqHeader[k] = r.Header.Get(k)
}
reqHeader["Path"] = r.URL.Path[1:] // to get service name
if r.URL.Path[0] != PathPrefix {
reqHeader["Path"] = r.URL.Path
}
reqHeader["HttpMethod"] = r.Method
httpTimeout := s.timeout
contentType := reqHeader["Content-Type"]
if contentType != "application/json" && contentType != "application/json-rpc" {
setTimeout(conn, httpTimeout)
r.Header.Set("Content-Type", "text/plain")
if errRsp := sendErrorResp(r.Header, []byte(perrors.WithStack(err).Error())); errRsp != nil {
logger.Warnf("sendErrorResp(header:%#v, error:%v) = error:%s",
r.Header, perrors.WithStack(err), errRsp)
}
return
}
ctx := context.Background()
spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(r.Header))
if err == nil {
ctx = context.WithValue(ctx, constant.TracingRemoteSpanCtx, spanCtx)
}
if len(reqHeader["Timeout"]) > 0 {
timeout, err := time.ParseDuration(reqHeader["Timeout"])
if err == nil {
httpTimeout = timeout
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, httpTimeout)
defer cancel()
}
delete(reqHeader, "Timeout")
}
setTimeout(conn, httpTimeout)
if err := serveRequest(ctx, reqHeader, reqBody, conn); err != nil {
if errRsp := sendErrorResp(r.Header, []byte(perrors.WithStack(err).Error())); errRsp != nil {
logger.Warnf("sendErrorResp(header:%#v, error:%v) = error:%s",
r.Header, perrors.WithStack(err), errRsp)
}
logger.Infof("Unexpected error serving request, closing socket: %v", err)
return
}
}
}