common/persistence/nosql/nosql_execution_store.go (678 lines of code) (raw):
// Copyright (c) 2017-2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package nosql
import (
"context"
"fmt"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/nosql/nosqlplugin"
"github.com/uber/cadence/common/types"
)
// Implements ExecutionStore
type nosqlExecutionStore struct {
shardID int
nosqlStore
}
// NewExecutionStore is used to create an instance of ExecutionStore implementation
func NewExecutionStore(
shardID int,
db nosqlplugin.DB,
logger log.Logger,
) (persistence.ExecutionStore, error) {
return &nosqlExecutionStore{
nosqlStore: nosqlStore{
logger: logger,
db: db,
},
shardID: shardID,
}, nil
}
func (d *nosqlExecutionStore) GetShardID() int {
return d.shardID
}
func (d *nosqlExecutionStore) CreateWorkflowExecution(
ctx context.Context,
request *persistence.InternalCreateWorkflowExecutionRequest,
) (*persistence.CreateWorkflowExecutionResponse, error) {
newWorkflow := request.NewWorkflowSnapshot
executionInfo := newWorkflow.ExecutionInfo
lastWriteVersion := newWorkflow.LastWriteVersion
domainID := executionInfo.DomainID
workflowID := executionInfo.WorkflowID
runID := executionInfo.RunID
if err := persistence.ValidateCreateWorkflowModeState(
request.Mode,
newWorkflow,
); err != nil {
return nil, err
}
currentWorkflowWriteReq, err := d.prepareCurrentWorkflowRequestForCreateWorkflowTxn(domainID, workflowID, runID, executionInfo, lastWriteVersion, request)
if err != nil {
return nil, err
}
workflowExecutionWriteReq, err := d.prepareCreateWorkflowExecutionRequestWithMaps(&newWorkflow)
if err != nil {
return nil, err
}
transferTasks, crossClusterTasks, replicationTasks, timerTasks, err := d.prepareNoSQLTasksForWorkflowTxn(
domainID, workflowID, runID,
newWorkflow.TransferTasks, newWorkflow.CrossClusterTasks, newWorkflow.ReplicationTasks, newWorkflow.TimerTasks,
nil, nil, nil, nil,
)
if err != nil {
return nil, err
}
shardCondition := &nosqlplugin.ShardCondition{
ShardID: d.shardID,
RangeID: request.RangeID,
}
err = d.db.InsertWorkflowExecutionWithTasks(
ctx,
currentWorkflowWriteReq, workflowExecutionWriteReq,
transferTasks, crossClusterTasks, replicationTasks, timerTasks,
shardCondition,
)
if err != nil {
conditionFailureErr, isConditionFailedError := err.(*nosqlplugin.WorkflowOperationConditionFailure)
if isConditionFailedError {
switch {
case conditionFailureErr.UnknownConditionFailureDetails != nil:
return nil, &persistence.ShardOwnershipLostError{
ShardID: d.shardID,
Msg: *conditionFailureErr.UnknownConditionFailureDetails,
}
case conditionFailureErr.ShardRangeIDNotMatch != nil:
return nil, &persistence.ShardOwnershipLostError{
ShardID: d.shardID,
Msg: fmt.Sprintf("Failed to create workflow execution. Request RangeID: %v, Actual RangeID: %v",
request.RangeID, *conditionFailureErr.ShardRangeIDNotMatch),
}
case conditionFailureErr.CurrentWorkflowConditionFailInfo != nil:
return nil, &persistence.CurrentWorkflowConditionFailedError{
Msg: *conditionFailureErr.CurrentWorkflowConditionFailInfo,
}
case conditionFailureErr.WorkflowExecutionAlreadyExists != nil:
return nil, &persistence.WorkflowExecutionAlreadyStartedError{
Msg: conditionFailureErr.WorkflowExecutionAlreadyExists.OtherInfo,
StartRequestID: conditionFailureErr.WorkflowExecutionAlreadyExists.CreateRequestID,
RunID: conditionFailureErr.WorkflowExecutionAlreadyExists.RunID,
State: conditionFailureErr.WorkflowExecutionAlreadyExists.State,
CloseStatus: conditionFailureErr.WorkflowExecutionAlreadyExists.CloseStatus,
LastWriteVersion: conditionFailureErr.WorkflowExecutionAlreadyExists.LastWriteVersion,
}
default:
// If ever runs into this branch, there is bug in the code either in here, or in the implementation of nosql plugin
err := fmt.Errorf("unsupported conditionFailureReason error")
d.logger.Error("A code bug exists in persistence layer, please investigate ASAP", tag.Error(err))
return nil, err
}
}
return nil, convertCommonErrors(d.db, "CreateWorkflowExecution", err)
}
return &persistence.CreateWorkflowExecutionResponse{}, nil
}
func (d *nosqlExecutionStore) GetWorkflowExecution(
ctx context.Context,
request *persistence.InternalGetWorkflowExecutionRequest,
) (*persistence.InternalGetWorkflowExecutionResponse, error) {
execution := request.Execution
state, err := d.db.SelectWorkflowExecution(ctx, d.shardID, request.DomainID, execution.WorkflowID, execution.RunID)
if err != nil {
if d.db.IsNotFoundError(err) {
return nil, &types.EntityNotExistsError{
Message: fmt.Sprintf("Workflow execution not found. WorkflowId: %v, RunId: %v",
execution.WorkflowID, execution.RunID),
}
}
return nil, convertCommonErrors(d.db, "GetWorkflowExecution", err)
}
return &persistence.InternalGetWorkflowExecutionResponse{State: state}, nil
}
func (d *nosqlExecutionStore) UpdateWorkflowExecution(
ctx context.Context,
request *persistence.InternalUpdateWorkflowExecutionRequest,
) error {
updateWorkflow := request.UpdateWorkflowMutation
newWorkflow := request.NewWorkflowSnapshot
executionInfo := updateWorkflow.ExecutionInfo
domainID := executionInfo.DomainID
workflowID := executionInfo.WorkflowID
runID := executionInfo.RunID
if err := persistence.ValidateUpdateWorkflowModeState(
request.Mode,
updateWorkflow,
newWorkflow,
); err != nil {
return err
}
var currentWorkflowWriteReq *nosqlplugin.CurrentWorkflowWriteRequest
switch request.Mode {
case persistence.UpdateWorkflowModeIgnoreCurrent:
currentWorkflowWriteReq = &nosqlplugin.CurrentWorkflowWriteRequest{
WriteMode: nosqlplugin.CurrentWorkflowWriteModeNoop,
}
case persistence.UpdateWorkflowModeBypassCurrent:
if err := d.assertNotCurrentExecution(
ctx,
domainID,
workflowID,
runID); err != nil {
return err
}
currentWorkflowWriteReq = &nosqlplugin.CurrentWorkflowWriteRequest{
WriteMode: nosqlplugin.CurrentWorkflowWriteModeNoop,
}
case persistence.UpdateWorkflowModeUpdateCurrent:
if newWorkflow != nil {
newExecutionInfo := newWorkflow.ExecutionInfo
newLastWriteVersion := newWorkflow.LastWriteVersion
newDomainID := newExecutionInfo.DomainID
// TODO: ?? would it change at all ??
newWorkflowID := newExecutionInfo.WorkflowID
newRunID := newExecutionInfo.RunID
if domainID != newDomainID {
return &types.InternalServiceError{
Message: "UpdateWorkflowExecution: cannot continue as new to another domain",
}
}
currentWorkflowWriteReq = &nosqlplugin.CurrentWorkflowWriteRequest{
WriteMode: nosqlplugin.CurrentWorkflowWriteModeUpdate,
Row: nosqlplugin.CurrentWorkflowRow{
ShardID: d.shardID,
DomainID: newDomainID,
WorkflowID: newWorkflowID,
RunID: newRunID,
State: newExecutionInfo.State,
CloseStatus: newExecutionInfo.CloseStatus,
CreateRequestID: newExecutionInfo.CreateRequestID,
LastWriteVersion: newLastWriteVersion,
},
Condition: &nosqlplugin.CurrentWorkflowWriteCondition{
CurrentRunID: &runID,
},
}
} else {
lastWriteVersion := updateWorkflow.LastWriteVersion
currentWorkflowWriteReq = &nosqlplugin.CurrentWorkflowWriteRequest{
WriteMode: nosqlplugin.CurrentWorkflowWriteModeUpdate,
Row: nosqlplugin.CurrentWorkflowRow{
ShardID: d.shardID,
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
State: executionInfo.State,
CloseStatus: executionInfo.CloseStatus,
CreateRequestID: executionInfo.CreateRequestID,
LastWriteVersion: lastWriteVersion,
},
Condition: &nosqlplugin.CurrentWorkflowWriteCondition{
CurrentRunID: &runID,
},
}
}
default:
return &types.InternalServiceError{
Message: fmt.Sprintf("UpdateWorkflowExecution: unknown mode: %v", request.Mode),
}
}
var mutateExecution, insertExecution *nosqlplugin.WorkflowExecutionRequest
var nosqlTransferTasks []*nosqlplugin.TransferTask
var nosqlCrossClusterTasks []*nosqlplugin.CrossClusterTask
var nosqlReplicationTasks []*nosqlplugin.ReplicationTask
var nosqlTimerTasks []*nosqlplugin.TimerTask
var err error
// 1. current
mutateExecution, err = d.prepareUpdateWorkflowExecutionRequestWithMapsAndEventBuffer(&updateWorkflow)
if err != nil {
return err
}
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
domainID, workflowID, updateWorkflow.ExecutionInfo.RunID,
updateWorkflow.TransferTasks, updateWorkflow.CrossClusterTasks, updateWorkflow.ReplicationTasks, updateWorkflow.TimerTasks,
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
)
if err != nil {
return err
}
// 2. new
if newWorkflow != nil {
insertExecution, err = d.prepareCreateWorkflowExecutionRequestWithMaps(newWorkflow)
if err != nil {
return err
}
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
domainID, workflowID, newWorkflow.ExecutionInfo.RunID,
newWorkflow.TransferTasks, newWorkflow.CrossClusterTasks, newWorkflow.ReplicationTasks, newWorkflow.TimerTasks,
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
)
if err != nil {
return err
}
}
shardCondition := &nosqlplugin.ShardCondition{
ShardID: d.shardID,
RangeID: request.RangeID,
}
err = d.db.UpdateWorkflowExecutionWithTasks(
ctx, currentWorkflowWriteReq,
mutateExecution, insertExecution, nil, // no workflow to reset here
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
shardCondition)
return d.processUpdateWorkflowResult(err, request.RangeID)
}
func (d *nosqlExecutionStore) ConflictResolveWorkflowExecution(
ctx context.Context,
request *persistence.InternalConflictResolveWorkflowExecutionRequest,
) error {
currentWorkflow := request.CurrentWorkflowMutation
resetWorkflow := request.ResetWorkflowSnapshot
newWorkflow := request.NewWorkflowSnapshot
domainID := resetWorkflow.ExecutionInfo.DomainID
workflowID := resetWorkflow.ExecutionInfo.WorkflowID
if err := persistence.ValidateConflictResolveWorkflowModeState(
request.Mode,
resetWorkflow,
newWorkflow,
currentWorkflow,
); err != nil {
return err
}
var currentWorkflowWriteReq *nosqlplugin.CurrentWorkflowWriteRequest
var prevRunID string
switch request.Mode {
case persistence.ConflictResolveWorkflowModeBypassCurrent:
if err := d.assertNotCurrentExecution(
ctx,
domainID,
workflowID,
resetWorkflow.ExecutionInfo.RunID); err != nil {
return err
}
currentWorkflowWriteReq = &nosqlplugin.CurrentWorkflowWriteRequest{
WriteMode: nosqlplugin.CurrentWorkflowWriteModeNoop,
}
case persistence.ConflictResolveWorkflowModeUpdateCurrent:
executionInfo := resetWorkflow.ExecutionInfo
lastWriteVersion := resetWorkflow.LastWriteVersion
if newWorkflow != nil {
executionInfo = newWorkflow.ExecutionInfo
lastWriteVersion = newWorkflow.LastWriteVersion
}
if currentWorkflow != nil {
prevRunID = currentWorkflow.ExecutionInfo.RunID
} else {
// reset workflow is current
prevRunID = resetWorkflow.ExecutionInfo.RunID
}
currentWorkflowWriteReq = &nosqlplugin.CurrentWorkflowWriteRequest{
WriteMode: nosqlplugin.CurrentWorkflowWriteModeUpdate,
Row: nosqlplugin.CurrentWorkflowRow{
ShardID: d.shardID,
DomainID: domainID,
WorkflowID: workflowID,
RunID: executionInfo.RunID,
State: executionInfo.State,
CloseStatus: executionInfo.CloseStatus,
CreateRequestID: executionInfo.CreateRequestID,
LastWriteVersion: lastWriteVersion,
},
Condition: &nosqlplugin.CurrentWorkflowWriteCondition{
CurrentRunID: &prevRunID,
},
}
default:
return &types.InternalServiceError{
Message: fmt.Sprintf("ConflictResolveWorkflowExecution: unknown mode: %v", request.Mode),
}
}
var mutateExecution, insertExecution, resetExecution *nosqlplugin.WorkflowExecutionRequest
var nosqlTransferTasks []*nosqlplugin.TransferTask
var nosqlCrossClusterTasks []*nosqlplugin.CrossClusterTask
var nosqlReplicationTasks []*nosqlplugin.ReplicationTask
var nosqlTimerTasks []*nosqlplugin.TimerTask
var err error
// 1. current
if currentWorkflow != nil {
mutateExecution, err = d.prepareUpdateWorkflowExecutionRequestWithMapsAndEventBuffer(currentWorkflow)
if err != nil {
return err
}
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
domainID, workflowID, currentWorkflow.ExecutionInfo.RunID,
currentWorkflow.TransferTasks, currentWorkflow.CrossClusterTasks, currentWorkflow.ReplicationTasks, currentWorkflow.TimerTasks,
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
)
if err != nil {
return err
}
}
// 2. reset
resetExecution, err = d.prepareResetWorkflowExecutionRequestWithMapsAndEventBuffer(&resetWorkflow)
if err != nil {
return err
}
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
domainID, workflowID, resetWorkflow.ExecutionInfo.RunID,
resetWorkflow.TransferTasks, resetWorkflow.CrossClusterTasks, resetWorkflow.ReplicationTasks, resetWorkflow.TimerTasks,
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
)
if err != nil {
return err
}
// 3. new
if newWorkflow != nil {
insertExecution, err = d.prepareCreateWorkflowExecutionRequestWithMaps(newWorkflow)
if err != nil {
return err
}
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks, err = d.prepareNoSQLTasksForWorkflowTxn(
domainID, workflowID, newWorkflow.ExecutionInfo.RunID,
newWorkflow.TransferTasks, newWorkflow.CrossClusterTasks, newWorkflow.ReplicationTasks, newWorkflow.TimerTasks,
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
)
if err != nil {
return err
}
}
shardCondition := &nosqlplugin.ShardCondition{
ShardID: d.shardID,
RangeID: request.RangeID,
}
err = d.db.UpdateWorkflowExecutionWithTasks(
ctx, currentWorkflowWriteReq,
mutateExecution, insertExecution, resetExecution,
nosqlTransferTasks, nosqlCrossClusterTasks, nosqlReplicationTasks, nosqlTimerTasks,
shardCondition)
return d.processUpdateWorkflowResult(err, request.RangeID)
}
func (d *nosqlExecutionStore) DeleteWorkflowExecution(
ctx context.Context,
request *persistence.DeleteWorkflowExecutionRequest,
) error {
err := d.db.DeleteWorkflowExecution(ctx, d.shardID, request.DomainID, request.WorkflowID, request.RunID)
if err != nil {
return convertCommonErrors(d.db, "DeleteWorkflowExecution", err)
}
return nil
}
func (d *nosqlExecutionStore) DeleteCurrentWorkflowExecution(
ctx context.Context,
request *persistence.DeleteCurrentWorkflowExecutionRequest,
) error {
err := d.db.DeleteCurrentWorkflow(ctx, d.shardID, request.DomainID, request.WorkflowID, request.RunID)
if err != nil {
return convertCommonErrors(d.db, "DeleteCurrentWorkflowExecution", err)
}
return nil
}
func (d *nosqlExecutionStore) GetCurrentExecution(
ctx context.Context,
request *persistence.GetCurrentExecutionRequest,
) (*persistence.GetCurrentExecutionResponse,
error) {
result, err := d.db.SelectCurrentWorkflow(ctx, d.shardID, request.DomainID, request.WorkflowID)
if err != nil {
if d.db.IsNotFoundError(err) {
return nil, &types.EntityNotExistsError{
Message: fmt.Sprintf("Workflow execution not found. WorkflowId: %v",
request.WorkflowID),
}
}
return nil, convertCommonErrors(d.db, "GetCurrentExecution", err)
}
return &persistence.GetCurrentExecutionResponse{
RunID: result.RunID,
StartRequestID: result.CreateRequestID,
State: result.State,
CloseStatus: result.CloseStatus,
LastWriteVersion: result.LastWriteVersion,
}, nil
}
func (d *nosqlExecutionStore) ListCurrentExecutions(
ctx context.Context,
request *persistence.ListCurrentExecutionsRequest,
) (*persistence.ListCurrentExecutionsResponse, error) {
executions, token, err := d.db.SelectAllCurrentWorkflows(ctx, d.shardID, request.PageToken, request.PageSize)
if err != nil {
return nil, convertCommonErrors(d.db, "ListCurrentExecutions", err)
}
return &persistence.ListCurrentExecutionsResponse{
Executions: executions,
PageToken: token,
}, nil
}
func (d *nosqlExecutionStore) IsWorkflowExecutionExists(
ctx context.Context,
request *persistence.IsWorkflowExecutionExistsRequest,
) (*persistence.IsWorkflowExecutionExistsResponse, error) {
exists, err := d.db.IsWorkflowExecutionExists(ctx, d.shardID, request.DomainID, request.WorkflowID, request.RunID)
if err != nil {
return nil, convertCommonErrors(d.db, "IsWorkflowExecutionExists", err)
}
return &persistence.IsWorkflowExecutionExistsResponse{
Exists: exists,
}, nil
}
func (d *nosqlExecutionStore) ListConcreteExecutions(
ctx context.Context,
request *persistence.ListConcreteExecutionsRequest,
) (*persistence.InternalListConcreteExecutionsResponse, error) {
executions, nextPageToken, err := d.db.SelectAllWorkflowExecutions(ctx, d.shardID, request.PageToken, request.PageSize)
if err != nil {
return nil, convertCommonErrors(d.db, "ListConcreteExecutions", err)
}
return &persistence.InternalListConcreteExecutionsResponse{
Executions: executions,
NextPageToken: nextPageToken,
}, nil
}
func (d *nosqlExecutionStore) GetTransferTasks(
ctx context.Context,
request *persistence.GetTransferTasksRequest,
) (*persistence.GetTransferTasksResponse, error) {
tasks, nextPageToken, err := d.db.SelectTransferTasksOrderByTaskID(ctx, d.shardID, request.BatchSize, request.NextPageToken, request.ReadLevel, request.MaxReadLevel)
if err != nil {
return nil, convertCommonErrors(d.db, "GetTransferTasks", err)
}
return &persistence.GetTransferTasksResponse{
Tasks: tasks,
NextPageToken: nextPageToken,
}, nil
}
func (d *nosqlExecutionStore) GetCrossClusterTasks(
ctx context.Context,
request *persistence.GetCrossClusterTasksRequest,
) (*persistence.GetCrossClusterTasksResponse, error) {
cTasks, nextPageToken, err := d.db.SelectCrossClusterTasksOrderByTaskID(ctx, d.shardID, request.BatchSize, request.NextPageToken, request.TargetCluster, request.ReadLevel, request.MaxReadLevel)
if err != nil {
return nil, convertCommonErrors(d.db, "GetCrossClusterTasks", err)
}
var tTasks []*persistence.CrossClusterTaskInfo
for _, t := range cTasks {
// revive:disable-next-line:range-val-address Appending address of TransferTask, not of t.
tTasks = append(tTasks, &t.TransferTask)
}
return &persistence.GetCrossClusterTasksResponse{
Tasks: tTasks,
NextPageToken: nextPageToken,
}, nil
}
func (d *nosqlExecutionStore) GetReplicationTasks(
ctx context.Context,
request *persistence.GetReplicationTasksRequest,
) (*persistence.InternalGetReplicationTasksResponse, error) {
tasks, nextPageToken, err := d.db.SelectReplicationTasksOrderByTaskID(ctx, d.shardID, request.BatchSize, request.NextPageToken, request.ReadLevel, request.MaxReadLevel)
if err != nil {
return nil, convertCommonErrors(d.db, "GetReplicationTasks", err)
}
return &persistence.InternalGetReplicationTasksResponse{
Tasks: tasks,
NextPageToken: nextPageToken,
}, nil
}
func (d *nosqlExecutionStore) CompleteTransferTask(
ctx context.Context,
request *persistence.CompleteTransferTaskRequest,
) error {
err := d.db.DeleteTransferTask(ctx, d.shardID, request.TaskID)
if err != nil {
return convertCommonErrors(d.db, "CompleteTransferTask", err)
}
return nil
}
func (d *nosqlExecutionStore) RangeCompleteTransferTask(
ctx context.Context,
request *persistence.RangeCompleteTransferTaskRequest,
) (*persistence.RangeCompleteTransferTaskResponse, error) {
err := d.db.RangeDeleteTransferTasks(ctx, d.shardID, request.ExclusiveBeginTaskID, request.InclusiveEndTaskID)
if err != nil {
return nil, convertCommonErrors(d.db, "RangeCompleteTransferTask", err)
}
return &persistence.RangeCompleteTransferTaskResponse{TasksCompleted: persistence.UnknownNumRowsAffected}, nil
}
func (d *nosqlExecutionStore) CompleteCrossClusterTask(
ctx context.Context,
request *persistence.CompleteCrossClusterTaskRequest,
) error {
err := d.db.DeleteCrossClusterTask(ctx, d.shardID, request.TargetCluster, request.TaskID)
if err != nil {
return convertCommonErrors(d.db, "CompleteCrossClusterTask", err)
}
return nil
}
func (d *nosqlExecutionStore) RangeCompleteCrossClusterTask(
ctx context.Context,
request *persistence.RangeCompleteCrossClusterTaskRequest,
) (*persistence.RangeCompleteCrossClusterTaskResponse, error) {
err := d.db.RangeDeleteCrossClusterTasks(ctx, d.shardID, request.TargetCluster, request.ExclusiveBeginTaskID, request.InclusiveEndTaskID)
if err != nil {
return nil, convertCommonErrors(d.db, "RangeCompleteCrossClusterTask", err)
}
return &persistence.RangeCompleteCrossClusterTaskResponse{TasksCompleted: persistence.UnknownNumRowsAffected}, nil
}
func (d *nosqlExecutionStore) CompleteReplicationTask(
ctx context.Context,
request *persistence.CompleteReplicationTaskRequest,
) error {
err := d.db.DeleteReplicationTask(ctx, d.shardID, request.TaskID)
if err != nil {
return convertCommonErrors(d.db, "CompleteReplicationTask", err)
}
return nil
}
func (d *nosqlExecutionStore) RangeCompleteReplicationTask(
ctx context.Context,
request *persistence.RangeCompleteReplicationTaskRequest,
) (*persistence.RangeCompleteReplicationTaskResponse, error) {
err := d.db.RangeDeleteReplicationTasks(ctx, d.shardID, request.InclusiveEndTaskID)
if err != nil {
return nil, convertCommonErrors(d.db, "RangeCompleteReplicationTask", err)
}
return &persistence.RangeCompleteReplicationTaskResponse{TasksCompleted: persistence.UnknownNumRowsAffected}, nil
}
func (d *nosqlExecutionStore) CompleteTimerTask(
ctx context.Context,
request *persistence.CompleteTimerTaskRequest,
) error {
err := d.db.DeleteTimerTask(ctx, d.shardID, request.TaskID, request.VisibilityTimestamp)
if err != nil {
return convertCommonErrors(d.db, "CompleteTimerTask", err)
}
return nil
}
func (d *nosqlExecutionStore) RangeCompleteTimerTask(
ctx context.Context,
request *persistence.RangeCompleteTimerTaskRequest,
) (*persistence.RangeCompleteTimerTaskResponse, error) {
err := d.db.RangeDeleteTimerTasks(ctx, d.shardID, request.InclusiveBeginTimestamp, request.ExclusiveEndTimestamp)
if err != nil {
return nil, convertCommonErrors(d.db, "RangeCompleteTimerTask", err)
}
return &persistence.RangeCompleteTimerTaskResponse{TasksCompleted: persistence.UnknownNumRowsAffected}, nil
}
func (d *nosqlExecutionStore) GetTimerIndexTasks(
ctx context.Context,
request *persistence.GetTimerIndexTasksRequest,
) (*persistence.GetTimerIndexTasksResponse, error) {
timers, nextPageToken, err := d.db.SelectTimerTasksOrderByVisibilityTime(ctx, d.shardID, request.BatchSize, request.NextPageToken, request.MinTimestamp, request.MaxTimestamp)
if err != nil {
return nil, convertCommonErrors(d.db, "GetTimerTasks", err)
}
return &persistence.GetTimerIndexTasksResponse{
Timers: timers,
NextPageToken: nextPageToken,
}, nil
}
func (d *nosqlExecutionStore) PutReplicationTaskToDLQ(
ctx context.Context,
request *persistence.InternalPutReplicationTaskToDLQRequest,
) error {
err := d.db.InsertReplicationDLQTask(ctx, d.shardID, request.SourceClusterName, *request.TaskInfo)
if err != nil {
return convertCommonErrors(d.db, "PutReplicationTaskToDLQ", err)
}
return nil
}
func (d *nosqlExecutionStore) GetReplicationTasksFromDLQ(
ctx context.Context,
request *persistence.GetReplicationTasksFromDLQRequest,
) (*persistence.InternalGetReplicationTasksFromDLQResponse, error) {
tasks, nextPageToken, err := d.db.SelectReplicationDLQTasksOrderByTaskID(ctx, d.shardID, request.SourceClusterName, request.BatchSize, request.NextPageToken, request.ReadLevel, request.MaxReadLevel)
if err != nil {
return nil, convertCommonErrors(d.db, "GetReplicationTasksFromDLQ", err)
}
return &persistence.InternalGetReplicationTasksResponse{
Tasks: tasks,
NextPageToken: nextPageToken,
}, nil
}
func (d *nosqlExecutionStore) GetReplicationDLQSize(
ctx context.Context,
request *persistence.GetReplicationDLQSizeRequest,
) (*persistence.GetReplicationDLQSizeResponse, error) {
size, err := d.db.SelectReplicationDLQTasksCount(ctx, d.shardID, request.SourceClusterName)
if err != nil {
return nil, convertCommonErrors(d.db, "GetReplicationDLQSize", err)
}
return &persistence.GetReplicationDLQSizeResponse{
Size: size,
}, nil
}
func (d *nosqlExecutionStore) DeleteReplicationTaskFromDLQ(
ctx context.Context,
request *persistence.DeleteReplicationTaskFromDLQRequest,
) error {
err := d.db.DeleteReplicationDLQTask(ctx, d.shardID, request.SourceClusterName, request.TaskID)
if err != nil {
return convertCommonErrors(d.db, "DeleteReplicationTaskFromDLQ", err)
}
return nil
}
func (d *nosqlExecutionStore) RangeDeleteReplicationTaskFromDLQ(
ctx context.Context,
request *persistence.RangeDeleteReplicationTaskFromDLQRequest,
) (*persistence.RangeDeleteReplicationTaskFromDLQResponse, error) {
err := d.db.RangeDeleteReplicationDLQTasks(ctx, d.shardID, request.SourceClusterName, request.ExclusiveBeginTaskID, request.InclusiveEndTaskID)
if err != nil {
return nil, convertCommonErrors(d.db, "RangeDeleteReplicationTaskFromDLQ", err)
}
return &persistence.RangeDeleteReplicationTaskFromDLQResponse{TasksCompleted: persistence.UnknownNumRowsAffected}, nil
}
func (d *nosqlExecutionStore) CreateFailoverMarkerTasks(
ctx context.Context,
request *persistence.CreateFailoverMarkersRequest,
) error {
var nosqlTasks []*nosqlplugin.ReplicationTask
for _, task := range request.Markers {
ts := []persistence.Task{task}
tasks, err := d.prepareReplicationTasksForWorkflowTxn(task.DomainID, rowTypeReplicationWorkflowID, rowTypeReplicationRunID, ts)
if err != nil {
return err
}
nosqlTasks = append(nosqlTasks, tasks...)
}
err := d.db.InsertReplicationTask(ctx, nosqlTasks, nosqlplugin.ShardCondition{
ShardID: d.shardID,
RangeID: request.RangeID,
})
if err != nil {
conditionFailureErr, isConditionFailedError := err.(*nosqlplugin.ShardOperationConditionFailure)
if isConditionFailedError {
return &persistence.ShardOwnershipLostError{
ShardID: d.shardID,
Msg: fmt.Sprintf("Failed to create workflow execution. Request RangeID: %v, columns: (%v)",
conditionFailureErr.RangeID, conditionFailureErr.Details),
}
}
}
return nil
}