in v2/lockrenewer.go [127:179]
// startPeriodicRenewal renews the message lock every renewalInterval until the
// context is canceled, a permanent error occurs, or a stop signal is received
// on plr.stopped. It is intended to run in its own goroutine for the lifetime
// of a single message's processing.
func (plr *peekLockRenewer) startPeriodicRenewal(ctx context.Context, message *azservicebus.ReceivedMessage) {
	logger := getLogger(ctx)
	count := 0 // successful renewal count, reported on the span for observability
	span := trace.SpanFromContext(ctx)
	// Reuse a single timer instead of allocating a fresh one with time.After on
	// every iteration. Semantics are unchanged: the interval restarts at the top
	// of each pass, measured from after the previous case completed.
	timer := time.NewTimer(*plr.renewalInterval)
	defer timer.Stop()
	for plr.alive.Store(true); plr.alive.Load(); {
		// Re-arm the timer for the next interval. Stop and drain first so a tick
		// that raced with the ctx.Done()/stopped cases cannot trigger an
		// immediate, spurious renewal on the next select.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		timer.Reset(*plr.renewalInterval)
		select {
		case <-timer.C:
			if !plr.alive.Load() {
				return
			}
			count++
			err := plr.renewMessageLock(ctx, message, nil)
			if err != nil {
				// The context is canceled when the message handler returns from the processor.
				// This can happen if we already entered the interval case when the message processing completes.
				// The best we can do is log and retry on the next tick. The sdk already retries operations on recoverable network errors.
				if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
					// if the error is a context error
					// we stop and let the next loop iteration handle the exit.
					plr.stop(ctx)
					continue
				}
				plr.metrics.IncMessageLockRenewedFailure(message)
				span.RecordError(fmt.Errorf("failed to renew lock: %w", err))
				// on error, we continue to the next loop iteration.
				// if the context is Done, we will enter the ctx.Done() case and exit the renewal.
				// if the error is identified as permanent, we stop the renewal.
				// if the error is anything else, we keep trying the renewal.
				if plr.isPermanent(err) {
					// include the error itself so the log line is actionable on its own.
					logger.Error(fmt.Sprintf("stopping periodic renewal for message %s: %v", message.MessageID, err))
					plr.stop(ctx)
				}
				continue
			}
			span.AddEvent("message lock renewed", trace.WithAttributes(attribute.Int("count", count)))
			plr.metrics.IncMessageLockRenewedSuccess(message)
		case <-ctx.Done():
			logger.Info("context done: stopping periodic renewal")
			span.AddEvent("context done: stopping message lock renewal")
			err := ctx.Err()
			// Only a deadline (not a plain cancel) counts as the message
			// processing deadline being reached.
			if errors.Is(err, context.DeadlineExceeded) {
				span.RecordError(err)
				plr.metrics.IncMessageDeadlineReachedCount(message)
			}
			plr.stop(ctx)
		case <-plr.stopped:
			if plr.alive.Load() {
				logger.Info("stop signal received: exiting periodic renewal")
				plr.alive.Store(false)
			}
		}
	}
}