func()

in pkg/filter/network/dubboproxy/filter/proxy/proxyfilter.go [149:202]


func (f Filter) sendTripleRequest(ctx *dubbo2.RpcContext) filter.FilterStatus {
	ra := ctx.Route
	clusterName := ra.Cluster
	clusterManager := server.GetClusterManager()
	endpoint := clusterManager.PickEndpoint(clusterName, ctx)
	if endpoint == nil {
		ctx.SetError(errors.Errorf("Requested dubbo rpc invocation endpoint not found"))
		return filter.Stop
	}

	invoc := ctx.RpcInvocation
	path := invoc.GetAttachmentInterface(dubboConstant.PathKey).(string)
	// create URL from RpcInvocation
	url, err := common.NewURL(endpoint.Address.GetAddress(),
		common.WithProtocol(tripleConstant.TRIPLE), common.WithParamsValue(dubboConstant.SerializationKey, dubboConstant.Hessian2Serialization),
		common.WithParamsValue(dubboConstant.GenericFilterKey, "true"),
		common.WithParamsValue(dubboConstant.AppVersionKey, "3.0.0"),
		common.WithParamsValue(dubboConstant.InterfaceKey, path),
		common.WithParamsValue(dubboConstant.ReferenceFilterKey, "generic,filter"),
		common.WithPath(path),
	)
	if err != nil {
		ctx.SetError(err)
		return filter.Stop
	}

	invoker, err := dubbo3.NewDubboInvoker(url)
	if err != nil {
		ctx.SetError(err)
		return filter.Stop
	}

	var resp any
	invoc.SetReply(&resp)
	invCtx := context.Background()
	result := invoker.Invoke(invCtx, invoc)

	if result.Error() != nil {
		ctx.SetError(result.Error())
		return filter.Stop
	}

	// when upstream server down, the result and error in result are both nil
	if result.Result() == nil {
		ctx.SetError(errors.New("result from upstream server is nil"))
		return filter.Stop
	}

	result.SetAttachments(invoc.Attachments())
	value := reflect.ValueOf(result.Result())
	result.SetResult(value.Elem().Interface())
	ctx.SetResult(result)
	return filter.Continue
}