common/persistence/nosql/nosql_queue_store.go (299 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package nosql import ( "context" "fmt" "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/persistence" "github.com/uber/cadence/common/persistence/nosql/nosqlplugin" "github.com/uber/cadence/common/types" ) const ( emptyMessageID = -1 ) type nosqlQueueStore struct { queueType persistence.QueueType nosqlStore } func newNoSQLQueueStore( cfg config.ShardedNoSQL, logger log.Logger, queueType persistence.QueueType, dc *persistence.DynamicConfiguration, ) (persistence.Queue, error) { shardedStore, err := newShardedNosqlStore(cfg, logger, dc) if err != nil { return nil, err } queue := &nosqlQueueStore{ nosqlStore: shardedStore.GetDefaultShard(), queueType: queueType, } if err := queue.createQueueMetadataEntryIfNotExist(); err != nil { return nil, fmt.Errorf("failed to check and create queue metadata entry: %v", err) } return queue, nil } func (q *nosqlQueueStore) createQueueMetadataEntryIfNotExist() error { queueMetadata, err := q.getQueueMetadata(context.Background(), q.queueType) if err != nil { return err } if queueMetadata == nil { if err := q.insertInitialQueueMetadataRecord(context.Background(), q.queueType); err != nil { return err } } dlqMetadata, err := q.getQueueMetadata(context.Background(), q.getDLQTypeFromQueueType()) if err != nil { return err } if dlqMetadata == nil { return q.insertInitialQueueMetadataRecord(context.Background(), q.getDLQTypeFromQueueType()) } return nil } // Warning: This is not a safe concurrent operation in its current state. // It's only used for domain replication at the moment, but needs a conditional write guard // for concurrent use func (q *nosqlQueueStore) EnqueueMessage( ctx context.Context, messagePayload []byte, ) error { lastMessageID, err := q.getLastMessageID(ctx, q.queueType) if err != nil { return err } ackLevels, err := q.GetAckLevels(ctx) if err != nil { return err } _, err = q.tryEnqueue(ctx, q.queueType, getNextID(ackLevels, lastMessageID), messagePayload) return err } func (q *nosqlQueueStore) EnqueueMessageToDLQ( ctx context.Context, messagePayload []byte, ) error { // Use negative queue type as the dlq type lastMessageID, err := q.getLastMessageID(ctx, q.getDLQTypeFromQueueType()) if err != nil { return err } _, err = q.tryEnqueue(ctx, q.getDLQTypeFromQueueType(), lastMessageID+1, messagePayload) return err } func (q *nosqlQueueStore) tryEnqueue( ctx context.Context, queueType persistence.QueueType, messageID int64, messagePayload []byte, ) (int64, error) { err := q.db.InsertIntoQueue(ctx, &nosqlplugin.QueueMessageRow{ QueueType: queueType, ID: messageID, Payload: messagePayload, }) if err != nil { if _, ok := err.(*nosqlplugin.ConditionFailure); ok { return emptyMessageID, &persistence.ConditionFailedError{Msg: fmt.Sprintf("message ID %v exists in queue", messageID)} } return emptyMessageID, convertCommonErrors(q.db, fmt.Sprintf("EnqueueMessage, Type: %v", queueType), err) } return messageID, nil } func (q *nosqlQueueStore) getLastMessageID( ctx context.Context, queueType persistence.QueueType, ) (int64, error) { msgID, err := q.db.SelectLastEnqueuedMessageID(ctx, queueType) if err != nil { if q.db.IsNotFoundError(err) { return emptyMessageID, nil } return emptyMessageID, convertCommonErrors(q.db, fmt.Sprintf("GetLastMessageID, Type: %v", queueType), err) } return msgID, nil } func (q *nosqlQueueStore) ReadMessages( ctx context.Context, lastMessageID int64, maxCount int, ) ([]*persistence.InternalQueueMessage, error) { messages, err := q.db.SelectMessagesFrom(ctx, q.queueType, lastMessageID, maxCount) if err != nil { return nil, convertCommonErrors(q.db, "ReadMessages", err) } var result []*persistence.InternalQueueMessage for _, msg := range messages { result = append(result, &persistence.InternalQueueMessage{ ID: msg.ID, QueueType: q.queueType, Payload: msg.Payload, }) } return result, nil } func (q *nosqlQueueStore) ReadMessagesFromDLQ( ctx context.Context, firstMessageID int64, lastMessageID int64, pageSize int, pageToken []byte, ) ([]*persistence.InternalQueueMessage, []byte, error) { response, err := q.db.SelectMessagesBetween(ctx, nosqlplugin.SelectMessagesBetweenRequest{ QueueType: q.getDLQTypeFromQueueType(), ExclusiveBeginMessageID: firstMessageID, InclusiveEndMessageID: lastMessageID, PageSize: pageSize, NextPageToken: pageToken, }) if err != nil { return nil, nil, convertCommonErrors(q.db, "ReadMessagesFromDLQ", err) } var result []*persistence.InternalQueueMessage for _, msg := range response.Rows { result = append(result, &persistence.InternalQueueMessage{ ID: msg.ID, QueueType: msg.QueueType, Payload: msg.Payload, }) } return result, response.NextPageToken, nil } func (q *nosqlQueueStore) DeleteMessagesBefore( ctx context.Context, messageID int64, ) error { if err := q.db.DeleteMessagesBefore(ctx, q.queueType, messageID); err != nil { return convertCommonErrors(q.db, "DeleteMessagesBefore", err) } return nil } func (q *nosqlQueueStore) DeleteMessageFromDLQ( ctx context.Context, messageID int64, ) error { // Use negative queue type as the dlq type if err := q.db.DeleteMessage(ctx, q.getDLQTypeFromQueueType(), messageID); err != nil { return convertCommonErrors(q.db, "DeleteMessageFromDLQ", err) } return nil } func (q *nosqlQueueStore) RangeDeleteMessagesFromDLQ( ctx context.Context, firstMessageID int64, lastMessageID int64, ) error { // Use negative queue type as the dlq type if err := q.db.DeleteMessagesInRange(ctx, q.getDLQTypeFromQueueType(), firstMessageID, lastMessageID); err != nil { return convertCommonErrors(q.db, "RangeDeleteMessagesFromDLQ", err) } return nil } func (q *nosqlQueueStore) insertInitialQueueMetadataRecord( ctx context.Context, queueType persistence.QueueType, ) error { version := int64(0) if err := q.db.InsertQueueMetadata(ctx, queueType, version); err != nil { return convertCommonErrors(q.db, fmt.Sprintf("InsertInitialQueueMetadataRecord, Type: %v", queueType), err) } return nil } func (q *nosqlQueueStore) UpdateAckLevel( ctx context.Context, messageID int64, clusterName string, ) error { return q.updateAckLevel(ctx, messageID, clusterName, q.queueType) } func (q *nosqlQueueStore) GetAckLevels( ctx context.Context, ) (map[string]int64, error) { queueMetadata, err := q.getQueueMetadata(ctx, q.queueType) if err != nil { return nil, err } return queueMetadata.ClusterAckLevels, nil } func (q *nosqlQueueStore) UpdateDLQAckLevel( ctx context.Context, messageID int64, clusterName string, ) error { return q.updateAckLevel(ctx, messageID, clusterName, q.getDLQTypeFromQueueType()) } func (q *nosqlQueueStore) GetDLQAckLevels( ctx context.Context, ) (map[string]int64, error) { // Use negative queue type as the dlq type queueMetadata, err := q.getQueueMetadata(ctx, q.getDLQTypeFromQueueType()) if err != nil { return nil, err } return queueMetadata.ClusterAckLevels, nil } func (q *nosqlQueueStore) GetDLQSize( ctx context.Context, ) (int64, error) { size, err := q.db.GetQueueSize(ctx, q.getDLQTypeFromQueueType()) if err != nil { return 0, convertCommonErrors(q.db, "GetDLQSize", err) } return size, err } func (q *nosqlQueueStore) getQueueMetadata( ctx context.Context, queueType persistence.QueueType, ) (*nosqlplugin.QueueMetadataRow, error) { row, err := q.db.SelectQueueMetadata(ctx, queueType) if err != nil { if q.db.IsNotFoundError(err) { return nil, nil } return nil, convertCommonErrors(q.db, "GetQueueMetadata", err) } return row, nil } func (q *nosqlQueueStore) updateQueueMetadata( ctx context.Context, metadata *nosqlplugin.QueueMetadataRow, ) error { err := q.db.UpdateQueueMetadataCas(ctx, *metadata) if err != nil { if _, ok := err.(*nosqlplugin.ConditionFailure); ok { return &types.InternalServiceError{ Message: "UpdateQueueMetadata operation encounter concurrent write.", } } return convertCommonErrors(q.db, "UpdateQueueMetadata", err) } return nil } // DLQ type of is the negative of number of the non-DLQ func (q *nosqlQueueStore) getDLQTypeFromQueueType() persistence.QueueType { return -q.queueType } func (q *nosqlQueueStore) updateAckLevel( ctx context.Context, messageID int64, clusterName string, queueType persistence.QueueType, ) error { queueMetadata, err := q.getQueueMetadata(ctx, queueType) if err != nil { return err } // Ignore possibly delayed message if ackLevel, ok := queueMetadata.ClusterAckLevels[clusterName]; ok && ackLevel >= messageID { return nil } queueMetadata.ClusterAckLevels[clusterName] = messageID queueMetadata.Version++ // Use negative queue type as the dlq type err = q.updateQueueMetadata(ctx, queueMetadata) if err != nil { return err } return nil } // if, for whatever reason, the ack-levels get ahead of the actual messages // then ensure the next ID follows func getNextID(acks map[string]int64, lastMessageID int64) int64 { o := lastMessageID for _, v := range acks { if v > o { o = v } } return o + 1 }