common/persistence/nosql/nosqlplugin/cassandra/tasks.go (318 lines of code) (raw):
// Copyright (c) 2021 Uber Technologies, Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package cassandra
import (
"context"
"fmt"
"strings"
"time"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin/cassandra/gocql"
"github.com/uber/cadence/common/types"
)
const (
// Row types for table tasks
rowTypeTask = iota
rowTypeTaskList
)
const (
taskListTaskID = -12345
initialRangeID = 1 // Id of the first range of a new task list
)
// SelectTaskList returns a single tasklist row.
// Return IsNotFoundError if the row doesn't exist
func (db *cdb) SelectTaskList(ctx context.Context, filter *nosqlplugin.TaskListFilter) (*nosqlplugin.TaskListRow, error) {
query := db.session.Query(templateGetTaskList,
filter.DomainID,
filter.TaskListName,
filter.TaskListType,
rowTypeTaskList,
taskListTaskID,
).WithContext(ctx)
var rangeID int64
var tlDB map[string]interface{}
err := query.Scan(&rangeID, &tlDB)
if err != nil {
return nil, err
}
ackLevel := tlDB["ack_level"].(int64)
taskListKind := tlDB["kind"].(int)
lastUpdatedTime := tlDB["last_updated"].(time.Time)
return &nosqlplugin.TaskListRow{
DomainID: filter.DomainID,
TaskListName: filter.TaskListName,
TaskListType: filter.TaskListType,
TaskListKind: taskListKind,
LastUpdatedTime: lastUpdatedTime,
AckLevel: ackLevel,
RangeID: rangeID,
}, nil
}
// InsertTaskList insert a single tasklist row
// Return TaskOperationConditionFailure if the condition doesn't meet
func (db *cdb) InsertTaskList(ctx context.Context, row *nosqlplugin.TaskListRow) error {
query := db.session.Query(templateInsertTaskListQuery,
row.DomainID,
row.TaskListName,
row.TaskListType,
rowTypeTaskList,
taskListTaskID,
initialRangeID,
row.DomainID,
row.TaskListName,
row.TaskListType,
0,
row.TaskListKind,
row.LastUpdatedTime,
).WithContext(ctx)
previous := make(map[string]interface{})
applied, err := query.MapScanCAS(previous)
if err != nil {
return err
}
return handleTaskListAppliedError(applied, previous)
}
// UpdateTaskList updates a single tasklist row
// Return TaskOperationConditionFailure if the condition doesn't meet
func (db *cdb) UpdateTaskList(
ctx context.Context,
row *nosqlplugin.TaskListRow,
previousRangeID int64,
) error {
query := db.session.Query(templateUpdateTaskListQuery,
row.RangeID,
row.DomainID,
row.TaskListName,
row.TaskListType,
row.AckLevel,
row.TaskListKind,
row.LastUpdatedTime,
row.DomainID,
row.TaskListName,
row.TaskListType,
rowTypeTaskList,
taskListTaskID,
previousRangeID,
).WithContext(ctx)
previous := make(map[string]interface{})
applied, err := query.MapScanCAS(previous)
if err != nil {
return err
}
return handleTaskListAppliedError(applied, previous)
}
func handleTaskListAppliedError(applied bool, previous map[string]interface{}) error {
if !applied {
// NOTE: Cassandra only returns the conflicted columns in this results
rangeID := previous["range_id"].(int64)
var columns []string
for k, v := range previous {
columns = append(columns, fmt.Sprintf("%s=%v", k, v))
}
return &nosqlplugin.TaskOperationConditionFailure{
RangeID: rangeID,
Details: strings.Join(columns, ","),
}
}
return nil
}
// UpdateTaskList updates a single tasklist row, and set an TTL on the record
// Return TaskOperationConditionFailure if the condition doesn't meet
// Ignore TTL if it's not supported, which becomes exactly the same as UpdateTaskList, but ListTaskList must be
// implemented for TaskListScavenger
func (db *cdb) UpdateTaskListWithTTL(
ctx context.Context,
ttlSeconds int64,
row *nosqlplugin.TaskListRow,
previousRangeID int64,
) error {
batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
// part 1 is used to set TTL on primary key as UPDATE can't set TTL for primary key
batch.Query(templateUpdateTaskListQueryWithTTLPart1,
row.DomainID,
row.TaskListName,
row.TaskListType,
rowTypeTaskList,
taskListTaskID,
ttlSeconds,
)
// part 2 is for CAS and setting TTL for the rest of the columns
batch.Query(templateUpdateTaskListQueryWithTTLPart2,
ttlSeconds,
row.RangeID,
row.DomainID,
row.TaskListName,
row.TaskListType,
row.AckLevel,
row.TaskListKind,
db.timeSrc.Now(),
row.DomainID,
row.TaskListName,
row.TaskListType,
rowTypeTaskList,
taskListTaskID,
previousRangeID,
)
previous := make(map[string]interface{})
applied, _, err := db.session.MapExecuteBatchCAS(batch, previous)
if err != nil {
return err
}
return handleTaskListAppliedError(applied, previous)
}
// ListTaskList returns all tasklists.
// Noop if TTL is already implemented in other methods
func (db *cdb) ListTaskList(ctx context.Context, pageSize int, nextPageToken []byte) (*nosqlplugin.ListTaskListResult, error) {
return nil, &types.InternalServiceError{
Message: "unsupported operation",
}
}
// DeleteTaskList deletes a single tasklist row
// Return TaskOperationConditionFailure if the condition doesn't meet
func (db *cdb) DeleteTaskList(ctx context.Context, filter *nosqlplugin.TaskListFilter, previousRangeID int64) error {
query := db.session.Query(templateDeleteTaskListQuery,
filter.DomainID,
filter.TaskListName,
filter.TaskListType,
rowTypeTaskList,
taskListTaskID,
previousRangeID,
).WithContext(ctx).Consistency(cassandraAllConslevel)
previous := make(map[string]interface{})
applied, err := query.MapScanCAS(previous)
if err != nil {
if !db.isCassandraConsistencyError(err) {
return err
}
db.logger.Warn("unable to complete the delete operation due to consistency issue", tag.Error(err))
applied, err = query.Consistency(cassandraDefaultConsLevel).MapScanCAS(previous)
if err != nil {
return err
}
}
return handleTaskListAppliedError(applied, previous)
}
// InsertTasks inserts a batch of tasks
// Return IsConditionFailedError if the condition doesn't meet, and also the previous tasklist row
func (db *cdb) InsertTasks(
ctx context.Context,
tasksToInsert []*nosqlplugin.TaskRowForInsert,
tasklistCondition *nosqlplugin.TaskListRow,
) error {
batch := db.session.NewBatch(gocql.LoggedBatch).WithContext(ctx)
domainID := tasklistCondition.DomainID
taskListName := tasklistCondition.TaskListName
taskListType := tasklistCondition.TaskListType
for _, task := range tasksToInsert {
scheduleID := task.ScheduledID
ttl := int64(task.TTLSeconds)
if ttl <= 0 {
batch.Query(templateCreateTaskQuery,
domainID,
taskListName,
taskListType,
rowTypeTask,
task.TaskID,
domainID,
task.WorkflowID,
task.RunID,
scheduleID,
task.CreatedTime,
task.PartitionConfig)
} else {
if ttl > maxCassandraTTL {
ttl = maxCassandraTTL
}
batch.Query(templateCreateTaskWithTTLQuery,
domainID,
taskListName,
taskListType,
rowTypeTask,
task.TaskID,
domainID,
task.WorkflowID,
task.RunID,
scheduleID,
task.CreatedTime,
task.PartitionConfig,
ttl)
}
}
// The following query is used to ensure that range_id didn't change
batch.Query(templateUpdateTaskListRangeIDQuery,
tasklistCondition.RangeID,
domainID,
taskListName,
taskListType,
rowTypeTaskList,
taskListTaskID,
tasklistCondition.RangeID,
)
previous := make(map[string]interface{})
applied, _, err := db.session.MapExecuteBatchCAS(batch, previous)
if err != nil {
return err
}
return handleTaskListAppliedError(applied, previous)
}
// GetTasksCount returns number of tasks from a tasklist
func (db *cdb) GetTasksCount(ctx context.Context, filter *nosqlplugin.TasksFilter) (int64, error) {
query := db.session.Query(templateGetTasksCountQuery,
filter.DomainID,
filter.TaskListName,
filter.TaskListType,
rowTypeTask,
filter.MinTaskID,
).WithContext(ctx)
result := make(map[string]interface{})
if err := query.MapScan(result); err != nil {
return 0, err
}
queueSize := result["count"].(int64)
return queueSize, nil
}
// SelectTasks return tasks that associated to a tasklist
func (db *cdb) SelectTasks(ctx context.Context, filter *nosqlplugin.TasksFilter) ([]*nosqlplugin.TaskRow, error) {
// Reading tasklist tasks need to be quorum level consistent, otherwise we could loose task
query := db.session.Query(templateGetTasksQuery,
filter.DomainID,
filter.TaskListName,
filter.TaskListType,
rowTypeTask,
filter.MinTaskID,
filter.MaxTaskID,
).PageSize(filter.BatchSize).WithContext(ctx)
iter := query.Iter()
if iter == nil {
return nil, fmt.Errorf("selectTasks operation failed. Not able to create query iterator")
}
var response []*nosqlplugin.TaskRow
task := make(map[string]interface{})
PopulateTasks:
for iter.MapScan(task) {
taskID, ok := task["task_id"]
if !ok { // no tasks, but static column record returned
continue
}
t := createTaskInfo(task["task"].(map[string]interface{}))
t.TaskID = taskID.(int64)
response = append(response, t)
if len(response) == filter.BatchSize {
break PopulateTasks
}
task = make(map[string]interface{}) // Reinitialize map as initialized fails on unmarshalling
}
if err := iter.Close(); err != nil {
return nil, err
}
return response, nil
}
func createTaskInfo(
result map[string]interface{},
) *nosqlplugin.TaskRow {
info := &nosqlplugin.TaskRow{}
for k, v := range result {
switch k {
case "domain_id":
info.DomainID = v.(gocql.UUID).String()
case "workflow_id":
info.WorkflowID = v.(string)
case "run_id":
info.RunID = v.(gocql.UUID).String()
case "schedule_id":
info.ScheduledID = v.(int64)
case "created_time":
info.CreatedTime = v.(time.Time)
case "partition_config":
info.PartitionConfig = v.(map[string]string)
}
}
return info
}
// DeleteTask delete a batch tasks that taskIDs less than the row
// If TTL is not implemented, then should also return the number of rows deleted, otherwise persistence.UnknownNumRowsAffected
// NOTE: This API ignores the `BatchSize` request parameter i.e. either all tasks leq the task_id will be deleted or an error will
// be returned to the caller, because rowsDeleted is not supported by Cassandra
func (db *cdb) RangeDeleteTasks(ctx context.Context, filter *nosqlplugin.TasksFilter) (rowsDeleted int, err error) {
query := db.session.Query(templateCompleteTasksLessThanQuery,
filter.DomainID,
filter.TaskListName,
filter.TaskListType,
rowTypeTask,
filter.MinTaskID,
filter.MaxTaskID,
).WithContext(ctx)
return persistence.UnknownNumRowsAffected, db.executeWithConsistencyAll(query)
}