func()

in protocol/jsonrpc/server.go [80:193]


// handlePkg serves HTTP requests arriving on conn, one after another, until
// the connection has to be dropped. conn is always closed on return.
//
// The loop ends when:
//   - the peer has closed the connection (io.EOF while peeking),
//   - a request cannot be parsed or its body cannot be read,
//   - the request's Content-Type is neither "application/json" nor
//     "application/json-rpc" (a 500 response is attempted first), or
//   - serveRequest returns an error (a 500 response is attempted first).
//
// A panic raised while handling a request is recovered and logged together
// with a stack trace.
func (s *Server) handlePkg(conn net.Conn) {
	defer func() {
		if r := recover(); r != nil {
			logger.Warnf("connection{local:%v, remote:%v} panic error:%#v, debug stack:%s",
				conn.LocalAddr(), conn.RemoteAddr(), r, string(debug.Stack()))
		}

		conn.Close()
	}()

	// setTimeout applies a read/write deadline of now+timeout to conn.
	// A non-positive timeout passes the zero time.Time, which clears the deadline.
	setTimeout := func(conn net.Conn, timeout time.Duration) {
		t := time.Time{}
		if timeout > time.Duration(0) {
			t = time.Now().Add(timeout)
		}

		if err := conn.SetDeadline(t); err != nil {
			logger.Errorf("connection.SetDeadline(t:%v) = error:%v", t, err)
		}
	}

	// sendErrorResp writes an HTTP/1.1 500 response carrying body to conn.
	// header is reused as the response header after Content-Type,
	// Content-Length and Timeout have been removed from it; note that this
	// mutates the caller's header map.
	sendErrorResp := func(header http.Header, body []byte) error {
		rsp := &http.Response{
			Header:        header,
			StatusCode:    500,
			ProtoMajor:    1,
			ProtoMinor:    1,
			ContentLength: int64(len(body)),
			Body:          io.NopCloser(bytes.NewReader(body)),
		}
		rsp.Header.Del("Content-Type")
		rsp.Header.Del("Content-Length")
		rsp.Header.Del("Timeout")

		// Serialize into a pre-sized buffer first so the response reaches
		// the socket through a single WriteTo call.
		rspBuf := bytes.NewBuffer(make([]byte, 0, DefaultHTTPRspBufferSize))
		if err := rsp.Write(rspBuf); err != nil {
			return perrors.WithStack(err)
		}
		_, err := rspBuf.WriteTo(conn)
		return perrors.WithStack(err)
	}

	for {
		// Everything read for this request (headers and body) goes through
		// the LimitReader, so it is capped at MaxHeaderSize bytes.
		bufReader := bufio.NewReader(io.LimitReader(conn, MaxHeaderSize))
		if _, err := bufReader.Peek(1); err == io.EOF {
			return
		}
		r, err := http.ReadRequest(bufReader)
		if err != nil {
			logger.Warnf("[ReadRequest] error: %v", err)
			return
		}

		reqBody, err := io.ReadAll(r.Body)
		r.Body.Close()
		if err != nil {
			logger.Warnf("[ReadRequest] read body error: %v", err)
			return
		}

		// Flatten the header to one value per key; Header.Get yields the
		// first value associated with the key.
		reqHeader := make(map[string]string, len(r.Header)+2)
		for k := range r.Header {
			reqHeader[k] = r.Header.Get(k)
		}

		// Strip the leading PathPrefix to obtain the service name. The
		// length guard matters: the request target may yield an empty path,
		// and indexing it would panic.
		path := r.URL.Path
		if len(path) > 0 && path[0] == PathPrefix {
			path = path[1:]
		}
		reqHeader["Path"] = path
		reqHeader["HttpMethod"] = r.Method

		httpTimeout := s.timeout
		contentType := reqHeader["Content-Type"]
		if contentType != "application/json" && contentType != "application/json-rpc" {
			// No error value exists at this point (the read error above was
			// already handled), so build one explicitly instead of calling
			// Error() on a nil error.
			ctErr := perrors.Errorf("unsupported content type %q", contentType)

			setTimeout(conn, httpTimeout)
			// NOTE(review): sendErrorResp deletes Content-Type again, so this
			// value never reaches the wire — confirm the intended behavior.
			r.Header.Set("Content-Type", "text/plain")
			if errRsp := sendErrorResp(r.Header, []byte(ctErr.Error())); errRsp != nil {
				logger.Warnf("sendErrorResp(header:%#v, error:%v) = error:%s",
					r.Header, ctErr, errRsp)
			}
			return
		}

		ctx := context.Background()

		// Propagate the caller's tracing span, if the headers carry one.
		spanCtx, err := opentracing.GlobalTracer().Extract(opentracing.HTTPHeaders,
			opentracing.HTTPHeadersCarrier(r.Header))
		if err == nil {
			ctx = context.WithValue(ctx, constant.TracingRemoteSpanCtx, spanCtx)
		}

		// A parsable "Timeout" header overrides the server default for both
		// the request context and the connection deadline.
		cancel := context.CancelFunc(func() {})
		if len(reqHeader["Timeout"]) > 0 {
			if timeout, perr := time.ParseDuration(reqHeader["Timeout"]); perr == nil {
				httpTimeout = timeout
				ctx, cancel = context.WithTimeout(ctx, httpTimeout)
			}
			delete(reqHeader, "Timeout")
		}
		setTimeout(conn, httpTimeout)

		// Run the request in its own function so the context is cancelled as
		// soon as this request finishes (even on panic). A defer placed
		// directly in the loop would only fire when the connection closes.
		serveErr := func() error {
			defer cancel()
			return serveRequest(ctx, reqHeader, reqBody, conn)
		}()
		if serveErr != nil {
			if errRsp := sendErrorResp(r.Header, []byte(perrors.WithStack(serveErr).Error())); errRsp != nil {
				logger.Warnf("sendErrorResp(header:%#v, error:%v) = error:%s",
					r.Header, perrors.WithStack(serveErr), errRsp)
			}

			logger.Infof("Unexpected error serving request, closing socket: %v", serveErr)
			return
		}
	}
}