in pkg/filter/http/grpcproxy/grpc.go [177:298]
func (f *Filter) Decode(c *http.HttpContext) filter.FilterStatus {
svc, mth := getServiceAndMethod(c.GetUrl())
var clientConn *grpc.ClientConn
var err error
re := c.GetRouteEntry()
logger.Debugf("%s client choose endpoint from cluster :%v", loggerHeader, re.Cluster)
e := server.GetClusterManager().PickEndpoint(re.Cluster, c)
if e == nil {
logger.Errorf("%s err {cluster not exists}", loggerHeader)
c.SendLocalReply(stdHttp.StatusServiceUnavailable, []byte("cluster not exists"))
return filter.Stop
}
// timeout for Dial and Invoke
ctx, cancel := context.WithTimeout(c.Ctx, c.Timeout)
defer cancel()
ep := e.Address.GetAddress()
p, ok := f.pools[strings.Join([]string{re.Cluster, ep}, ".")]
if !ok {
p = &sync.Pool{}
}
clientConn, ok = p.Get().(*grpc.ClientConn)
if !ok || clientConn == nil {
// TODO(Kenway): Support Credential and TLS
clientConn, err = grpc.DialContext(ctx, ep, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil || clientConn == nil {
logger.Errorf("%s err {failed to connect to grpc service provider}", loggerHeader)
c.SendLocalReply(stdHttp.StatusServiceUnavailable, []byte((fmt.Sprintf("%s", err))))
return filter.Stop
}
}
// get DescriptorSource, contain file and reflection
source, err := f.descriptor.getDescriptorSource(context.WithValue(ctx, ct.ContextKey(GrpcClientConnKey), clientConn), f.cfg)
if err != nil {
logger.Errorf("%s err %s : %s ", loggerHeader, "get desc source fail", err)
c.SendLocalReply(stdHttp.StatusInternalServerError, []byte("service not config proto file or the server not support reflection API"))
return filter.Stop
}
//put DescriptorSource concurrent, del if no need
ctx = context.WithValue(ctx, ct.ContextKey(DescriptorSourceKey), source)
dscp, err := source.FindSymbol(svc)
if err != nil {
logger.Errorf("%s err {%s}", loggerHeader, "request path invalid")
c.SendLocalReply(stdHttp.StatusBadRequest, []byte("method not allow"))
return filter.Stop
}
svcDesc, ok := dscp.(*desc.ServiceDescriptor)
if !ok {
logger.Errorf("%s err {service not expose, %s}", loggerHeader, svc)
c.SendLocalReply(stdHttp.StatusBadRequest, []byte(fmt.Sprintf("service not expose, %s", svc)))
return filter.Stop
}
mthDesc := svcDesc.FindMethodByName(mth)
err = f.registerExtension(source, mthDesc)
if err != nil {
logger.Errorf("%s err {%s}", loggerHeader, "register extension failed")
c.SendLocalReply(stdHttp.StatusInternalServerError, []byte(fmt.Sprintf("%s", err)))
return filter.Stop
}
msgFac := dynamic.NewMessageFactoryWithExtensionRegistry(f.extReg)
grpcReq := msgFac.NewMessage(mthDesc.GetInputType())
err = jsonToProtoMsg(c.Request.Body, grpcReq)
if err != nil && !errors.Is(err, io.EOF) {
logger.Errorf("%s err {failed to convert json to proto msg, %s}", loggerHeader, err.Error())
c.SendLocalReply(stdHttp.StatusInternalServerError, []byte(fmt.Sprintf("%s", err)))
return filter.Stop
}
stub := grpcdynamic.NewStubWithMessageFactory(clientConn, msgFac)
// metadata in grpc has the same feature in http
md := mapHeaderToMetadata(c.AllHeaders())
ctx = metadata.NewOutgoingContext(ctx, md)
md = metadata.MD{}
t := metadata.MD{}
resp, err := Invoke(ctx, stub, mthDesc, grpcReq, grpc.Header(&md), grpc.Trailer(&t))
// judge err is server side error or not
if st, ok := status.FromError(err); !ok || isServerError(st) {
if isServerTimeout(st) {
logger.Errorf("%s err {failed to invoke grpc service provider because timeout, err:%s}", loggerHeader, err.Error())
c.SendLocalReply(stdHttp.StatusGatewayTimeout, []byte(fmt.Sprintf("%s", err)))
return filter.Stop
}
logger.Errorf("%s err {failed to invoke grpc service provider, %s}", loggerHeader, err.Error())
c.SendLocalReply(stdHttp.StatusServiceUnavailable, []byte(fmt.Sprintf("%s", err)))
return filter.Stop
}
res, err := protoMsgToJson(resp)
if err != nil {
logger.Errorf("%s err {failed to convert proto msg to json, %s}", loggerHeader, err.Error())
c.SendLocalReply(stdHttp.StatusInternalServerError, []byte(fmt.Sprintf("%s", err)))
return filter.Stop
}
h := mapMetadataToHeader(md)
th := mapMetadataToHeader(t)
// let response filter handle resp
c.SourceResp = &stdHttp.Response{
StatusCode: stdHttp.StatusOK,
Header: h,
Body: io.NopCloser(strings.NewReader(res)),
Trailer: th,
Request: c.Request,
}
p.Put(clientConn)
return filter.Continue
}