internal/conn/conn.go (84 lines of code) (raw):
// Package conn provides the top level package for all connection types to the ARN service.
package conn
import (
"fmt"
"log/slog"
"sync"
"sync/atomic"
"github.com/Azure/arn-sdk/internal/conn/http"
"github.com/Azure/arn-sdk/internal/conn/storage"
"github.com/Azure/arn-sdk/models"
)
// PromisePool is a pool of promises to use for notifications.
var PromisePool = sync.Pool{
New: func() any {
return make(chan error, 1)
},
}
// Reset provides a REST connection to the ARN service.
type Service struct {
endpoint string
http *http.Client
store *storage.Client
clientErrs chan error
in chan models.Notifications
id atomic.Uint64
log *slog.Logger
}
type Option func(*Service) error
// WithLogger sets the logger on the client. By default it uses slog.Default().
func WithLogger(log *slog.Logger) Option {
return func(c *Service) error {
c.log = log
return nil
}
}
// New creates a new connection to the ARN service.
func New(httpClient *http.Client, store *storage.Client, clientErrs chan error, options ...Option) (*Service, error) {
if httpClient == nil {
return nil, fmt.Errorf("httpClient is required")
}
if store == nil {
return nil, fmt.Errorf("store is required")
}
if clientErrs == nil {
return nil, fmt.Errorf("clientErrs is required")
}
conn := &Service{
in: make(chan models.Notifications, 1),
http: httpClient,
store: store,
clientErrs: clientErrs,
}
for _, o := range options {
if err := o(conn); err != nil {
return nil, err
}
}
go conn.sender()
return conn, nil
}
// Close closes the connection to the ARN service.
func (r *Service) Close() error {
close(r.in)
return nil
}
// Send sends a notification to the ARN service. This will block if the internal channel is full.
// notify.DataCount() must indicate no more than 1000 items. Not thread safe.
func (s *Service) Send(notify models.Notifications) {
if notify.DataCount() > 1000 {
notify.SendPromise(models.ErrBatchSize, s.clientErrs)
return
}
// Makes this predictable for testing, as select is non-deterministic.
if notify.Ctx().Err() != nil {
notify.SendPromise(notify.Ctx().Err(), s.clientErrs)
return
}
select {
case <-notify.Ctx().Done():
notify.SendPromise(notify.Ctx().Err(), s.clientErrs)
case s.in <- notify:
}
return
}
// sender sends notifications to the ARN service.
func (s *Service) sender() {
for n := range s.in {
if err := n.SendEvent(s.http, s.store); err != nil {
n.SendPromise(err, s.clientErrs)
continue
}
n.SendPromise(nil, s.clientErrs)
}
}