in pkg/adapters/micro/client.go [61:102]
// Stream opens a streaming call on the wrapped client, guarded by a Sentinel entry.
//
// When no outlier predicate is configured, or it reports false for ctx, the
// entry is keyed by req.Method() (or by streamClientResourceExtract when that
// option is set) and any error returned by the underlying Stream call is
// passed to sentinel.TraceError.
//
// Otherwise the entry is keyed by req.Service(), is created with the global
// slot chain after the outlier slots have been added to it, and is handed to
// the underlying call through WithSelectOption and WithCallWrapper.
//
// In both modes, if Sentinel blocks the entry, the result of
// streamClientBlockFallback is returned when that option is set; otherwise
// the block error is returned with a nil stream.
func (c *clientWrapper) Stream(ctx context.Context, req client.Request, opts ...client.CallOption) (client.Stream, error) {
	options := evaluateOptions(c.Opts)

	if options.enableOutlier == nil || !options.enableOutlier(ctx) {
		resourceName := req.Method()
		if options.streamClientResourceExtract != nil {
			resourceName = options.streamClientResourceExtract(ctx, req)
		}
		entry, blockErr := sentinel.Entry(
			resourceName,
			sentinel.WithResourceType(base.ResTypeRPC),
			sentinel.WithTrafficType(base.Outbound),
		)
		if blockErr != nil {
			if options.streamClientBlockFallback != nil {
				return options.streamClientBlockFallback(ctx, req, blockErr)
			}
			return nil, blockErr
		}
		defer entry.Exit()

		stream, err := c.Client.Stream(ctx, req, opts...)
		if err != nil {
			sentinel.TraceError(entry, err)
		}
		return stream, err
	}

	// Outlier mode.
	// NOTE(review): the slots below are added to the chain returned by
	// sentinel.GlobalSlotChain() on every call — confirm that AddRuleCheckSlot
	// and AddStatSlot tolerate repeated registration of the same slot.
	slotChain := sentinel.GlobalSlotChain()
	slotChain.AddRuleCheckSlot(outlier.DefaultSlot)
	slotChain.AddStatSlot(outlier.DefaultMetricStatSlot)
	entry, blockErr := sentinel.Entry(
		req.Service(),
		sentinel.WithResourceType(base.ResTypeRPC),
		sentinel.WithTrafficType(base.Outbound),
		sentinel.WithSlotChain(slotChain),
	)
	// The block error must not be dropped: a blocked request must not reach
	// entry.Exit() or the call options below without a usable entry.
	if blockErr != nil {
		if options.streamClientBlockFallback != nil {
			return options.streamClientBlockFallback(ctx, req, blockErr)
		}
		return nil, blockErr
	}
	defer entry.Exit()

	opts = append(opts, WithSelectOption(entry), WithCallWrapper(entry))
	return c.Client.Stream(ctx, req, opts...)
}