pkg/adapters/micro/client.go (97 lines of code) (raw):
package micro
import (
"context"
"github.com/micro/go-micro/v2/client"
sentinel "github.com/alibaba/sentinel-golang/api"
"github.com/alibaba/sentinel-golang/core/base"
"github.com/alibaba/sentinel-golang/core/outlier"
)
// clientWrapper decorates a go-micro client.Client so that Call and Stream
// are routed through Sentinel before reaching the embedded client.
//
// NOTE: field order matters; NewClientWrapper constructs this struct with a
// positional literal.
type clientWrapper struct {
// Embedded client that performs the actual Call/Stream; all other
// client.Client methods are promoted from it unchanged.
client.Client
// Opts holds the adapter options; they are re-evaluated via
// evaluateOptions on every Call and Stream.
Opts []Option
}
// Call performs a unary request through the embedded client under Sentinel
// protection.
//
// Outlier ejection disabled (enableOutlier is nil or returns false for ctx):
// the request is guarded by a Sentinel entry named req.Method(), or by the
// name returned from clientResourceExtract when that option is set. Errors
// returned by the embedded client are recorded on the entry via TraceError.
//
// Outlier ejection enabled: a fresh default slot chain is built, extended with
// the outlier rule-check and stat slots, and used for an entry named
// req.Service(). The call options produced by WithSelectOption and
// WithCallWrapper are appended before delegating to the embedded client.
//
// In both modes a blocked entry is handed to clientBlockFallback when it is
// configured; otherwise the block error itself is returned.
func (c *clientWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error {
	options := evaluateOptions(c.Opts)

	if options.enableOutlier != nil && options.enableOutlier(ctx) {
		// Build a per-call chain so the outlier slots are never added to a
		// chain shared with other callers.
		slotChain := sentinel.BuildDefaultSlotChain()
		slotChain.AddRuleCheckSlot(outlier.DefaultSlot)
		slotChain.AddStatSlot(outlier.DefaultMetricStatSlot)
		entry, blockErr := sentinel.Entry(
			req.Service(),
			sentinel.WithResourceType(base.ResTypeRPC),
			sentinel.WithTrafficType(base.Outbound),
			sentinel.WithSlotChain(slotChain),
		)
		if blockErr != nil {
			// A blocked request yields no usable entry; bail out before
			// entry.Exit() or the option helpers touch a nil entry.
			if options.clientBlockFallback != nil {
				return options.clientBlockFallback(ctx, req, blockErr)
			}
			return blockErr
		}
		defer entry.Exit()
		opts = append(opts, WithSelectOption(entry))
		opts = append(opts, WithCallWrapper(entry))
		return c.Client.Call(ctx, req, rsp, opts...)
	}

	resourceName := req.Method()
	if options.clientResourceExtract != nil {
		resourceName = options.clientResourceExtract(ctx, req)
	}
	entry, blockErr := sentinel.Entry(
		resourceName,
		sentinel.WithResourceType(base.ResTypeRPC),
		sentinel.WithTrafficType(base.Outbound),
	)
	if blockErr != nil {
		if options.clientBlockFallback != nil {
			return options.clientBlockFallback(ctx, req, blockErr)
		}
		return blockErr
	}
	defer entry.Exit()

	err := c.Client.Call(ctx, req, rsp, opts...)
	if err != nil {
		// Record the business error so error-based rules can see it.
		sentinel.TraceError(entry, err)
	}
	return err
}
// Stream opens a streaming request through the embedded client under Sentinel
// protection.
//
// Outlier ejection disabled (enableOutlier is nil or returns false for ctx):
// the request is guarded by a Sentinel entry named req.Method(), or by the
// name returned from streamClientResourceExtract when that option is set.
// An error from opening the stream is recorded on the entry via TraceError.
// The entry is exited when Stream returns, not when the stream is closed.
//
// Outlier ejection enabled: a fresh default slot chain is built, extended with
// the outlier rule-check and stat slots, and used for an entry named
// req.Service(). The call options produced by WithSelectOption and
// WithCallWrapper are appended before delegating to the embedded client.
//
// In both modes a blocked entry is handed to streamClientBlockFallback when it
// is configured; otherwise a nil stream and the block error are returned.
func (c *clientWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
	options := evaluateOptions(c.Opts)

	if options.enableOutlier != nil && options.enableOutlier(ctx) {
		// Build a per-call chain, as Call does. Adding slots to the global
		// chain here would mutate shared state on every Stream invocation.
		slotChain := sentinel.BuildDefaultSlotChain()
		slotChain.AddRuleCheckSlot(outlier.DefaultSlot)
		slotChain.AddStatSlot(outlier.DefaultMetricStatSlot)
		entry, blockErr := sentinel.Entry(
			req.Service(),
			sentinel.WithResourceType(base.ResTypeRPC),
			sentinel.WithTrafficType(base.Outbound),
			sentinel.WithSlotChain(slotChain),
		)
		if blockErr != nil {
			// A blocked request yields no usable entry; bail out before
			// entry.Exit() or the option helpers touch a nil entry.
			if options.streamClientBlockFallback != nil {
				return options.streamClientBlockFallback(ctx, req, blockErr)
			}
			return nil, blockErr
		}
		defer entry.Exit()
		opts = append(opts, WithSelectOption(entry))
		opts = append(opts, WithCallWrapper(entry))
		return c.Client.Stream(ctx, req, opts...)
	}

	resourceName := req.Method()
	if options.streamClientResourceExtract != nil {
		resourceName = options.streamClientResourceExtract(ctx, req)
	}
	entry, blockErr := sentinel.Entry(
		resourceName,
		sentinel.WithResourceType(base.ResTypeRPC),
		sentinel.WithTrafficType(base.Outbound),
	)
	if blockErr != nil {
		if options.streamClientBlockFallback != nil {
			return options.streamClientBlockFallback(ctx, req, blockErr)
		}
		return nil, blockErr
	}
	defer entry.Exit()

	stream, err := c.Client.Stream(ctx, req, opts...)
	if err != nil {
		// Record the failure so error-based rules can see it.
		sentinel.TraceError(entry, err)
	}
	return stream, err
}
// NewClientWrapper builds a go-micro client.Wrapper that decorates a client
// with Sentinel protection. The supplied opts are stored on every wrapped
// client and evaluated on each Call and Stream.
func NewClientWrapper(opts ...Option) client.Wrapper {
	wrap := func(inner client.Client) client.Client {
		return &clientWrapper{
			Client: inner,
			Opts:   opts,
		}
	}
	return wrap
}