common/persistence/nosql/nosql_task_store.go (342 lines of code) (raw):
// Copyright (c) 2020 Uber Technologies, Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package nosql
import (
"context"
"fmt"
"math"
"time"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/types"
)
type (
nosqlTaskStore struct {
*shardedNosqlStore
}
)
const (
initialRangeID = 1 // Id of the first range of a new task list
initialAckLevel = 0
stickyTaskListTTL = int64(24 * time.Hour / time.Second) // if sticky task_list stopped being updated, remove it in one day
)
// newNoSQLTaskStore is used to create an instance of TaskStore implementation
func newNoSQLTaskStore(
cfg config.ShardedNoSQL,
logger log.Logger,
dc *persistence.DynamicConfiguration,
) (persistence.TaskStore, error) {
s, err := newShardedNosqlStore(cfg, logger, dc)
if err != nil {
return nil, err
}
return &nosqlTaskStore{
shardedNosqlStore: s,
}, nil
}
func (t *nosqlTaskStore) GetOrphanTasks(ctx context.Context, request *persistence.GetOrphanTasksRequest) (*persistence.GetOrphanTasksResponse, error) {
// TODO: It's unclear if this's necessary or possible for NoSQL
return nil, &types.InternalServiceError{
Message: "Unimplemented call to GetOrphanTasks for NoSQL",
}
}
func (t *nosqlTaskStore) GetTaskListSize(ctx context.Context, request *persistence.GetTaskListSizeRequest) (*persistence.GetTaskListSizeResponse, error) {
storeShard, err := t.GetStoreShardByTaskList(request.DomainID, request.TaskListName, request.TaskListType)
if err != nil {
return nil, err
}
size, err := storeShard.db.GetTasksCount(ctx, &nosqlplugin.TasksFilter{
TaskListFilter: nosqlplugin.TaskListFilter{
DomainID: request.DomainID,
TaskListName: request.TaskListName,
TaskListType: request.TaskListType,
},
MinTaskID: request.AckLevel,
})
if err != nil {
return nil, err
}
return &persistence.GetTaskListSizeResponse{Size: size}, nil
}
func (t *nosqlTaskStore) LeaseTaskList(
ctx context.Context,
request *persistence.LeaseTaskListRequest,
) (*persistence.LeaseTaskListResponse, error) {
if len(request.TaskList) == 0 {
return nil, &types.InternalServiceError{
Message: "LeaseTaskList requires non empty task list",
}
}
now := time.Now()
var err, selectErr error
var currTL *nosqlplugin.TaskListRow
storeShard, err := t.GetStoreShardByTaskList(request.DomainID, request.TaskList, request.TaskType)
if err != nil {
return nil, err
}
currTL, selectErr = storeShard.db.SelectTaskList(ctx, &nosqlplugin.TaskListFilter{
DomainID: request.DomainID,
TaskListName: request.TaskList,
TaskListType: request.TaskType,
})
if selectErr != nil {
if storeShard.db.IsNotFoundError(selectErr) { // First time task list is used
currTL = &nosqlplugin.TaskListRow{
DomainID: request.DomainID,
TaskListName: request.TaskList,
TaskListType: request.TaskType,
RangeID: initialRangeID,
TaskListKind: request.TaskListKind,
AckLevel: initialAckLevel,
LastUpdatedTime: now,
}
err = storeShard.db.InsertTaskList(ctx, currTL)
} else {
return nil, convertCommonErrors(storeShard.db, "LeaseTaskList", err)
}
} else {
// if request.RangeID is > 0, we are trying to renew an already existing
// lease on the task list. If request.RangeID=0, we are trying to steal
// the tasklist from its current owner
if request.RangeID > 0 && request.RangeID != currTL.RangeID {
return nil, &persistence.ConditionFailedError{
Msg: fmt.Sprintf("leaseTaskList:renew failed: taskList:%v, taskListType:%v, haveRangeID:%v, gotRangeID:%v",
request.TaskList, request.TaskType, request.RangeID, currTL.RangeID),
}
}
// Update the rangeID as this is an ownership change
currTL.RangeID++
err = storeShard.db.UpdateTaskList(ctx, &nosqlplugin.TaskListRow{
DomainID: request.DomainID,
TaskListName: request.TaskList,
TaskListType: request.TaskType,
RangeID: currTL.RangeID,
TaskListKind: currTL.TaskListKind,
AckLevel: currTL.AckLevel,
LastUpdatedTime: now,
}, currTL.RangeID-1)
}
if err != nil {
conditionFailure, ok := err.(*nosqlplugin.TaskOperationConditionFailure)
if ok {
return nil, &persistence.ConditionFailedError{
Msg: fmt.Sprintf("leaseTaskList: taskList:%v, taskListType:%v, haveRangeID:%v, gotRangeID:%v",
request.TaskList, request.TaskType, currTL.RangeID, conditionFailure.RangeID),
}
}
return nil, convertCommonErrors(storeShard.db, "LeaseTaskList", err)
}
tli := &persistence.TaskListInfo{
DomainID: request.DomainID,
Name: request.TaskList,
TaskType: request.TaskType,
RangeID: currTL.RangeID,
AckLevel: currTL.AckLevel,
Kind: request.TaskListKind,
LastUpdated: now,
}
return &persistence.LeaseTaskListResponse{TaskListInfo: tli}, nil
}
func (t *nosqlTaskStore) UpdateTaskList(
ctx context.Context,
request *persistence.UpdateTaskListRequest,
) (*persistence.UpdateTaskListResponse, error) {
tli := request.TaskListInfo
var err error
taskListToUpdate := &nosqlplugin.TaskListRow{
DomainID: tli.DomainID,
TaskListName: tli.Name,
TaskListType: tli.TaskType,
RangeID: tli.RangeID,
TaskListKind: tli.Kind,
AckLevel: tli.AckLevel,
LastUpdatedTime: time.Now(),
}
storeShard, err := t.GetStoreShardByTaskList(tli.DomainID, tli.Name, tli.TaskType)
if err != nil {
return nil, err
}
if tli.Kind == persistence.TaskListKindSticky { // if task_list is sticky, then update with TTL
err = storeShard.db.UpdateTaskListWithTTL(ctx, stickyTaskListTTL, taskListToUpdate, tli.RangeID)
} else {
err = storeShard.db.UpdateTaskList(ctx, taskListToUpdate, tli.RangeID)
}
if err != nil {
conditionFailure, ok := err.(*nosqlplugin.TaskOperationConditionFailure)
if ok {
return nil, &persistence.ConditionFailedError{
Msg: fmt.Sprintf("Failed to update task list. name: %v, type: %v, rangeID: %v, columns: (%v)",
tli.Name, tli.TaskType, tli.RangeID, conditionFailure.Details),
}
}
return nil, convertCommonErrors(storeShard.db, "UpdateTaskList", err)
}
return &persistence.UpdateTaskListResponse{}, nil
}
func (t *nosqlTaskStore) ListTaskList(
_ context.Context,
_ *persistence.ListTaskListRequest,
) (*persistence.ListTaskListResponse, error) {
return nil, &types.InternalServiceError{
Message: "unsupported operation",
}
}
func (t *nosqlTaskStore) DeleteTaskList(
ctx context.Context,
request *persistence.DeleteTaskListRequest,
) error {
storeShard, err := t.GetStoreShardByTaskList(request.DomainID, request.TaskListName, request.TaskListType)
if err != nil {
return err
}
err = storeShard.db.DeleteTaskList(ctx, &nosqlplugin.TaskListFilter{
DomainID: request.DomainID,
TaskListName: request.TaskListName,
TaskListType: request.TaskListType,
}, request.RangeID)
if err != nil {
conditionFailure, ok := err.(*nosqlplugin.TaskOperationConditionFailure)
if ok {
return &persistence.ConditionFailedError{
Msg: fmt.Sprintf("Failed to delete task list. name: %v, type: %v, rangeID: %v, columns: (%v)",
request.TaskListName, request.TaskListType, request.RangeID, conditionFailure.Details),
}
}
return convertCommonErrors(storeShard.db, "DeleteTaskList", err)
}
return nil
}
func (t *nosqlTaskStore) CreateTasks(
ctx context.Context,
request *persistence.InternalCreateTasksRequest,
) (*persistence.CreateTasksResponse, error) {
now := time.Now()
var tasks []*nosqlplugin.TaskRowForInsert
for _, t := range request.Tasks {
task := &nosqlplugin.TaskRow{
DomainID: request.TaskListInfo.DomainID,
TaskListName: request.TaskListInfo.Name,
TaskListType: request.TaskListInfo.TaskType,
TaskID: t.TaskID,
WorkflowID: t.Execution.GetWorkflowID(),
RunID: t.Execution.GetRunID(),
ScheduledID: t.Data.ScheduleID,
CreatedTime: now,
PartitionConfig: t.Data.PartitionConfig,
}
ttl := int(t.Data.ScheduleToStartTimeout.Seconds())
tasks = append(tasks, &nosqlplugin.TaskRowForInsert{
TaskRow: *task,
TTLSeconds: ttl,
})
}
tasklistCondition := &nosqlplugin.TaskListRow{
DomainID: request.TaskListInfo.DomainID,
TaskListName: request.TaskListInfo.Name,
TaskListType: request.TaskListInfo.TaskType,
RangeID: request.TaskListInfo.RangeID,
}
tli := request.TaskListInfo
storeShard, err := t.GetStoreShardByTaskList(tli.DomainID, tli.Name, tli.TaskType)
if err != nil {
return nil, err
}
err = storeShard.db.InsertTasks(ctx, tasks, tasklistCondition)
if err != nil {
conditionFailure, ok := err.(*nosqlplugin.TaskOperationConditionFailure)
if ok {
return nil, &persistence.ConditionFailedError{
Msg: fmt.Sprintf("Failed to insert tasks. name: %v, type: %v, rangeID: %v, columns: (%v)",
request.TaskListInfo.Name, request.TaskListInfo.TaskType, request.TaskListInfo.RangeID, conditionFailure.Details),
}
}
return nil, convertCommonErrors(storeShard.db, "CreateTasks", err)
}
return &persistence.CreateTasksResponse{}, nil
}
func (t *nosqlTaskStore) GetTasks(
ctx context.Context,
request *persistence.GetTasksRequest,
) (*persistence.InternalGetTasksResponse, error) {
if request.MaxReadLevel == nil {
request.MaxReadLevel = common.Int64Ptr(math.MaxInt64)
}
if request.ReadLevel > *request.MaxReadLevel {
return &persistence.InternalGetTasksResponse{}, nil
}
storeShard, err := t.GetStoreShardByTaskList(request.DomainID, request.TaskList, request.TaskType)
if err != nil {
return nil, err
}
resp, err := storeShard.db.SelectTasks(ctx, &nosqlplugin.TasksFilter{
TaskListFilter: nosqlplugin.TaskListFilter{
DomainID: request.DomainID,
TaskListName: request.TaskList,
TaskListType: request.TaskType,
},
BatchSize: request.BatchSize,
MinTaskID: request.ReadLevel,
MaxTaskID: *request.MaxReadLevel,
})
if err != nil {
return nil, convertCommonErrors(storeShard.db, "GetTasks", err)
}
response := &persistence.InternalGetTasksResponse{}
for _, t := range resp {
response.Tasks = append(response.Tasks, toTaskInfo(t))
}
return response, nil
}
func toTaskInfo(t *nosqlplugin.TaskRow) *persistence.InternalTaskInfo {
return &persistence.InternalTaskInfo{
DomainID: t.DomainID,
WorkflowID: t.WorkflowID,
RunID: t.RunID,
TaskID: t.TaskID,
ScheduleID: t.ScheduledID,
CreatedTime: t.CreatedTime,
PartitionConfig: t.PartitionConfig,
}
}
func (t *nosqlTaskStore) CompleteTask(
ctx context.Context,
request *persistence.CompleteTaskRequest,
) error {
tli := request.TaskList
storeShard, err := t.GetStoreShardByTaskList(tli.DomainID, tli.Name, tli.TaskType)
if err != nil {
return err
}
_, err = storeShard.db.RangeDeleteTasks(ctx, &nosqlplugin.TasksFilter{
TaskListFilter: nosqlplugin.TaskListFilter{
DomainID: tli.DomainID,
TaskListName: tli.Name,
TaskListType: request.TaskList.TaskType,
},
// exclusive
MinTaskID: request.TaskID - 1,
// inclusive
MaxTaskID: request.TaskID,
BatchSize: 1,
})
if err != nil {
return convertCommonErrors(storeShard.db, "CompleteTask", err)
}
return nil
}
// CompleteTasksLessThan deletes all tasks less than or equal to the given task id. This API ignores the
// Limit request parameter i.e. either all tasks leq the task_id will be deleted or an error will
// be returned to the caller
func (t *nosqlTaskStore) CompleteTasksLessThan(
ctx context.Context,
request *persistence.CompleteTasksLessThanRequest,
) (*persistence.CompleteTasksLessThanResponse, error) {
storeShard, err := t.GetStoreShardByTaskList(request.DomainID, request.TaskListName, request.TaskType)
if err != nil {
return nil, err
}
num, err := storeShard.db.RangeDeleteTasks(ctx, &nosqlplugin.TasksFilter{
TaskListFilter: nosqlplugin.TaskListFilter{
DomainID: request.DomainID,
TaskListName: request.TaskListName,
TaskListType: request.TaskType,
},
// NOTE: MinTaskID is supported in plugin interfaces but not exposed in dataInterfaces/persistenceInterfaces
// We may want to add it so that we can test it.
// https://github.com/uber/cadence/issues/4243
MinTaskID: 0,
// NOTE: request.TaskID is also inclusive, even though the name is CompleteTasksLessThan
MaxTaskID: request.TaskID,
BatchSize: request.Limit,
})
if err != nil {
return nil, convertCommonErrors(storeShard.db, "CompleteTasksLessThan", err)
}
return &persistence.CompleteTasksLessThanResponse{TasksCompleted: num}, nil
}