models/v3/msgs/msgs.go (243 lines of code) (raw):
package msgs
import (
"context"
"errors"
"fmt"
"log/slog"
"math"
"net/url"
"sync"
"time"
"github.com/Azure/arn-sdk/internal/conn"
"github.com/Azure/arn-sdk/internal/conn/http"
"github.com/Azure/arn-sdk/internal/conn/maxvals"
"github.com/Azure/arn-sdk/internal/conn/storage"
"github.com/Azure/arn-sdk/models"
"github.com/Azure/arn-sdk/models/metrics"
"github.com/Azure/arn-sdk/models/v3/schema/envelope"
"github.com/Azure/arn-sdk/models/v3/schema/types"
"github.com/Azure/arn-sdk/models/version"
"github.com/go-json-experiment/json"
"github.com/google/uuid"
)
// Compile time check to ensure Notifications implements models.Notifications.
var _ models.Notifications = Notifications{}
// Notifications is a notification to send to the ARN service. This is a wrapper around the actual data
// that is sent in the notification described in types.Data. The data will be converted to an Event and
// sent over the wire.
type Notifications struct {
// ctx is the context for the notification. This honors the context deadline.
ctx context.Context
// Promise is a channel that will be used to send the result of the notification.
// If this is nil, no promise will be sent unless callling Notify(). In that case
// a promise will be created automatically. A promise is only good until you receive
// the result from it. After that, the promise can be reused in another Notification.
// This is not required to be set if you are using Notify().
promise chan error
// ResourceLocation is the location of the resources in this notification. This is the normalized ARM location enum
// like "eastus".
ResourceLocation string
// FrontdoorLocation is the ARM region that emitted the notification. Omitted for notifications not emitted by ARM.
FrontdoorLocation string
// PublisherInfo is the Namespace of the publisher sending the data of this notification, for example Microsoft.Resources is be the publisherInfo for ARM.
PublisherInfo string
// AdditionalBatchProperties can contain the sdkversion, batchsize, subscription partition tag etc.
AdditionalBatchProperties types.AdditionalBatchProperties
// Data is the data to send in the notification.
Data []types.NotificationResource
testSendHTTP func(*http.Client, envelope.Event) error
testSendBlob func(*storage.Client, []byte) (*url.URL, error)
}
// Promise waits for the promise to be fulfilled. This will return an ErrPromiseTimeout if the context
// passed times out to distiguish it from a context timeout on sending the notification.
func (n Notifications) Promise(ctx context.Context) error {
if n.promise == nil {
return nil
}
defer func() {
conn.PromisePool.Put(n.promise)
}()
if ctx.Err() != nil {
metrics.Promise(context.Background(), ctx.Err())
return ctx.Err()
}
select {
case <-ctx.Done():
metrics.Promise(context.Background(), models.ErrPromiseTimeout)
return models.ErrPromiseTimeout
case e := <-n.promise:
metrics.Promise(context.Background(), e)
return e
}
}
// Recycle can be used to recycle the promise of a notification once it has been used.
// This is for internal use and should not be called.
// It is a terrible idea to use the promise after it has been recycled.
func (n Notifications) Recycle() {
if n.promise != nil {
select {
case <-n.promise:
default:
}
conn.PromisePool.Put(n.promise)
}
}
func (n Notifications) Ctx() context.Context {
return n.ctx
}
// DataCount implements models.Notifications.DataCount().
func (n Notifications) DataCount() int {
return len(n.Data)
}
// DataJSON implements models.Notifications.Version().
func (n Notifications) Version() version.Schema {
return version.V3
}
// GetPublisherInfo implements models.Notifications.GetPublisherInfo().
func (n Notifications) GetPublisherInfo() string {
return n.PublisherInfo
}
// SetCtx implements models.Notifications.SetCtx().
func (n Notifications) SetCtx(ctx context.Context) models.Notifications {
n.ctx = ctx
return n
}
// SetPromise sets the promise channel used for the notification.
func (n Notifications) SetPromise(promise chan error) models.Notifications {
n.promise = promise
return n
}
// SendPromise sends an error on the promise to the notification.
func (n Notifications) SendPromise(e error, backupCh chan error) {
if n.promise == nil {
if e == nil {
return
}
if backupCh != nil {
select {
case backupCh <- e:
default:
}
}
return
}
select {
case n.promise <- e:
default:
slog.Default().Error("Bug: had a Notification promise, but it blocked")
}
}
func EventType(res types.NotificationResource) string {
return fmt.Sprintf("%s/%s", res.ArmResource.Type, res.ArmResource.Activity().String())
}
// dataToJSON returns the JSON representation of the data in the notification.
// Once this is called, the data is cached. So new data added to the Notification will not be included in the JSON.
func (n Notifications) dataToJSON() ([]byte, error) {
b, err := json.Marshal(n.Data)
if err != nil {
return nil, err
}
return b, nil
}
// SendEvent converts the notification to an event and sends it to the ARN service.
// Do not call this function directly, use methods on the Client instead.
func (n Notifications) SendEvent(hc *http.Client, store *storage.Client) (err error) {
started := time.Now()
// keep track so we can record whether the data was inlined or not (receiver or blob)
inline := false
var dataSize int64
defer func() {
elapsed := time.Since(started)
if err != nil {
metrics.SendEventFailure(context.Background(), elapsed, inline, dataSize)
return
}
metrics.SendEventSuccess(context.Background(), elapsed, inline, dataSize)
}()
if len(n.Data) == 0 {
return errors.New("no data to send")
}
// Convert the notification to an event.
dataJSON, event, err := n.toEvent()
if err != nil {
return err
}
// As a producer, we have to set the status code for all Resources to OK.
for i, e := range event.Data.Resources {
e.StatusCode = types.StatusCode
event.Data.Resources[i] = e
}
if err = event.Validate(); err != nil {
return err
}
dataSize = int64(len(event.Data.Data))
// If the data is marked inline, we can send over HTTP directly.
if event.Data.ResourcesContainer == types.RCInline {
inline = true
return n.sendHTTP(hc, event)
}
u, err := n.sendBlob(store, dataJSON)
if err != nil {
return err
}
// Tell the service (via HTTP) where to find the blob.
event.Data.ResourcesBlobInfo.BlobURI = u.String()
event.Data.ResourcesBlobInfo.BlobSize = int64(len(dataJSON))
return n.sendHTTP(hc, event)
}
// toEvent converts the notification to an event. If the data is inline, the data will be included in the event.
// Otherwise you will need to set Event.Data.ResourceBlobInfo.BlobURI to the URI of the blob.
func (n Notifications) toEvent() ([]byte, envelope.Event, error) {
dataJSON, inline, err := n.inline()
if err != nil {
return dataJSON, envelope.Event{}, err
}
meta, err := newEventMeta(n.Data)
if err != nil {
return dataJSON, envelope.Event{}, fmt.Errorf("problem creating an EventMeta: %w", err)
}
if len(n.Data) > math.MaxUint16 {
return dataJSON, envelope.Event{}, fmt.Errorf("too many resources to send in a single event: %d", len(n.Data))
}
n.AdditionalBatchProperties.BatchSize = uint16(len(n.Data))
n.AdditionalBatchProperties.SDKVersion = version.SDK.AsARNFormat()
if inline {
return dataJSON, envelope.Event{
EventMeta: meta,
Data: types.Data{
Data: dataJSON, // This serializes into the "Resources" field.
FrontdoorLocation: n.FrontdoorLocation,
AdditionalBatchProperties: n.AdditionalBatchProperties,
ResourcesContainer: types.RCInline,
ResourceLocation: n.ResourceLocation,
PublisherInfo: n.PublisherInfo,
Resources: n.Data, // This doesn't serialize into JSON, only the "Data" field does, which actually replaces this field.
},
}, nil
}
return dataJSON, envelope.Event{
EventMeta: meta,
Data: types.Data{
FrontdoorLocation: n.FrontdoorLocation,
AdditionalBatchProperties: n.AdditionalBatchProperties,
ResourcesContainer: types.RCBlob,
ResourceLocation: n.ResourceLocation,
PublisherInfo: n.PublisherInfo,
Resources: n.Data, // This doesn't serialize into JSON, only the "Data" field does, which actually replaces this field.
},
}, nil
}
var headerPool = sync.Pool{
New: func() any {
return make([]string, 2)
},
}
func (n Notifications) sendHTTP(hc *http.Client, event envelope.Event) error {
if n.testSendHTTP != nil {
return n.testSendHTTP(hc, event)
}
b, err := json.Marshal(event)
if err != nil {
return err
}
headers := headerPool.Get().([]string)
headers[0] = "publisherinfo"
headers[1] = event.Data.PublisherInfo
defer headerPool.Put(headers)
return hc.Send(n.ctx, b, headers)
}
func (n Notifications) sendBlob(store *storage.Client, dataJSON []byte) (*url.URL, error) {
if n.testSendBlob != nil {
return n.testSendBlob(store, dataJSON)
}
// If store isn't set then this message is too large to send.
if store == nil {
return nil, fmt.Errorf("event exceeds max inline size and no storage client provided to store the data in a blob")
}
return store.Upload(n.ctx, uuid.New().String(), dataJSON)
}
// inline determines if the notification should be inlined. It returns the JSON representation of the data
// so that we don't have to marshal it again, if the data should be inlined and an error if there was a problem.
func (n Notifications) inline() ([]byte, bool, error) {
b, err := n.dataToJSON()
if err != nil {
return nil, false, err
}
if len(b) < maxvals.InlineSize {
return b, true, nil
}
return b, false, nil
}
var nower = time.Now
// newEventMeta creates a new EventMeta. This is not intended to be used by
// a caller, so this constructor is here instead of in the types package.
func newEventMeta(data []types.NotificationResource) (envelope.EventMeta, error) {
if len(data) == 0 {
return envelope.EventMeta{}, errors.New("data must not be empty")
}
return envelope.EventMeta{
ID: uuid.New().String(),
Subject: subject(data),
DataVersion: version.V3,
MetadataVersion: "1.0",
EventTime: nower().UTC(),
EventType: EventType(data[0]),
}, nil
}