v2/processor.go (231 lines of code) (raw):

package shuttle

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
	"github.com/Azure/go-shuttle/v2/metrics/processor"
)

// Receiver is the message source used by the Processor. It can fetch a batch of
// messages and, through the embedded MessageSettler, settle them.
type Receiver interface {
	ReceiveMessages(ctx context.Context, maxMessages int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error)
	MessageSettler
}

// MessageSettler is what handlers are given to settle a message. It exposes only
// the settling subset of the receiver that a handler needs.
type MessageSettler interface {
	AbandonMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.AbandonMessageOptions) error
	CompleteMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.CompleteMessageOptions) error
	DeadLetterMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.DeadLetterOptions) error
	DeferMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.DeferMessageOptions) error
	RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error
}

// ReceiverEx couples a Receiver with a name identifying it.
// It carries the "Ex" suffix because shuttle.Receiver is already an exported interface.
type ReceiverEx struct {
	name       string
	sbReceiver Receiver
}

// NewReceiverEx returns a ReceiverEx that wraps sbReceiver under the given name.
func NewReceiverEx(name string, sbReceiver Receiver) *ReceiverEx {
	wrapped := new(ReceiverEx)
	wrapped.name = name
	wrapped.sbReceiver = sbReceiver
	return wrapped
}

// Handler processes a single received message and may settle it through the
// provided MessageSettler.
type Handler interface {
	Handle(context.Context, MessageSettler, *azservicebus.ReceivedMessage)
}

// HandlerFunc is a plain function with the signature of Handler.Handle; its
// Handle method lets such a function be used wherever a Handler is expected.
type HandlerFunc func(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)

// Handle invokes f with the arguments it was given.
func (f HandlerFunc) Handle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage) {
	f(ctx, settler, message)
}

