func()

in pkg/filter/network/dubboproxy/filter/proxy/proxyfilter.go [93:147]


// sendDubboRequest forwards the RPC invocation carried by ctx to an upstream
// endpoint of the routed cluster and records the outcome on ctx.
//
// It picks an endpoint for ctx.Route.Cluster through the cluster manager,
// builds a dubbo URL for that endpoint from the invocation's interface, group
// and version attachments, obtains an invoker through the dubbo protocol's
// Refer, and invokes it with a background context.
//
// On success the result is stored with ctx.SetResult and filter.Continue is
// returned. If no endpoint is found, the URL cannot be built, Refer returns a
// nil invoker, or the result carries an error, that error is recorded with
// ctx.SetError and filter.Stop is returned.
func (f Filter) sendDubboRequest(ctx *dubbo2.RpcContext) filter.FilterStatus {
	ra := ctx.Route
	clusterName := ra.Cluster
	clusterManager := server.GetClusterManager()
	endpoint := clusterManager.PickEndpoint(clusterName, ctx)
	if endpoint == nil {
		ctx.SetError(errors.Errorf("Requested dubbo rpc invocation endpoint not found"))
		return filter.Stop
	}

	invoc := ctx.RpcInvocation
	// The second return value of GetAttachment is deliberately ignored: an
	// absent attachment is forwarded as whatever value GetAttachment yields
	// for it rather than rejected here.
	// NOTE(review): an absent interface attachment also becomes the URL path
	// below — confirm whether that case should be rejected up front.
	interfaceKey, _ := invoc.GetAttachment(dubboConstant.InterfaceKey)
	groupKey, _ := invoc.GetAttachment(dubboConstant.GroupKey)
	versionKey, _ := invoc.GetAttachment(dubboConstant.VersionKey)

	url, err := common.NewURL(endpoint.Address.GetAddress(),
		common.WithProtocol(dubbo.DUBBO), common.WithParamsValue(dubboConstant.SerializationKey, dubboConstant.Hessian2Serialization),
		common.WithParamsValue(dubboConstant.GenericFilterKey, "true"),
		common.WithParamsValue(dubboConstant.InterfaceKey, interfaceKey),
		common.WithParamsValue(dubboConstant.ReferenceFilterKey, "generic,filter"),
		// dubboAttachment must contain group and version info
		common.WithParamsValue(dubboConstant.GroupKey, groupKey),
		common.WithParamsValue(dubboConstant.VersionKey, versionKey),
		common.WithPath(interfaceKey),
	)
	if err != nil {
		ctx.SetError(err)
		return filter.Stop
	}

	dubboProtocol := dubbo.NewDubboProtocol()

	// TODO: will print many Error when failed to connect server
	invoker := dubboProtocol.Refer(url)
	if invoker == nil {
		ctx.SetError(errors.Errorf("can't connect to upstream server %s with address %s", endpoint.Name, endpoint.Address.GetAddress()))
		return filter.Stop
	}

	// Register a pointer to an untyped holder as the reply target of the
	// invocation.
	var resp any
	invoc.SetReply(&resp)

	invCtx := context.Background()
	result := invoker.Invoke(invCtx, invoc)
	// Attachments are copied onto the result before the error check, so they
	// are set on failed results as well.
	result.SetAttachments(invoc.Attachments())
	if result.Error() != nil {
		ctx.SetError(result.Error())
		return filter.Stop
	}

	// Replace a pointer result with the value it points to. Value.Elem panics
	// on a non-pointer Value and Value.Interface panics on the zero Value that
	// a nil pointer's Elem yields, so any other result is left untouched
	// instead of crashing the request path.
	if value := reflect.ValueOf(result.Result()); value.Kind() == reflect.Ptr && !value.IsNil() {
		result.SetResult(value.Elem().Interface())
	}
	ctx.SetResult(result)
	return filter.Continue
}