pkg/database/subscriptions.go (132 lines of code) (raw):
package database
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
import (
"context"
"fmt"
"net/http"
"strings"
"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/database/cosmosdb"
"github.com/Azure/ARO-RP/pkg/util/uuid"
)
const (
SubscriptionsDequeueQuery string = `SELECT * FROM Subscriptions doc WHERE (doc.deleting ?? false) AND (doc.leaseExpires ?? 0) < GetCurrentTimestamp() / 1000`
)
type subscriptions struct {
c cosmosdb.SubscriptionDocumentClient
uuid string
}
// Subscriptions is the database interface for SubscriptionDocuments
type Subscriptions interface {
Create(context.Context, *api.SubscriptionDocument) (*api.SubscriptionDocument, error)
Get(context.Context, string) (*api.SubscriptionDocument, error)
Update(context.Context, *api.SubscriptionDocument) (*api.SubscriptionDocument, error)
ChangeFeed() cosmosdb.SubscriptionDocumentIterator
Dequeue(context.Context) (*api.SubscriptionDocument, error)
Lease(context.Context, string) (*api.SubscriptionDocument, error)
EndLease(context.Context, string, bool, bool) (*api.SubscriptionDocument, error)
}
// NewSubscriptions returns a new Subscriptions
func NewSubscriptions(ctx context.Context, dbc cosmosdb.DatabaseClient, dbName string) (Subscriptions, error) {
collc := cosmosdb.NewCollectionClient(dbc, dbName)
documentClient := cosmosdb.NewSubscriptionDocumentClient(collc, collSubscriptions)
return NewSubscriptionsWithProvidedClient(documentClient, uuid.DefaultGenerator.Generate()), nil
}
func NewSubscriptionsWithProvidedClient(client cosmosdb.SubscriptionDocumentClient, uuid string) Subscriptions {
return &subscriptions{
c: client,
uuid: uuid,
}
}
func (c *subscriptions) Create(ctx context.Context, doc *api.SubscriptionDocument) (*api.SubscriptionDocument, error) {
if doc.ID != strings.ToLower(doc.ID) {
return nil, fmt.Errorf("id %q is not lower case", doc.ID)
}
doc, err := c.c.Create(ctx, doc.ID, doc, nil)
if err, ok := err.(*cosmosdb.Error); ok && err.StatusCode == http.StatusConflict {
err.StatusCode = http.StatusPreconditionFailed
}
return doc, err
}
func (c *subscriptions) Get(ctx context.Context, id string) (*api.SubscriptionDocument, error) {
if id != strings.ToLower(id) {
return nil, fmt.Errorf("id %q is not lower case", id)
}
return c.c.Get(ctx, id, id, nil)
}
func (c *subscriptions) patch(ctx context.Context, id string, f func(*api.SubscriptionDocument) error, options *cosmosdb.Options) (*api.SubscriptionDocument, error) {
var doc *api.SubscriptionDocument
err := cosmosdb.RetryOnPreconditionFailed(func() (err error) {
doc, err = c.Get(ctx, id)
if err != nil {
return
}
err = f(doc)
if err != nil {
return
}
doc, err = c.update(ctx, doc, options)
return
})
return doc, err
}
func (c *subscriptions) patchWithLease(ctx context.Context, key string, f func(*api.SubscriptionDocument) error, options *cosmosdb.Options) (*api.SubscriptionDocument, error) {
return c.patch(ctx, key, func(doc *api.SubscriptionDocument) error {
if doc.LeaseOwner != c.uuid {
return fmt.Errorf("lost lease")
}
return f(doc)
}, options)
}
func (c *subscriptions) Update(ctx context.Context, doc *api.SubscriptionDocument) (*api.SubscriptionDocument, error) {
return c.update(ctx, doc, nil)
}
func (c *subscriptions) update(ctx context.Context, doc *api.SubscriptionDocument, options *cosmosdb.Options) (*api.SubscriptionDocument, error) {
if doc.ID != strings.ToLower(doc.ID) {
return nil, fmt.Errorf("id %q is not lower case", doc.ID)
}
return c.c.Replace(ctx, doc.ID, doc, options)
}
func (c *subscriptions) ChangeFeed() cosmosdb.SubscriptionDocumentIterator {
return c.c.ChangeFeed(nil)
}
func (c *subscriptions) Dequeue(ctx context.Context) (*api.SubscriptionDocument, error) {
i := c.c.Query("", &cosmosdb.Query{Query: SubscriptionsDequeueQuery}, nil)
for {
docs, err := i.Next(ctx, -1)
if err != nil {
return nil, err
}
if docs == nil {
return nil, nil
}
for _, doc := range docs.SubscriptionDocuments {
doc.LeaseOwner = c.uuid
doc.Dequeues++
doc, err = c.update(ctx, doc, &cosmosdb.Options{PreTriggers: []string{"renewLease"}})
if cosmosdb.IsErrorStatusCode(err, http.StatusPreconditionFailed) { // someone else got there first
continue
}
return doc, err
}
}
}
func (c *subscriptions) Lease(ctx context.Context, id string) (*api.SubscriptionDocument, error) {
return c.patchWithLease(ctx, id, func(doc *api.SubscriptionDocument) error {
return nil
}, &cosmosdb.Options{PreTriggers: []string{"renewLease"}})
}
func (c *subscriptions) EndLease(ctx context.Context, id string, done, retryLater bool) (*api.SubscriptionDocument, error) {
var options *cosmosdb.Options
if retryLater {
options = &cosmosdb.Options{PreTriggers: []string{"retryLater"}}
}
return c.patchWithLease(ctx, id, func(doc *api.SubscriptionDocument) error {
if done {
doc.Deleting = false
}
doc.LeaseOwner = ""
doc.LeaseExpires = 0
if done || retryLater {
doc.Dequeues = 0
}
return nil
}, options)
}