func()

in pkg/filter/http/grpcproxy/grpc.go [177:298]


func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus {
	svc, mth := getServiceAndMethod(c.GetUrl())

	var clientConn *grpc.ClientConn
	var err error

	re := c.GetRouteEntry()
	logger.Debugf("%s client choose endpoint from cluster :%v", loggerHeader, re.Cluster)

	e := server.GetClusterManager().PickEndpoint(re.Cluster, c)
	if e == nil {
		logger.Errorf("%s err {cluster not exists}", loggerHeader)
		c.SendLocalReply(stdHttp.StatusServiceUnavailable, []byte("cluster not exists"))
		return filter.Stop
	}
	// timeout for Dial and Invoke
	ctx, cancel := context.WithTimeout(c.Ctx, c.Timeout)
	defer cancel()
	ep := e.Address.GetAddress()

	p, ok := f.pools[strings.Join([]string{re.Cluster, ep}, ".")]
	if !ok {
		p = &sync.Pool{}
	}

	clientConn, ok = p.Get().(*grpc.ClientConn)
	if !ok || clientConn == nil {
		// TODO(Kenway): Support Credential and TLS
		clientConn, err = grpc.DialContext(ctx, ep, grpc.WithTransportCredentials(insecure.NewCredentials()))
		if err != nil || clientConn == nil {
			logger.Errorf("%s err {failed to connect to grpc service provider}", loggerHeader)
			c.SendLocalReply(stdHttp.StatusServiceUnavailable, []byte((fmt.Sprintf("%s", err))))
			return filter.Stop
		}
	}

	// get DescriptorSource, contain file and reflection
	source, err := f.descriptor.getDescriptorSource(context.WithValue(ctx, ct.ContextKey(GrpcClientConnKey), clientConn), f.cfg)
	if err != nil {
		logger.Errorf("%s err %s : %s ", loggerHeader, "get desc source fail", err)
		c.SendLocalReply(stdHttp.StatusInternalServerError, []byte("service not config proto file or the server not support reflection API"))
		return filter.Stop
	}
	//put DescriptorSource concurrent, del if no need
	ctx = context.WithValue(ctx, ct.ContextKey(DescriptorSourceKey), source)

	dscp, err := source.FindSymbol(svc)
	if err != nil {
		logger.Errorf("%s err {%s}", loggerHeader, "request path invalid")
		c.SendLocalReply(stdHttp.StatusBadRequest, []byte("method not allow"))
		return filter.Stop
	}

	svcDesc, ok := dscp.(*desc.ServiceDescriptor)
	if !ok {
		logger.Errorf("%s err {service not expose, %s}", loggerHeader, svc)
		c.SendLocalReply(stdHttp.StatusBadRequest, []byte(fmt.Sprintf("service not expose, %s", svc)))
		return filter.Stop
	}

	mthDesc := svcDesc.FindMethodByName(mth)

	err = f.registerExtension(source, mthDesc)
	if err != nil {
		logger.Errorf("%s err {%s}", loggerHeader, "register extension failed")
		c.SendLocalReply(stdHttp.StatusInternalServerError, []byte(fmt.Sprintf("%s", err)))
		return filter.Stop
	}

	msgFac := dynamic.NewMessageFactoryWithExtensionRegistry(f.extReg)
	grpcReq := msgFac.NewMessage(mthDesc.GetInputType())

	err = jsonToProtoMsg(c.Request.Body, grpcReq)
	if err != nil && !errors.Is(err, io.EOF) {
		logger.Errorf("%s err {failed to convert json to proto msg, %s}", loggerHeader, err.Error())
		c.SendLocalReply(stdHttp.StatusInternalServerError, []byte(fmt.Sprintf("%s", err)))
		return filter.Stop
	}

	stub := grpcdynamic.NewStubWithMessageFactory(clientConn, msgFac)

	// metadata in grpc has the same feature in http
	md := mapHeaderToMetadata(c.AllHeaders())
	ctx = metadata.NewOutgoingContext(ctx, md)

	md = metadata.MD{}
	t := metadata.MD{}

	resp, err := Invoke(ctx, stub, mthDesc, grpcReq, grpc.Header(&md), grpc.Trailer(&t))
	// judge err is server side error or not
	if st, ok := status.FromError(err); !ok || isServerError(st) {
		if isServerTimeout(st) {
			logger.Errorf("%s err {failed to invoke grpc service provider because timeout, err:%s}", loggerHeader, err.Error())
			c.SendLocalReply(stdHttp.StatusGatewayTimeout, []byte(fmt.Sprintf("%s", err)))
			return filter.Stop
		}
		logger.Errorf("%s err {failed to invoke grpc service provider, %s}", loggerHeader, err.Error())
		c.SendLocalReply(stdHttp.StatusServiceUnavailable, []byte(fmt.Sprintf("%s", err)))
		return filter.Stop
	}

	res, err := protoMsgToJson(resp)
	if err != nil {
		logger.Errorf("%s err {failed to convert proto msg to json, %s}", loggerHeader, err.Error())
		c.SendLocalReply(stdHttp.StatusInternalServerError, []byte(fmt.Sprintf("%s", err)))
		return filter.Stop
	}

	h := mapMetadataToHeader(md)
	th := mapMetadataToHeader(t)

	// let response filter handle resp
	c.SourceResp = &stdHttp.Response{
		StatusCode: stdHttp.StatusOK,
		Header:     h,
		Body:       io.NopCloser(strings.NewReader(res)),
		Trailer:    th,
		Request:    c.Request,
	}
	p.Put(clientConn)
	return filter.Continue
}