in pkg/filter/network/dubboproxy/filter/proxy/proxyfilter.go [149:202]
// sendTripleRequest forwards the invocation carried by ctx to an upstream
// endpoint using the Triple protocol and stores the outcome on ctx.
//
// The endpoint is picked from the cluster named by ctx.Route. On any failure
// (no endpoint, missing path attachment, URL or invoker construction error,
// upstream error, or nil upstream result) the error is recorded with
// ctx.SetError and filter.Stop is returned. On success the result is stored
// with ctx.SetResult and filter.Continue is returned.
func (f Filter) sendTripleRequest(ctx *dubbo2.RpcContext) filter.FilterStatus {
	ra := ctx.Route
	clusterName := ra.Cluster
	clusterManager := server.GetClusterManager()
	endpoint := clusterManager.PickEndpoint(clusterName, ctx)
	if endpoint == nil {
		ctx.SetError(errors.Errorf("Requested dubbo rpc invocation endpoint not found"))
		return filter.Stop
	}

	invoc := ctx.RpcInvocation
	// Use the comma-ok form: a missing or non-string path attachment must
	// fail this request, not panic the proxy.
	path, ok := invoc.GetAttachmentInterface(dubboConstant.PathKey).(string)
	if !ok {
		ctx.SetError(errors.Errorf("dubbo rpc invocation attachment %q is missing or not a string", dubboConstant.PathKey))
		return filter.Stop
	}

	// create URL from RpcInvocation
	url, err := common.NewURL(endpoint.Address.GetAddress(),
		common.WithProtocol(tripleConstant.TRIPLE), common.WithParamsValue(dubboConstant.SerializationKey, dubboConstant.Hessian2Serialization),
		common.WithParamsValue(dubboConstant.GenericFilterKey, "true"),
		common.WithParamsValue(dubboConstant.AppVersionKey, "3.0.0"),
		common.WithParamsValue(dubboConstant.InterfaceKey, path),
		common.WithParamsValue(dubboConstant.ReferenceFilterKey, "generic,filter"),
		common.WithPath(path),
	)
	if err != nil {
		ctx.SetError(err)
		return filter.Stop
	}

	// NOTE(review): a new invoker is built for every request and never
	// released here — confirm whether it holds resources that need cleanup.
	invoker, err := dubbo3.NewDubboInvoker(url)
	if err != nil {
		ctx.SetError(err)
		return filter.Stop
	}

	var resp any
	invoc.SetReply(&resp)
	invCtx := context.Background()
	result := invoker.Invoke(invCtx, invoc)
	if result.Error() != nil {
		ctx.SetError(result.Error())
		return filter.Stop
	}
	// when upstream server down, the result and error in result are both nil
	if result.Result() == nil {
		ctx.SetError(errors.New("result from upstream server is nil"))
		return filter.Stop
	}

	result.SetAttachments(invoc.Attachments())

	// Replace a pointer result with the value it points to. Only dereference
	// when the result really is a non-nil pointer: reflect.Value.Elem panics
	// on other kinds, and Interface panics on the zero Value produced by a
	// nil pointer.
	value := reflect.ValueOf(result.Result())
	if value.Kind() == reflect.Ptr {
		if value.IsNil() {
			ctx.SetError(errors.New("result from upstream server is nil"))
			return filter.Stop
		}
		result.SetResult(value.Elem().Interface())
	}

	ctx.SetResult(result)
	return filter.Continue
}