internal/rabbitmq/client.go (162 lines of code) (raw):
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package rabbitmq // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/rabbitmq"
import (
"context"
"crypto/tls"
"errors"
"sync"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"go.uber.org/zap"
)
type AmqpClient interface {
DialConfig(config DialConfig) (Connection, error)
}
type Connection interface {
ReconnectIfUnhealthy() error
IsClosed() bool
Channel() (Channel, error)
NotifyClose(receiver chan *amqp.Error) chan *amqp.Error
Close() error
}
type Channel interface {
Confirm(noWait bool) error
PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) (DeferredConfirmation, error)
IsClosed() bool
Close() error
}
type DeferredConfirmation interface {
Done() <-chan struct{}
Acked() bool
}
type connectionHolder struct {
url string
config amqp.Config
connection *amqp.Connection
logger *zap.Logger
connLock *sync.Mutex
connectionErrors chan *amqp.Error
}
type channelHolder struct {
channel *amqp.Channel
}
type deferredConfirmationHolder struct {
confirmation *amqp.DeferredConfirmation
}
type DialConfig struct {
URL string
Vhost string
Auth amqp.Authentication
ConnectionTimeout time.Duration
Heartbeat time.Duration
TLS *tls.Config
ConnectionName string
}
func NewAmqpClient(logger *zap.Logger) AmqpClient {
return &client{logger: logger}
}
type client struct {
logger *zap.Logger
}
func (c *client) DialConfig(config DialConfig) (Connection, error) {
properties := amqp.Table{}
properties.SetClientConnectionName(config.ConnectionName)
ch := &connectionHolder{
url: config.URL,
config: amqp.Config{
SASL: []amqp.Authentication{config.Auth},
Vhost: config.Vhost,
TLSClientConfig: config.TLS,
Heartbeat: config.Heartbeat,
Dial: amqp.DefaultDial(config.ConnectionTimeout),
Properties: properties,
},
logger: c.logger,
connLock: &sync.Mutex{},
connectionErrors: make(chan *amqp.Error, 1),
}
ch.connLock.Lock()
defer ch.connLock.Unlock()
return ch, ch.connect()
}
func (c *connectionHolder) ReconnectIfUnhealthy() error {
c.connLock.Lock()
defer c.connLock.Unlock()
hasConnectionError := false
select {
case err := <-c.connectionErrors:
hasConnectionError = true
c.logger.Info("Received connection error, will retry restoring unhealthy connection", zap.Error(err))
default:
break
}
if hasConnectionError || !c.isConnected() {
if c.isConnected() {
err := c.connection.Close()
if err != nil {
c.logger.Warn("Error closing unhealthy connection", zap.Error(err))
}
}
if err := c.connect(); err != nil {
return errors.Join(errors.New("failed attempt at restoring unhealthy connection"), err)
}
c.logger.Info("Successfully restored unhealthy rabbitmq connection")
}
return nil
}
func (c *connectionHolder) connect() error {
c.logger.Debug("Connecting to rabbitmq")
connection, err := amqp.DialConfig(c.url, c.config)
if connection != nil {
c.connection = connection
}
if err != nil {
return err
}
// Goal is to lazily restore the connection so this needs to be buffered to avoid blocking on asynchronous amqp errors.
// Also re-create this channel each time because apparently the amqp library can close it
c.connectionErrors = make(chan *amqp.Error, 1)
c.connection.NotifyClose(c.connectionErrors)
return nil
}
func (c *connectionHolder) Close() error {
if c.isConnected() {
return c.connection.Close()
}
return nil
}
func (c *connectionHolder) isConnected() bool {
return c.connection != nil && !c.IsClosed()
}
func (c *connectionHolder) Channel() (Channel, error) {
channel, err := c.connection.Channel()
if err != nil {
return nil, err
}
return &channelHolder{channel: channel}, nil
}
func (c *connectionHolder) IsClosed() bool {
return c.connection.IsClosed()
}
func (c *connectionHolder) NotifyClose(receiver chan *amqp.Error) chan *amqp.Error {
return c.connection.NotifyClose(receiver)
}
func (c *channelHolder) Confirm(noWait bool) error {
return c.channel.Confirm(noWait)
}
func (c *channelHolder) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg amqp.Publishing) (DeferredConfirmation, error) {
confirmation, err := c.channel.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg)
if err != nil {
return nil, err
}
return &deferredConfirmationHolder{confirmation: confirmation}, nil
}
func (c *channelHolder) IsClosed() bool {
return c.channel.IsClosed()
}
func (c *channelHolder) Close() error {
return c.channel.Close()
}
func (d *deferredConfirmationHolder) Done() <-chan struct{} {
return d.confirmation.Done()
}
func (d *deferredConfirmationHolder) Acked() bool {
return d.confirmation.Acked()
}