pkg/adapters/kratos/client.go (104 lines of code) (raw):

package kratos import ( "context" "fmt" "github.com/go-kratos/kratos/v2/metadata" "github.com/go-kratos/kratos/v2/middleware" "github.com/go-kratos/kratos/v2/selector" sentinel "github.com/alibaba/sentinel-golang/api" "github.com/alibaba/sentinel-golang/core/base" "github.com/alibaba/sentinel-golang/core/outlier" ) const filterNodesKey = "filterNodes" const halfNodesKey = "halfNodes" func OutlierClientFilter(ctx context.Context, nodes []selector.Node) []selector.Node { var filterNodes, halfNodes []string if v, ok := metadata.FromClientContext(ctx); ok { filterNodes = v.Values(filterNodesKey) halfNodes = v.Values(halfNodesKey) } var nodesPost []selector.Node if len(halfNodes) != 0 { fmt.Println("Half Filter Pre: ", printNodes(nodes)) nodesPost = getRemainingNodes(nodes, halfNodes, true) fmt.Println("Half Filter Post: ", printNodes(nodesPost)) } else { fmt.Println("Filter Pre: ", printNodes(nodes)) nodesPost = getRemainingNodes(nodes, filterNodes, false) fmt.Println("Filter Post: ", printNodes(nodesPost)) } return nodesPost } // fix me: Only the outlier ejection of the kratos adapter has been verified, // and the flow control capabilities still need to be validated. func SentinelClientMiddleware(opts ...Option) middleware.Middleware { options := newOptions(opts) return func(src middleware.Handler) middleware.Handler { return func(ctx context.Context, req interface{}) (interface{}, error) { if !options.EnableOutlier(ctx) { resourceName := options.ResourceExtract(ctx, req) entry, blockErr := sentinel.Entry( resourceName, sentinel.WithResourceType(base.ResTypeRPC), sentinel.WithTrafficType(base.Outbound), ) if blockErr != nil { return options.BlockFallback(ctx, req, blockErr) } defer entry.Exit() resp, err := src(ctx, req) if err != nil { sentinel.TraceError(entry, err) } return resp, err } else { // returns new client middleware specifically for outlier ejection. resourceName := ServiceNameExtract(ctx) slotChain := sentinel.BuildDefaultSlotChain() slotChain.AddRuleCheckSlot(outlier.DefaultSlot) slotChain.AddStatSlot(outlier.DefaultMetricStatSlot) entry, _ := sentinel.Entry( resourceName, sentinel.WithResourceType(base.ResTypeRPC), sentinel.WithTrafficType(base.Outbound), sentinel.WithSlotChain(slotChain), ) defer entry.Exit() if v, ok := metadata.FromClientContext(ctx); ok { filterNodes := entry.Context().FilterNodes() for _, node := range filterNodes { v.Add(filterNodesKey, node) } halfNodes := entry.Context().HalfOpenNodes() for _, node := range halfNodes { v.Add(halfNodesKey, node) } } res, err := src(ctx, req) if p, ok := selector.FromPeerContext(ctx); ok && p.Node != nil { sentinel.TraceCallee(entry, p.Node.Address()) if err != nil { sentinel.TraceError(entry, err) } } return res, err } } } } func getRemainingNodes(nodes []selector.Node, filters []string, flag bool) []selector.Node { nodesMap := make(map[string]struct{}) for _, node := range filters { nodesMap[node] = struct{}{} } nodesPost := make([]selector.Node, 0) for _, ep := range nodes { if _, ok := nodesMap[ep.Address()]; ok == flag { nodesPost = append(nodesPost, ep) } } return nodesPost } func printNodes(nodes []selector.Node) (res []string) { for _, v := range nodes { res = append(res, v.Address()) } return }