// Processor encapsulates the message pump and concurrency handling of servicebus.
// it exposes a handler API to provides a middleware based message processing pipeline. type Processor struct { receivers map[string]*ReceiverEx options ProcessorOptions handle Handler concurrencyTokens chan struct{} // tracks how many concurrent messages are currently being handled by the processor, shared across all receivers } // ProcessorOptions configures the processor // MaxConcurrency defaults to 1. Not setting MaxConcurrency, or setting it to 0 or a negative value will fallback to the default. // MaxReceiveCount defaults to MaxConcurrency if not set. Not setting MaxReceiveCount, or setting it to 0 or a negative value will fallback to the default. // Setting it to a value greater than MaxConcurrency will also fallback to the default. // ReceiveInterval defaults to 2 seconds if not set. // StartMaxAttempt defaults to 1 if not set (no retries). Not setting StartMaxAttempt, or setting it to non-positive value will fallback to the default. // StartRetryDelayStrategy defaults to a fixed 5-second delay if not set. type ProcessorOptions struct { // MaxConcurrency is the maximum number of concurrent messages to process at a time. MaxConcurrency int // MaxReceiveCount is the maximum number of messages to receive at a time. This value is passed to the receiver. MaxReceiveCount int // ReceiveInterval is the interval between each receive call. ReceiveInterval *time.Duration // StartMaxAttempt is the maximum number of attempts to start the processor. StartMaxAttempt int // StartRetryDelay is the delay between each start attempt. 
StartRetryDelayStrategy RetryDelayStrategy } func applyProcessorOptions(options *ProcessorOptions) *ProcessorOptions { opts := &ProcessorOptions{ MaxConcurrency: 1, MaxReceiveCount: 1, ReceiveInterval: to.Ptr(1 * time.Second), StartMaxAttempt: 1, StartRetryDelayStrategy: &ConstantDelayStrategy{Delay: 5 * time.Second}, } if options != nil { if options.ReceiveInterval != nil { opts.ReceiveInterval = options.ReceiveInterval } if options.MaxConcurrency > 0 { opts.MaxConcurrency = options.MaxConcurrency opts.MaxReceiveCount = options.MaxConcurrency } if options.MaxReceiveCount > 0 && options.MaxReceiveCount < opts.MaxConcurrency { opts.MaxReceiveCount = options.MaxReceiveCount } if options.StartMaxAttempt > 0 { opts.StartMaxAttempt = options.StartMaxAttempt } if options.StartRetryDelayStrategy != nil { opts.StartRetryDelayStrategy = options.StartRetryDelayStrategy } } return opts } // NewProcessor creates a new processor with the provided receiver and handler. func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOptions) *Processor { opts := applyProcessorOptions(options) receiverEx := NewReceiverEx("receiver", receiver) return &Processor{ receivers: map[string]*ReceiverEx{receiverEx.name: receiverEx}, handle: handler, options: *opts, concurrencyTokens: make(chan struct{}, opts.MaxConcurrency), } } // NewMultiProcessor creates a new processor with a list of receivers and a handler. func NewMultiProcessor(receiversEx []*ReceiverEx, handler HandlerFunc, options *ProcessorOptions) *Processor { opts := applyProcessorOptions(options) var receivers = make(map[string]*ReceiverEx) for _, receiver := range receiversEx { receivers[receiver.name] = receiver } return &Processor{ receivers: receivers, handle: handler, options: *opts, concurrencyTokens: make(chan struct{}, opts.MaxConcurrency), } } // Start starts processing on all the receivers of the processor and blocks until all processors are stopped or the context is canceled. 
// It will retry starting the processor based on the StartMaxAttempt and StartRetryDelayStrategy. // Returns a combined list of errors encountered during each processor start. func (p *Processor) Start(ctx context.Context) error { wg := sync.WaitGroup{} errChan := make(chan error, len(p.receivers)) for name := range p.receivers { wg.Add(1) go func(receiverName string) { defer func() { if rec := recover(); rec != nil { errChan <- fmt.Errorf("panic recovered from processor %s: %v", receiverName, rec) } wg.Done() }() err := p.startOne(ctx, receiverName) if err != nil { errChan <- err } }(name) } wg.Wait() close(errChan) var allErrs []error for err := range errChan { allErrs = append(allErrs, err) } return errors.Join(allErrs...) } // startOne starts a processor with the receiverName and blocks until an error occurs or the context is canceled. // It will retry starting the processor based on the StartMaxAttempt and StartRetryDelayStrategy. // Returns a combined list of errors during the start attempts or ctx.Err() if the context // is cancelled during the retries. func (p *Processor) startOne(ctx context.Context, receiverName string) error { logger := getLogger(ctx) receiverEx, ok := p.receivers[receiverName] if !ok { return fmt.Errorf("processor %s not found", receiverName) } var savedError error for attempt := 0; attempt < p.options.StartMaxAttempt; attempt++ { if err := p.start(ctx, receiverEx); err != nil { savedError = errors.Join(savedError, err) logger.Error(fmt.Sprintf("processor %s start attempt %d failed: %v", receiverName, attempt, err)) if attempt+1 == p.options.StartMaxAttempt { // last attempt, return early break } select { case <-time.After(p.options.StartRetryDelayStrategy.GetDelay(uint32(attempt))): continue case <-ctx.Done(): logger.Info("context done, stop retrying") return ctx.Err() } } } return savedError } // start starts the processor and blocks until an error occurs or the context is canceled. 
func (p *Processor) start(ctx context.Context, receiverEx *ReceiverEx) error { logger := getLogger(ctx) receiverName := receiverEx.name receiver := receiverEx.sbReceiver logger.Info(fmt.Sprintf("starting processor %s", receiverName)) messages, err := receiver.ReceiveMessages(ctx, p.options.MaxReceiveCount, nil) if err != nil { return fmt.Errorf("processor %s failed to receive messages: %w", receiverName, err) } logger.Info(fmt.Sprintf("processor %s received %d messages - initial", receiverName, len(messages))) processor.Metric.IncMessageReceived(receiverName, float64(len(messages))) for _, msg := range messages { p.process(ctx, receiverEx, msg) } for ctx.Err() == nil { select { case <-time.After(*p.options.ReceiveInterval): maxMessages := min(p.options.MaxReceiveCount, p.options.MaxConcurrency-len(p.concurrencyTokens)) if ctx.Err() != nil || maxMessages == 0 { break } messages, err := receiver.ReceiveMessages(ctx, maxMessages, nil) if err != nil { return fmt.Errorf("processor %s failed to receive messages: %w", receiverName, err) } logger.Info(fmt.Sprintf("processor %s received %d messages from processor loop", receiverName, len(messages))) processor.Metric.IncMessageReceived(receiverName, float64(len(messages))) for _, msg := range messages { p.process(ctx, receiverEx, msg) } case <-ctx.Done(): logger.Info(fmt.Sprintf("context done, stop receiving from processor %s", receiverName)) break } } logger.Info(fmt.Sprintf("exiting processor %s", receiverName)) return fmt.Errorf("processor %s stopped: %w", receiverName, ctx.Err()) } func (p *Processor) process(ctx context.Context, receiverEx *ReceiverEx, message *azservicebus.ReceivedMessage) { receiverName := receiverEx.name p.concurrencyTokens <- struct{}{} go func() { msgContext, cancel := context.WithCancel(ctx) // cancel messageContext when we get out of this goroutine defer cancel() defer func() { <-p.concurrencyTokens processor.Metric.IncMessageHandled(receiverName, message) 
processor.Metric.DecConcurrentMessageCount(receiverName, message) }() processor.Metric.IncConcurrentMessageCount(receiverName, message) p.handle.Handle(msgContext, receiverEx.sbReceiver, message) }() } type PanicHandlerOptions struct { OnPanicRecovered func( ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage, recovered any) } // NewPanicHandler recovers panics from downstream handlers func NewPanicHandler(panicOptions *PanicHandlerOptions, handler Handler) HandlerFunc { if panicOptions == nil { panicOptions = &PanicHandlerOptions{ OnPanicRecovered: func(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage, recovered any) { getLogger(ctx).Error(fmt.Sprintf("panic recovered: %v", recovered)) }, } } return func(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage) { defer func() { if rec := recover(); rec != nil { panicOptions.OnPanicRecovered(ctx, settler, message, rec) } }() handler.Handle(ctx, settler, message) } }