pkg/database/openshiftclusters.go (299 lines of code) (raw):
package database
// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.
import (
"context"
"fmt"
"net/http"
"strings"
"github.com/Azure/go-autorest/autorest/azure"
"github.com/Azure/ARO-RP/pkg/api"
"github.com/Azure/ARO-RP/pkg/database/cosmosdb"
"github.com/Azure/ARO-RP/pkg/util/uuid"
)
const (
OpenShiftClustersDequeueQuery = `SELECT * FROM OpenShiftClusters doc WHERE doc.openShiftCluster.properties.provisioningState IN ("Creating", "Deleting", "Updating", "AdminUpdating") AND (doc.leaseExpires ?? 0) < GetCurrentTimestamp() / 1000`
OpenShiftClustersQueueLengthQuery = `SELECT VALUE COUNT(1) FROM OpenShiftClusters doc WHERE doc.openShiftCluster.properties.provisioningState IN ("Creating", "Deleting", "Updating", "AdminUpdating") AND (doc.leaseExpires ?? 0) < GetCurrentTimestamp() / 1000`
OpenShiftClustersGetQuery = `SELECT * FROM OpenShiftClusters doc WHERE doc.key = @key`
OpenshiftClustersPrefixQuery = `SELECT * FROM OpenShiftClusters doc WHERE STARTSWITH(doc.key, @prefix)`
OpenshiftClustersClientIdQuery = `SELECT * FROM OpenShiftClusters doc WHERE doc.clientIdKey = @clientID`
OpenshiftClustersResourceGroupQuery = `SELECT * FROM OpenShiftClusters doc WHERE doc.clusterResourceGroupIdKey = @resourceGroupID`
OpenshiftClustersClusterResourceIDOnlyQuery = `SELECT doc.id, doc.key FROM OpenShiftClusters doc WHERE doc.openShiftCluster.properties.provisioningState NOT IN ("Creating", "Deleting")`
)
type OpenShiftClusterDocumentMutator func(*api.OpenShiftClusterDocument) error
type openShiftClusters struct {
c cosmosdb.OpenShiftClusterDocumentClient
collc cosmosdb.CollectionClient
uuid string
uuidGenerator uuid.Generator
}
// OpenShiftClusters is the database interface for OpenShiftClusterDocuments
type OpenShiftClusters interface {
Create(context.Context, *api.OpenShiftClusterDocument) (*api.OpenShiftClusterDocument, error)
Get(context.Context, string) (*api.OpenShiftClusterDocument, error)
QueueLength(context.Context, string) (int, error)
Patch(context.Context, string, OpenShiftClusterDocumentMutator) (*api.OpenShiftClusterDocument, error)
PatchWithLease(context.Context, string, OpenShiftClusterDocumentMutator) (*api.OpenShiftClusterDocument, error)
Update(context.Context, *api.OpenShiftClusterDocument) (*api.OpenShiftClusterDocument, error)
Delete(context.Context, *api.OpenShiftClusterDocument) error
ChangeFeed() cosmosdb.OpenShiftClusterDocumentIterator
List(string) cosmosdb.OpenShiftClusterDocumentIterator
ListAll(context.Context) (*api.OpenShiftClusterDocuments, error)
ListByPrefix(string, string, string) (cosmosdb.OpenShiftClusterDocumentIterator, error)
Dequeue(context.Context) (*api.OpenShiftClusterDocument, error)
Lease(context.Context, string) (*api.OpenShiftClusterDocument, error)
EndLease(context.Context, string, api.ProvisioningState, api.ProvisioningState, *string) (*api.OpenShiftClusterDocument, error)
GetByClientID(ctx context.Context, partitionKey, clientID string) (*api.OpenShiftClusterDocuments, error)
GetByClusterResourceGroupID(ctx context.Context, partitionKey, resourceGroupID string) (*api.OpenShiftClusterDocuments, error)
GetAllResourceIDs(ctx context.Context, continuation string) (cosmosdb.OpenShiftClusterDocumentIterator, error)
DoDequeue(ctx context.Context, doc *api.OpenShiftClusterDocument) (*api.OpenShiftClusterDocument, error)
NewUUID() string
}
// NewOpenShiftClusters returns a new OpenShiftClusters
func NewOpenShiftClusters(ctx context.Context, dbc cosmosdb.DatabaseClient, dbName string) (OpenShiftClusters, error) {
collc := cosmosdb.NewCollectionClient(dbc, dbName)
documentClient := cosmosdb.NewOpenShiftClusterDocumentClient(collc, collOpenShiftClusters)
return NewOpenShiftClustersWithProvidedClient(documentClient, collc, uuid.DefaultGenerator.Generate(), uuid.DefaultGenerator), nil
}
func NewOpenShiftClustersWithProvidedClient(client cosmosdb.OpenShiftClusterDocumentClient, collectionClient cosmosdb.CollectionClient, uuid string, uuidGenerator uuid.Generator) OpenShiftClusters {
return &openShiftClusters{
c: client,
collc: collectionClient,
uuid: uuid,
uuidGenerator: uuidGenerator,
}
}
func (c *openShiftClusters) NewUUID() string {
return c.uuidGenerator.Generate()
}
func (c *openShiftClusters) Create(ctx context.Context, doc *api.OpenShiftClusterDocument) (*api.OpenShiftClusterDocument, error) {
if doc.Key != strings.ToLower(doc.Key) {
return nil, fmt.Errorf("key %q is not lower case", doc.Key)
}
var err error
doc.PartitionKey, err = c.partitionKey(doc.Key)
if err != nil {
return nil, err
}
doc, err = c.c.Create(ctx, doc.PartitionKey, doc, nil)
if err, ok := err.(*cosmosdb.Error); ok && err.StatusCode == http.StatusConflict {
err.StatusCode = http.StatusPreconditionFailed
}
return doc, err
}
func (c *openShiftClusters) Get(ctx context.Context, key string) (*api.OpenShiftClusterDocument, error) {
if key != strings.ToLower(key) {
return nil, fmt.Errorf("key %q is not lower case", key)
}
partitionKey, err := c.partitionKey(key)
if err != nil {
return nil, err
}
docs, err := c.c.QueryAll(ctx, partitionKey, &cosmosdb.Query{
Query: OpenShiftClustersGetQuery,
Parameters: []cosmosdb.Parameter{
{
Name: "@key",
Value: key,
},
},
}, nil)
if err != nil {
return nil, err
}
switch {
case len(docs.OpenShiftClusterDocuments) > 1:
return nil, fmt.Errorf("read %d documents, expected <= 1", len(docs.OpenShiftClusterDocuments))
case len(docs.OpenShiftClusterDocuments) == 1:
return docs.OpenShiftClusterDocuments[0], nil
default:
return nil, &cosmosdb.Error{StatusCode: http.StatusNotFound}
}
}
// QueueLength returns OpenShiftClusters un-queued document count.
// If error occurs, 0 is returned with error message
func (c *openShiftClusters) QueueLength(ctx context.Context, collid string) (int, error) {
partitions, err := c.collc.PartitionKeyRanges(ctx, collid)
if err != nil {
return 0, err
}
var countTotal int
for _, r := range partitions.PartitionKeyRanges {
result := c.c.Query("", &cosmosdb.Query{
Query: OpenShiftClustersQueueLengthQuery,
}, &cosmosdb.Options{
PartitionKeyRangeID: r.ID,
})
// because we aggregate count we don't expect pagination in this query result,
// so we gonna call Next() only once per partition.
var data struct {
api.MissingFields
Document []int `json:"Documents,omitempty"`
}
err := result.NextRaw(ctx, -1, &data)
if err != nil {
return 0, err
}
countTotal = countTotal + data.Document[0]
}
return countTotal, nil
}
func (c *openShiftClusters) Patch(ctx context.Context, key string, f OpenShiftClusterDocumentMutator) (*api.OpenShiftClusterDocument, error) {
return c.patch(ctx, key, f, nil)
}
func (c *openShiftClusters) patch(ctx context.Context, key string, f OpenShiftClusterDocumentMutator, options *cosmosdb.Options) (*api.OpenShiftClusterDocument, error) {
var doc *api.OpenShiftClusterDocument
err := cosmosdb.RetryOnPreconditionFailed(func() (err error) {
doc, err = c.Get(ctx, key)
if err != nil {
return
}
err = f(doc)
if err != nil {
return
}
doc, err = c.update(ctx, doc, options)
return
})
return doc, err
}
func (c *openShiftClusters) PatchWithLease(ctx context.Context, key string, f OpenShiftClusterDocumentMutator) (*api.OpenShiftClusterDocument, error) {
return c.patchWithLease(ctx, key, f, nil)
}
func (c *openShiftClusters) patchWithLease(ctx context.Context, key string, f OpenShiftClusterDocumentMutator, options *cosmosdb.Options) (*api.OpenShiftClusterDocument, error) {
return c.patch(ctx, key, func(doc *api.OpenShiftClusterDocument) error {
if doc.LeaseOwner != c.uuid {
return fmt.Errorf("lost lease")
}
return f(doc)
}, options)
}
func (c *openShiftClusters) Update(ctx context.Context, doc *api.OpenShiftClusterDocument) (*api.OpenShiftClusterDocument, error) {
return c.update(ctx, doc, nil)
}
func (c *openShiftClusters) update(ctx context.Context, doc *api.OpenShiftClusterDocument, options *cosmosdb.Options) (*api.OpenShiftClusterDocument, error) {
if doc.Key != strings.ToLower(doc.Key) {
return nil, fmt.Errorf("key %q is not lower case", doc.Key)
}
return c.c.Replace(ctx, doc.PartitionKey, doc, options)
}
func (c *openShiftClusters) Delete(ctx context.Context, doc *api.OpenShiftClusterDocument) error {
if doc.Key != strings.ToLower(doc.Key) {
return fmt.Errorf("key %q is not lower case", doc.Key)
}
return c.c.Delete(ctx, doc.PartitionKey, doc, &cosmosdb.Options{NoETag: true})
}
func (c *openShiftClusters) ChangeFeed() cosmosdb.OpenShiftClusterDocumentIterator {
return c.c.ChangeFeed(nil)
}
func (c *openShiftClusters) List(continuation string) cosmosdb.OpenShiftClusterDocumentIterator {
return c.c.List(&cosmosdb.Options{Continuation: continuation})
}
func (c *openShiftClusters) ListAll(ctx context.Context) (*api.OpenShiftClusterDocuments, error) {
return c.c.ListAll(ctx, nil)
}
func (c *openShiftClusters) ListByPrefix(subscriptionID, prefix, continuation string) (cosmosdb.OpenShiftClusterDocumentIterator, error) {
if prefix != strings.ToLower(prefix) {
return nil, fmt.Errorf("prefix %q is not lower case", prefix)
}
return c.c.Query(
subscriptionID,
&cosmosdb.Query{
Query: OpenshiftClustersPrefixQuery,
Parameters: []cosmosdb.Parameter{
{
Name: "@prefix",
Value: prefix,
},
},
},
&cosmosdb.Options{Continuation: continuation},
), nil
}
func (c *openShiftClusters) Dequeue(ctx context.Context) (*api.OpenShiftClusterDocument, error) {
i := c.c.Query("", &cosmosdb.Query{
Query: OpenShiftClustersDequeueQuery,
}, nil)
for {
docs, err := i.Next(ctx, -1)
if err != nil {
return nil, err
}
if docs == nil {
return nil, nil
}
for _, doc := range docs.OpenShiftClusterDocuments {
doc, err = c.DoDequeue(ctx, doc)
if cosmosdb.IsErrorStatusCode(err, http.StatusPreconditionFailed) { // someone else got there first
continue
}
return doc, err
}
}
}
func (c *openShiftClusters) DoDequeue(ctx context.Context, doc *api.OpenShiftClusterDocument) (*api.OpenShiftClusterDocument, error) {
doc.LeaseOwner = c.uuid
doc.Dequeues++
return c.update(ctx, doc, &cosmosdb.Options{PreTriggers: []string{"renewLease"}})
}
func (c *openShiftClusters) Lease(ctx context.Context, key string) (*api.OpenShiftClusterDocument, error) {
return c.patchWithLease(ctx, key, func(doc *api.OpenShiftClusterDocument) error {
return nil
}, &cosmosdb.Options{PreTriggers: []string{"renewLease"}})
}
func (c *openShiftClusters) EndLease(ctx context.Context, key string, provisioningState, failedProvisioningState api.ProvisioningState, adminUpdateError *string) (*api.OpenShiftClusterDocument, error) {
return c.patchWithLease(ctx, key, func(doc *api.OpenShiftClusterDocument) error {
doc.OpenShiftCluster.Properties.ProvisioningState = provisioningState
doc.OpenShiftCluster.Properties.FailedProvisioningState = failedProvisioningState
doc.OpenShiftCluster.Properties.MaintenanceTask = ""
doc.LeaseOwner = ""
doc.LeaseExpires = 0
if provisioningState != api.ProvisioningStateFailed {
doc.Dequeues = 0
}
// If EndLease is called while cluster is still in terminal phase,
// we clean AsyncOperationID. Otherwise it just handover between backends.
if provisioningState.IsTerminal() {
if adminUpdateError != nil {
doc.OpenShiftCluster.Properties.LastAdminUpdateError = *adminUpdateError
}
doc.CorrelationData = nil
doc.OpenShiftCluster.Properties.LastProvisioningState = ""
doc.AsyncOperationID = ""
}
return nil
}, nil)
}
func (c *openShiftClusters) partitionKey(key string) (string, error) {
r, err := azure.ParseResourceID(key)
return r.SubscriptionID, err
}
func (c *openShiftClusters) GetByClientID(ctx context.Context, partitionKey, clientID string) (*api.OpenShiftClusterDocuments, error) {
docs, err := c.c.QueryAll(ctx, partitionKey, &cosmosdb.Query{
Query: OpenshiftClustersClientIdQuery,
Parameters: []cosmosdb.Parameter{
{
Name: "@clientID",
Value: clientID,
},
},
}, nil)
if err != nil {
return nil, err
}
return docs, nil
}
func (c *openShiftClusters) GetByClusterResourceGroupID(ctx context.Context, partitionKey, resourceGroupID string) (*api.OpenShiftClusterDocuments, error) {
docs, err := c.c.QueryAll(ctx, partitionKey, &cosmosdb.Query{
Query: OpenshiftClustersResourceGroupQuery,
Parameters: []cosmosdb.Parameter{
{
Name: "@resourceGroupID",
Value: resourceGroupID,
},
},
}, nil)
if err != nil {
return nil, err
}
return docs, nil
}
func (c *openShiftClusters) GetAllResourceIDs(ctx context.Context, continuation string) (cosmosdb.OpenShiftClusterDocumentIterator, error) {
return c.c.Query(
"",
&cosmosdb.Query{
Query: OpenshiftClustersClusterResourceIDOnlyQuery,
},
&cosmosdb.Options{Continuation: continuation},
), nil
}