in pkg/filter/network/dubboproxy/filter/proxy/proxyfilter.go [93:147]
func (f Filter) sendDubboRequest(ctx *dubbo2.RpcContext) filter.FilterStatus {
ra := ctx.Route
clusterName := ra.Cluster
clusterManager := server.GetClusterManager()
endpoint := clusterManager.PickEndpoint(clusterName, ctx)
if endpoint == nil {
ctx.SetError(errors.Errorf("Requested dubbo rpc invocation endpoint not found"))
return filter.Stop
}
invoc := ctx.RpcInvocation
interfaceKey, _ := invoc.GetAttachment(dubboConstant.InterfaceKey)
groupKey, _ := invoc.GetAttachment(dubboConstant.GroupKey)
versionKey, _ := invoc.GetAttachment(dubboConstant.VersionKey)
url, err := common.NewURL(endpoint.Address.GetAddress(),
common.WithProtocol(dubbo.DUBBO), common.WithParamsValue(dubboConstant.SerializationKey, dubboConstant.Hessian2Serialization),
common.WithParamsValue(dubboConstant.GenericFilterKey, "true"),
common.WithParamsValue(dubboConstant.InterfaceKey, interfaceKey),
common.WithParamsValue(dubboConstant.ReferenceFilterKey, "generic,filter"),
// dubboAttachment must contains group and version info
common.WithParamsValue(dubboConstant.GroupKey, groupKey),
common.WithParamsValue(dubboConstant.VersionKey, versionKey),
common.WithPath(interfaceKey),
)
if err != nil {
ctx.SetError(err)
return filter.Stop
}
dubboProtocol := dubbo.NewDubboProtocol()
// TODO: will print many Error when failed to connect server
invoker := dubboProtocol.Refer(url)
if invoker == nil {
ctx.SetError(errors.Errorf("can't connect to upstream server %s with address %s", endpoint.Name, endpoint.Address.GetAddress()))
return filter.Stop
}
var resp any
invoc.SetReply(&resp)
invCtx := context.Background()
result := invoker.Invoke(invCtx, invoc)
result.SetAttachments(invoc.Attachments())
if result.Error() != nil {
ctx.SetError(result.Error())
return filter.Stop
}
value := reflect.ValueOf(result.Result())
result.SetResult(value.Elem().Interface())
ctx.SetResult(result)
return filter.Continue
}