func()

in pkg/filter/network/dubboproxy/filter/http/httpfilter.go [83:146]


func (f Filter) Handle(ctx *dubbo2.RpcContext) filter.FilterStatus {

	ra := ctx.Route
	clusterName := ra.Cluster
	clusterManager := server.GetClusterManager()
	endpoint := clusterManager.PickEndpoint(clusterName, ctx)
	if endpoint == nil {
		ctx.SetError(errors.Errorf("Requested dubbo rpc invocation endpoint not found"))
		return filter.Stop
	}

	var (
		req *http.Request
		err error
	)

	invoc := ctx.RpcInvocation
	// path's format /{service}/{method}
	interfaceKey, _ := invoc.GetAttachment(constant.InterfaceKey)
	// work when invocation is generic
	// when invocation is generic, there are three value in arguments. first is methodName, second is types, third is values
	methodName := invoc.Arguments()[0].(string)
	path := interfaceKey + "/" + methodName

	parsedURL := url.URL{
		Host:   endpoint.Address.GetAddress(),
		Scheme: "http",
		Path:   path,
	}

	body := invoc.Arguments()[2]
	b, _ := json.Marshal(body)
	req, err = http.NewRequest(http.MethodPost, parsedURL.String(), strings.NewReader(string(b)))
	if err != nil {
		err := errors.New(fmt.Sprintf("create new request failed: %v", err))
		ctx.SetError(err)
		return filter.Stop
	}

	versionKey, _ := invoc.GetAttachment(dubboConstant.VersionKey)
	groupKey, _ := invoc.GetAttachment(dubboConstant.GroupKey)

	req.Header.Set(constant.DubboHttpDubboVersion, "1.0.0")
	req.Header.Set(constant.DubboServiceProtocol, dubbo.DUBBO)
	req.Header.Set(constant.DubboServiceVersion, versionKey)
	req.Header.Set(constant.DubboGroup, groupKey)

	resp, err := (&http.Client{}).Do(req)
	if err != nil {
		ctx.SetError(err)
		return filter.Stop
	}

	if resp.StatusCode != http.StatusOK {
		ctx.SetError(errors.New(fmt.Sprintf("upstream http response status code %d", resp.StatusCode)))
		return filter.Stop
	}

	s, _ := io.ReadAll(resp.Body)
	result := &protocol.RPCResult{}
	result.Rest = string(s)
	ctx.SetResult(result)
	return filter.Continue
}