in pkg/filter/network/dubboproxy/filter/http/httpfilter.go [83:146]
func (f Filter) Handle(ctx *dubbo2.RpcContext) filter.FilterStatus {
ra := ctx.Route
clusterName := ra.Cluster
clusterManager := server.GetClusterManager()
endpoint := clusterManager.PickEndpoint(clusterName, ctx)
if endpoint == nil {
ctx.SetError(errors.Errorf("Requested dubbo rpc invocation endpoint not found"))
return filter.Stop
}
var (
req *http.Request
err error
)
invoc := ctx.RpcInvocation
// path's format /{service}/{method}
interfaceKey, _ := invoc.GetAttachment(constant.InterfaceKey)
// work when invocation is generic
// when invocation is generic, there are three value in arguments. first is methodName, second is types, third is values
methodName := invoc.Arguments()[0].(string)
path := interfaceKey + "/" + methodName
parsedURL := url.URL{
Host: endpoint.Address.GetAddress(),
Scheme: "http",
Path: path,
}
body := invoc.Arguments()[2]
b, _ := json.Marshal(body)
req, err = http.NewRequest(http.MethodPost, parsedURL.String(), strings.NewReader(string(b)))
if err != nil {
err := errors.New(fmt.Sprintf("create new request failed: %v", err))
ctx.SetError(err)
return filter.Stop
}
versionKey, _ := invoc.GetAttachment(dubboConstant.VersionKey)
groupKey, _ := invoc.GetAttachment(dubboConstant.GroupKey)
req.Header.Set(constant.DubboHttpDubboVersion, "1.0.0")
req.Header.Set(constant.DubboServiceProtocol, dubbo.DUBBO)
req.Header.Set(constant.DubboServiceVersion, versionKey)
req.Header.Set(constant.DubboGroup, groupKey)
resp, err := (&http.Client{}).Do(req)
if err != nil {
ctx.SetError(err)
return filter.Stop
}
if resp.StatusCode != http.StatusOK {
ctx.SetError(errors.New(fmt.Sprintf("upstream http response status code %d", resp.StatusCode)))
return filter.Stop
}
s, _ := io.ReadAll(resp.Body)
result := &protocol.RPCResult{}
result.Rest = string(s)
ctx.SetResult(result)
return filter.Continue
}