v2/health.go (123 lines of code) (raw):
package shuttle
import (
"context"
"fmt"
"time"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/go-shuttle/v2/metrics/processor"
"github.com/Azure/go-shuttle/v2/metrics/sender"
)
// defaultHealthCheckInterval is the HealthCheckInterval applied by NewHealthChecker when the
// configured interval is unset or not positive.
const defaultHealthCheckInterval = time.Minute
// HealthChecker runs periodic health checks of Service Bus senders and receivers
// against a fixed set of namespace clients.
type HealthChecker struct {
	// clients maps a namespace name to the azservicebus.Client used to check that namespace.
	clients map[string]*azservicebus.Client
	// options holds the check interval and the per-check timeout.
	options *HealthCheckerOptions
}
// HealthCheckerOptions tunes the schedule of a HealthChecker.
//
// NewHealthChecker fills in defaults: a HealthCheckInterval that is zero or negative
// becomes one minute, and a HealthCheckTimeout that is zero, negative, or larger than
// HealthCheckInterval becomes equal to HealthCheckInterval.
type HealthCheckerOptions struct {
	// HealthCheckInterval is how long to wait between consecutive health checks.
	HealthCheckInterval time.Duration
	// HealthCheckTimeout bounds the context handed to each individual health check.
	HealthCheckTimeout time.Duration
}
// NewHealthChecker creates a HealthChecker that runs health checks against every client in clients.
//
// clients maps a namespace name to the azservicebus.Client for that namespace. The map is stored
// as-is and iterated when StartPeriodicHealthCheck is called.
//
// options may be nil. Defaults are applied to a private copy, so the caller's struct is never
// modified:
//   - a non-positive HealthCheckInterval becomes defaultHealthCheckInterval;
//   - a HealthCheckTimeout that is non-positive or larger than the interval becomes the interval.
func NewHealthChecker(clients map[string]*azservicebus.Client, options *HealthCheckerOptions) *HealthChecker {
	// Normalize a copy rather than the caller's value. A caller may reuse one options struct
	// for several checkers or keep mutating it, and that must not change, or race with, this
	// checker's configuration.
	opts := HealthCheckerOptions{}
	if options != nil {
		opts = *options
	}
	if opts.HealthCheckInterval <= 0 {
		opts.HealthCheckInterval = defaultHealthCheckInterval
	}
	if opts.HealthCheckTimeout <= 0 || opts.HealthCheckTimeout > opts.HealthCheckInterval {
		opts.HealthCheckTimeout = opts.HealthCheckInterval
	}
	return &HealthChecker{
		clients: clients,
		options: &opts,
	}
}
// StartPeriodicHealthCheck launches one background goroutine per client registered in the
// HealthChecker. Each goroutine runs hc's health check for its namespace on the configured
// schedule and exits once ctx is canceled. This method returns immediately; it does not wait
// for any check to run.
func (h *HealthChecker) StartPeriodicHealthCheck(ctx context.Context, hc HealthCheckable) {
	for ns, c := range h.clients {
		// The arguments of a go statement are evaluated right here, so every goroutine
		// receives its own namespace name and client.
		go h.periodicHealthCheck(ctx, hc, ns, c)
	}
}
// periodicHealthCheck runs hc.HealthCheck for a single namespace client. It runs once right away
// and then once per HealthCheckInterval until ctx is canceled. Each check gets its own context
// bounded by HealthCheckTimeout. Failures are logged and do not stop the loop.
//
// A time.Ticker drives the schedule. If a check overruns the interval, at most one tick is
// buffered and later ticks are dropped, so slow checks or process stalls cannot build up a
// backlog of back-to-back catch-up checks. The ticker is stopped when the loop exits.
func (h *HealthChecker) periodicHealthCheck(ctx context.Context, hc HealthCheckable, namespaceName string, client *azservicebus.Client) {
	check := func() {
		sbCtx, cancel := context.WithTimeout(ctx, h.options.HealthCheckTimeout)
		defer cancel()
		if err := hc.HealthCheck(sbCtx, namespaceName, client); err != nil {
			getLogger(ctx).Error(fmt.Sprintf("Health check failed for namespace %s: %s", namespaceName, err.Error()))
		}
	}

	// Do not start checking at all if the caller has already canceled.
	if ctx.Err() != nil {
		return
	}

	interval := h.options.HealthCheckInterval
	if interval <= 0 {
		// time.NewTicker panics on a non-positive duration. NewHealthChecker never produces
		// one, but a HealthChecker assembled by hand could.
		interval = defaultHealthCheckInterval
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	check()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			check()
		}
	}
}
// HealthCheckable is anything that can probe one Service Bus namespace through a client.
// SenderHealthChecker and ReceiverHealthChecker implement it for senders and receivers.
// HealthCheck returns a non-nil error when the probe fails.
type HealthCheckable interface {
	HealthCheck(ctx context.Context, namespaceName string, client *azservicebus.Client) error
}
// SenderHealthChecker is a HealthCheckable that probes whether an azservicebus.Sender
// can be used for a single entity.
type SenderHealthChecker struct {
	// EntityName is the queue or topic the health check opens a sender for.
	EntityName string
}
// HealthCheck probes a sender for s.EntityName in the given namespace. It creates an
// azservicebus.Sender, asks it for a new message batch, and then closes the sender.
//
// It returns the first error encountered. A Close error is returned when every earlier step
// succeeded. If an earlier step already failed, the Close error is logged instead, so it is not
// lost and does not mask the primary failure. In every case the sender health-check metric is
// incremented exactly once, based on the final result.
func (s *SenderHealthChecker) HealthCheck(ctx context.Context, namespaceName string, client *azservicebus.Client) (err error) {
	// err is a named result so the deferred Close below can report its failure to the caller.
	// With an unnamed result, assigning to err in a defer would reach only the metric, never
	// the returned value. This must be a closure so that it reads err after the Close defer
	// (registered later, therefore run earlier) has finished.
	defer func() {
		s.incHealthCheckMetric(namespaceName, err)
	}()

	sbSender, err := client.NewSender(s.EntityName, nil)
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := sbSender.Close(ctx); closeErr != nil {
			if err == nil {
				err = closeErr
				return
			}
			getLogger(ctx).Error(closeErr.Error())
		}
	}()

	_, err = sbSender.NewMessageBatch(ctx, nil)
	return err
}
// incHealthCheckMetric records the outcome of one sender health check for namespaceName and
// s.EntityName. A nil healthCheckErr increments the success counter; any other value increments
// the failure counter.
func (s *SenderHealthChecker) incHealthCheckMetric(namespaceName string, healthCheckErr error) {
	if healthCheckErr == nil {
		sender.Metric.IncHealthCheckSuccessCount(namespaceName, s.EntityName)
		return
	}
	sender.Metric.IncHealthCheckFailureCount(namespaceName, s.EntityName)
}
// ReceiverHealthChecker is a HealthCheckable that probes whether an azservicebus.Receiver
// can be used for a queue or for a topic subscription.
type ReceiverHealthChecker struct {
	// EntityName is the queue name, or the topic name when SubscriptionName is set.
	EntityName string
	// SubscriptionName selects a subscription of the EntityName topic. When it is empty,
	// the queue named by EntityName is checked instead.
	SubscriptionName string
}
// HealthCheck probes a receiver for r's queue or subscription in the given namespace. It opens
// an azservicebus.Receiver, peeks at most one message, and then closes the receiver. An empty
// entity counts as healthy, because PeekMessages does not return an error when there is nothing
// to peek.
//
// It returns the first error encountered. A Close error is returned when every earlier step
// succeeded. If an earlier step already failed, the Close error is logged instead, so it is not
// lost and does not mask the primary failure. In every case the receiver health-check metric is
// incremented exactly once, based on the final result.
func (r *ReceiverHealthChecker) HealthCheck(ctx context.Context, namespaceName string, client *azservicebus.Client) (err error) {
	// err is a named result so the deferred Close below can report its failure to the caller.
	// With an unnamed result, assigning to err in a defer would reach only the metric, never
	// the returned value. This must be a closure so that it reads err after the Close defer
	// (registered later, therefore run earlier) has finished.
	defer func() {
		r.incHealthCheckMetric(namespaceName, err)
	}()

	sbReceiver, err := r.createReceiver(client)
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := sbReceiver.Close(ctx); closeErr != nil {
			if err == nil {
				err = closeErr
				return
			}
			getLogger(ctx).Error(closeErr.Error())
		}
	}()

	// note: PeekMessages() does not return an error when the entity is empty
	_, err = sbReceiver.PeekMessages(ctx, 1, nil)
	return err
}
// incHealthCheckMetric records the outcome of one receiver health check for namespaceName,
// r.EntityName and r.SubscriptionName. A nil healthCheckErr increments the success counter;
// any other value increments the failure counter.
func (r *ReceiverHealthChecker) incHealthCheckMetric(namespaceName string, healthCheckErr error) {
	switch {
	case healthCheckErr == nil:
		processor.Metric.IncHealthCheckSuccessCount(namespaceName, r.EntityName, r.SubscriptionName)
	default:
		processor.Metric.IncHealthCheckFailureCount(namespaceName, r.EntityName, r.SubscriptionName)
	}
}
// createReceiver opens the receiver that HealthCheck probes. When SubscriptionName is set, it
// opens a subscription receiver for EntityName/SubscriptionName; otherwise it opens a queue
// receiver for EntityName. No receiver options are passed in either case.
func (r *ReceiverHealthChecker) createReceiver(client *azservicebus.Client) (*azservicebus.Receiver, error) {
	if r.SubscriptionName != "" {
		return client.NewReceiverForSubscription(r.EntityName, r.SubscriptionName, nil)
	}
	return client.NewReceiverForQueue(r.EntityName, nil)
}