pkg/adapters/kitex/client.go (98 lines of code) (raw):

package kitex import ( "context" "fmt" "github.com/cloudwego/kitex/pkg/discovery" "github.com/cloudwego/kitex/pkg/endpoint" ruleBasedResolver "github.com/kitex-contrib/resolver-rule-based" sentinel "github.com/alibaba/sentinel-golang/api" "github.com/alibaba/sentinel-golang/core/base" "github.com/alibaba/sentinel-golang/core/outlier" ) var filterNodes []string var halfNodes []string // SentinelClientMiddleware returns new client.Middleware // Default resource name is {service's name}:{method} // Default block fallback is returning blockError // Define your own behavior by setting serverOptions func SentinelClientMiddleware(opts ...Option) func(endpoint.Endpoint) endpoint.Endpoint { options := newOptions(opts) return func(next endpoint.Endpoint) endpoint.Endpoint { return func(ctx context.Context, req, resp interface{}) error { if !options.EnableOutlier(ctx) { resourceName := options.ResourceExtract(ctx, req, resp) entry, blockErr := sentinel.Entry( resourceName, sentinel.WithResourceType(base.ResTypeRPC), sentinel.WithTrafficType(base.Outbound), ) if blockErr != nil { return options.BlockFallback(ctx, req, resp, blockErr) } defer entry.Exit() err := next(ctx, req, resp) if err != nil { sentinel.TraceError(entry, err) } return err } else { // returns new client middleware specifically for outlier ejection. slotChain := sentinel.BuildDefaultSlotChain() slotChain.AddRuleCheckSlot(outlier.DefaultSlot) slotChain.AddStatSlot(outlier.DefaultMetricStatSlot) resourceName := ServiceNameExtract(ctx) entry, _ := sentinel.Entry( resourceName, sentinel.WithResourceType(base.ResTypeRPC), sentinel.WithTrafficType(base.Outbound), sentinel.WithSlotChain(slotChain), ) defer entry.Exit() filterNodes = entry.Context().FilterNodes() halfNodes = entry.Context().HalfOpenNodes() err := next(ctx, req, resp) if callee := CalleeAddressExtract(ctx); callee != "" { sentinel.TraceCallee(entry, callee) if err != nil { sentinel.TraceError(entry, err) } } return err } } } } func OutlierClientResolver(resolver discovery.Resolver) discovery.Resolver { filterFunc := func(ctx context.Context, nodes []discovery.Instance) []discovery.Instance { var nodesPost []discovery.Instance if len(halfNodes) != 0 { fmt.Println("Half Filter Pre: ", printNodes(nodes)) nodesPost = getRemainingNodes(nodes, halfNodes, true) fmt.Println("Half Filter Post: ", printNodes(nodesPost)) } else { fmt.Println("Filter Pre: ", printNodes(nodes)) nodesPost = getRemainingNodes(nodes, filterNodes, false) fmt.Println("Filter Post: ", printNodes(nodesPost)) } return nodesPost } // Construct the filterRule and build rule based resolver filterRule := &ruleBasedResolver.FilterRule{ Name: "outlier_filter_rule", Funcs: []ruleBasedResolver.FilterFunc{filterFunc}, } return ruleBasedResolver.NewRuleBasedResolver(resolver, filterRule) } func getRemainingNodes(nodes []discovery.Instance, filters []string, flag bool) []discovery.Instance { nodesMap := make(map[string]struct{}) for _, node := range filters { nodesMap[node] = struct{}{} } nodesPost := make([]discovery.Instance, 0) for _, ep := range nodes { if _, ok := nodesMap[ep.Address().String()]; ok == flag { nodesPost = append(nodesPost, ep) } } return nodesPost } // TODO remove this func func printNodes(nodes []discovery.Instance) (res []string) { for _, v := range nodes { res = append(res, v.Address().String()) } return }