common/persistence/sql/workflow_state_maps.go (611 lines of code) (raw):
// Copyright (c) 2017-2020 Uber Technologies, Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package sql
import (
"context"
"database/sql"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/serialization"
"github.com/uber/cadence/common/persistence/sql/sqlplugin"
"github.com/uber/cadence/common/types"
)
func updateActivityInfos(
ctx context.Context,
tx sqlplugin.Tx,
activityInfos []*persistence.InternalActivityInfo,
deleteInfos []int64,
shardID int,
domainID serialization.UUID,
workflowID string,
runID serialization.UUID,
parser serialization.Parser,
) error {
if len(activityInfos) > 0 {
rows := make([]sqlplugin.ActivityInfoMapsRow, len(activityInfos))
for i, activityInfo := range activityInfos {
scheduledEvent, scheduledEncoding := persistence.FromDataBlob(activityInfo.ScheduledEvent)
startEvent, startEncoding := persistence.FromDataBlob(activityInfo.StartedEvent)
info := &serialization.ActivityInfo{
Version: activityInfo.Version,
ScheduledEventBatchID: activityInfo.ScheduledEventBatchID,
ScheduledEvent: scheduledEvent,
ScheduledEventEncoding: scheduledEncoding,
ScheduledTimestamp: activityInfo.ScheduledTime,
StartedID: activityInfo.StartedID,
StartedEvent: startEvent,
StartedEventEncoding: startEncoding,
StartedTimestamp: activityInfo.StartedTime,
ActivityID: activityInfo.ActivityID,
RequestID: activityInfo.RequestID,
ScheduleToStartTimeout: activityInfo.ScheduleToStartTimeout,
ScheduleToCloseTimeout: activityInfo.ScheduleToCloseTimeout,
StartToCloseTimeout: activityInfo.StartToCloseTimeout,
HeartbeatTimeout: activityInfo.HeartbeatTimeout,
CancelRequested: activityInfo.CancelRequested,
CancelRequestID: activityInfo.CancelRequestID,
TimerTaskStatus: activityInfo.TimerTaskStatus,
Attempt: activityInfo.Attempt,
TaskList: activityInfo.TaskList,
StartedIdentity: activityInfo.StartedIdentity,
HasRetryPolicy: activityInfo.HasRetryPolicy,
RetryInitialInterval: activityInfo.InitialInterval,
RetryBackoffCoefficient: activityInfo.BackoffCoefficient,
RetryMaximumInterval: activityInfo.MaximumInterval,
RetryExpirationTimestamp: activityInfo.ExpirationTime,
RetryMaximumAttempts: activityInfo.MaximumAttempts,
RetryNonRetryableErrors: activityInfo.NonRetriableErrors,
RetryLastFailureReason: activityInfo.LastFailureReason,
RetryLastWorkerIdentity: activityInfo.LastWorkerIdentity,
RetryLastFailureDetails: activityInfo.LastFailureDetails,
}
blob, err := parser.ActivityInfoToBlob(info)
if err != nil {
return err
}
rows[i] = sqlplugin.ActivityInfoMapsRow{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
ScheduleID: activityInfo.ScheduleID,
LastHeartbeatUpdatedTime: activityInfo.LastHeartBeatUpdatedTime,
LastHeartbeatDetails: activityInfo.Details,
Data: blob.Data,
DataEncoding: string(blob.Encoding),
}
}
if _, err := tx.ReplaceIntoActivityInfoMaps(ctx, rows); err != nil {
return convertCommonErrors(tx, "updateActivityInfos", "Failed to execute update query.", err)
}
}
if len(deleteInfos) > 0 {
if _, err := tx.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
ScheduleIDs: deleteInfos,
}); err != nil {
return convertCommonErrors(tx, "updateActivityInfos", "Failed to execute delete query.", err)
}
}
return nil
}
func getActivityInfoMap(
ctx context.Context,
db sqlplugin.DB,
shardID int,
domainID serialization.UUID,
workflowID string,
runID serialization.UUID,
parser serialization.Parser,
) (map[int64]*persistence.InternalActivityInfo, error) {
rows, err := db.SelectFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
})
if err != nil && err != sql.ErrNoRows {
return nil, convertCommonErrors(db, "getActivityInfoMap", "", err)
}
ret := make(map[int64]*persistence.InternalActivityInfo)
for _, row := range rows {
decoded, err := parser.ActivityInfoFromBlob(row.Data, row.DataEncoding)
if err != nil {
return nil, err
}
info := &persistence.InternalActivityInfo{
DomainID: row.DomainID.String(),
ScheduleID: row.ScheduleID,
Details: row.LastHeartbeatDetails,
LastHeartBeatUpdatedTime: row.LastHeartbeatUpdatedTime,
Version: decoded.GetVersion(),
ScheduledEventBatchID: decoded.GetScheduledEventBatchID(),
ScheduledEvent: persistence.NewDataBlob(decoded.ScheduledEvent, common.EncodingType(decoded.GetScheduledEventEncoding())),
ScheduledTime: decoded.GetScheduledTimestamp(),
StartedID: decoded.GetStartedID(),
StartedTime: decoded.GetStartedTimestamp(),
ActivityID: decoded.GetActivityID(),
RequestID: decoded.GetRequestID(),
ScheduleToStartTimeout: decoded.GetScheduleToStartTimeout(),
ScheduleToCloseTimeout: decoded.GetScheduleToCloseTimeout(),
StartToCloseTimeout: decoded.GetStartToCloseTimeout(),
HeartbeatTimeout: decoded.GetHeartbeatTimeout(),
CancelRequested: decoded.GetCancelRequested(),
CancelRequestID: decoded.GetCancelRequestID(),
TimerTaskStatus: decoded.GetTimerTaskStatus(),
Attempt: decoded.GetAttempt(),
StartedIdentity: decoded.GetStartedIdentity(),
TaskList: decoded.GetTaskList(),
HasRetryPolicy: decoded.GetHasRetryPolicy(),
InitialInterval: decoded.GetRetryInitialInterval(),
BackoffCoefficient: decoded.GetRetryBackoffCoefficient(),
MaximumInterval: decoded.GetRetryMaximumInterval(),
ExpirationTime: decoded.GetRetryExpirationTimestamp(),
MaximumAttempts: decoded.GetRetryMaximumAttempts(),
NonRetriableErrors: decoded.GetRetryNonRetryableErrors(),
LastFailureReason: decoded.GetRetryLastFailureReason(),
LastWorkerIdentity: decoded.GetRetryLastWorkerIdentity(),
LastFailureDetails: decoded.GetRetryLastFailureDetails(),
}
if decoded.StartedEvent != nil {
info.StartedEvent = persistence.NewDataBlob(decoded.StartedEvent, common.EncodingType(decoded.GetStartedEventEncoding()))
}
ret[row.ScheduleID] = info
}
return ret, nil
}
func deleteActivityInfoMap(
ctx context.Context,
tx sqlplugin.Tx,
shardID int,
domainID serialization.UUID,
workflowID string,
runID serialization.UUID,
) error {
if _, err := tx.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
}); err != nil {
return convertCommonErrors(tx, "deleteActivityInfoMap", "", err)
}
return nil
}
func updateTimerInfos(
ctx context.Context,
tx sqlplugin.Tx,
timerInfos []*persistence.TimerInfo,
deleteInfos []string,
shardID int,
domainID serialization.UUID,
workflowID string,
runID serialization.UUID,
parser serialization.Parser,
) error {
if len(timerInfos) > 0 {
rows := make([]sqlplugin.TimerInfoMapsRow, len(timerInfos))
for i, timerInfo := range timerInfos {
blob, err := parser.TimerInfoToBlob(&serialization.TimerInfo{
Version: timerInfo.Version,
StartedID: timerInfo.StartedID,
ExpiryTimestamp: timerInfo.ExpiryTime,
// TaskID is a misleading variable, it actually serves
// the purpose of indicating whether a timer task is
// generated for this timer info
TaskID: timerInfo.TaskStatus,
})
if err != nil {
return err
}
rows[i] = sqlplugin.TimerInfoMapsRow{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
TimerID: timerInfo.TimerID,
Data: blob.Data,
DataEncoding: string(blob.Encoding),
}
}
if _, err := tx.ReplaceIntoTimerInfoMaps(ctx, rows); err != nil {
return convertCommonErrors(tx, "updateTimerInfos", "Failed to execute update query.", err)
}
}
if len(deleteInfos) > 0 {
if _, err := tx.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
TimerIDs: deleteInfos,
}); err != nil {
return convertCommonErrors(tx, "updateTimerInfos", "Failed to execute delete query.", err)
}
}
return nil
}
func getTimerInfoMap(
ctx context.Context,
db sqlplugin.DB,
shardID int,
domainID serialization.UUID,
workflowID string,
runID serialization.UUID,
parser serialization.Parser,
) (map[string]*persistence.TimerInfo, error) {
rows, err := db.SelectFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
})
if err != nil && err != sql.ErrNoRows {
return nil, convertCommonErrors(db, "getTimerInfoMap", "", err)
}
ret := make(map[string]*persistence.TimerInfo)
for _, row := range rows {
info, err := parser.TimerInfoFromBlob(row.Data, row.DataEncoding)
if err != nil {
return nil, err
}
ret[row.TimerID] = &persistence.TimerInfo{
TimerID: row.TimerID,
Version: info.GetVersion(),
StartedID: info.GetStartedID(),
ExpiryTime: info.GetExpiryTimestamp(),
// TaskID is a misleading variable, it actually serves
// the purpose of indicating whether a timer task is
// generated for this timer info
TaskStatus: info.GetTaskID(),
}
}
return ret, nil
}
func deleteTimerInfoMap(
ctx context.Context,
tx sqlplugin.Tx,
shardID int,
domainID serialization.UUID,
workflowID string,
runID serialization.UUID,
) error {
if _, err := tx.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
}); err != nil {
return convertCommonErrors(tx, "deleteTimerInfoMap", "", err)
}
return nil
}
func updateChildExecutionInfos(
ctx context.Context,
tx sqlplugin.Tx,
childExecutionInfos []*persistence.InternalChildExecutionInfo,
deleteInfos []int64,
shardID int,
domainID serialization.UUID,
workflowID string,
runID serialization.UUID,
parser serialization.Parser,
) error {
if len(childExecutionInfos) > 0 {
rows := make([]sqlplugin.ChildExecutionInfoMapsRow, len(childExecutionInfos))
for i, childExecutionInfo := range childExecutionInfos {
initiateEvent, initiateEncoding := persistence.FromDataBlob(childExecutionInfo.InitiatedEvent)
startEvent, startEncoding := persistence.FromDataBlob(childExecutionInfo.StartedEvent)
info := &serialization.ChildExecutionInfo{
Version: childExecutionInfo.Version,
InitiatedEventBatchID: childExecutionInfo.InitiatedEventBatchID,
InitiatedEvent: initiateEvent,
InitiatedEventEncoding: initiateEncoding,
StartedEvent: startEvent,
StartedEventEncoding: startEncoding,
StartedID: childExecutionInfo.StartedID,
StartedWorkflowID: childExecutionInfo.StartedWorkflowID,
StartedRunID: serialization.MustParseUUID(childExecutionInfo.StartedRunID),
CreateRequestID: childExecutionInfo.CreateRequestID,
DomainID: childExecutionInfo.DomainID,
DomainNameDEPRECATED: childExecutionInfo.DomainNameDEPRECATED,
WorkflowTypeName: childExecutionInfo.WorkflowTypeName,
ParentClosePolicy: int32(childExecutionInfo.ParentClosePolicy),
}
blob, err := parser.ChildExecutionInfoToBlob(info)
if err != nil {
return err
}
rows[i] = sqlplugin.ChildExecutionInfoMapsRow{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
InitiatedID: childExecutionInfo.InitiatedID,
Data: blob.Data,
DataEncoding: string(blob.Encoding),
}
}
if _, err := tx.ReplaceIntoChildExecutionInfoMaps(ctx, rows); err != nil {
return convertCommonErrors(tx, "updateChildExecutionInfos", "Failed to execute update query.", err)
}
}
if len(deleteInfos) > 0 {
if _, err := tx.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
InitiatedIDs: deleteInfos,
}); err != nil {
return convertCommonErrors(tx, "updateChildExecutionInfos", "Failed to execute delete query.", err)
}
}
return nil
}
func getChildExecutionInfoMap(
ctx context.Context,
db sqlplugin.DB,
shardID int,
domainID serialization.UUID,
workflowID string,
runID serialization.UUID,
parser serialization.Parser,
) (map[int64]*persistence.InternalChildExecutionInfo, error) {
rows, err := db.SelectFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
})
if err != nil && err != sql.ErrNoRows {
return nil, convertCommonErrors(db, "getChildExecutionInfoMap", "", err)
}
ret := make(map[int64]*persistence.InternalChildExecutionInfo)
for _, row := range rows {
rowInfo, err := parser.ChildExecutionInfoFromBlob(row.Data, row.DataEncoding)
if err != nil {
return nil, err
}
info := &persistence.InternalChildExecutionInfo{
InitiatedID: row.InitiatedID,
InitiatedEventBatchID: rowInfo.GetInitiatedEventBatchID(),
Version: rowInfo.GetVersion(),
StartedID: rowInfo.GetStartedID(),
StartedWorkflowID: rowInfo.GetStartedWorkflowID(),
StartedRunID: serialization.UUID(rowInfo.GetStartedRunID()).String(),
CreateRequestID: rowInfo.GetCreateRequestID(),
DomainID: rowInfo.GetDomainID(),
DomainNameDEPRECATED: rowInfo.GetDomainNameDEPRECATED(),
WorkflowTypeName: rowInfo.GetWorkflowTypeName(),
ParentClosePolicy: types.ParentClosePolicy(rowInfo.GetParentClosePolicy()),
}
if rowInfo.InitiatedEvent != nil {
info.InitiatedEvent = persistence.NewDataBlob(rowInfo.InitiatedEvent, common.EncodingType(rowInfo.GetInitiatedEventEncoding()))
}
if rowInfo.StartedEvent != nil {
info.StartedEvent = persistence.NewDataBlob(rowInfo.StartedEvent, common.EncodingType(rowInfo.GetStartedEventEncoding()))
}
ret[row.InitiatedID] = info
}
return ret, nil
}
func deleteChildExecutionInfoMap(
ctx context.Context,
tx sqlplugin.Tx,
shardID int,
domainID serialization.UUID,
workflowID string,
runID serialization.UUID,
) error {
if _, err := tx.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
}); err != nil {
return convertCommonErrors(tx, "deleteChildExecutionInfoMap", "", err)
}
return nil
}
func updateRequestCancelInfos(
ctx context.Context,
tx sqlplugin.Tx,
requestCancelInfos []*persistence.RequestCancelInfo,
deleteInfos []int64,
shardID int,
domainID serialization.UUID,
workflowID string,
runID serialization.UUID,
parser serialization.Parser,
) error {
if len(requestCancelInfos) > 0 {
rows := make([]sqlplugin.RequestCancelInfoMapsRow, len(requestCancelInfos))
for i, requestCancelInfo := range requestCancelInfos {
blob, err := parser.RequestCancelInfoToBlob(&serialization.RequestCancelInfo{
Version: requestCancelInfo.Version,
InitiatedEventBatchID: requestCancelInfo.InitiatedEventBatchID,
CancelRequestID: requestCancelInfo.CancelRequestID,
})
if err != nil {
return err
}
rows[i] = sqlplugin.RequestCancelInfoMapsRow{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
InitiatedID: requestCancelInfo.InitiatedID,
Data: blob.Data,
DataEncoding: string(blob.Encoding),
}
}
if _, err := tx.ReplaceIntoRequestCancelInfoMaps(ctx, rows); err != nil {
return convertCommonErrors(tx, "updateRequestCancelInfos", "Failed to execute update query.", err)
}
}
if len(deleteInfos) > 0 {
if _, err := tx.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
InitiatedIDs: deleteInfos,
}); err != nil {
return convertCommonErrors(tx, "updateRequestCancelInfos", "Failed to execute delete query.", err)
}
}
return nil
}
func getRequestCancelInfoMap(
ctx context.Context,
db sqlplugin.DB,
shardID int,
domainID serialization.UUID,
workflowID string,
runID serialization.UUID,
parser serialization.Parser,
) (map[int64]*persistence.RequestCancelInfo, error) {
rows, err := db.SelectFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
})
if err != nil && err != sql.ErrNoRows {
return nil, convertCommonErrors(db, "getRequestCancelInfoMap", "", err)
}
ret := make(map[int64]*persistence.RequestCancelInfo)
for _, row := range rows {
rowInfo, err := parser.RequestCancelInfoFromBlob(row.Data, row.DataEncoding)
if err != nil {
return nil, err
}
ret[row.InitiatedID] = &persistence.RequestCancelInfo{
Version: rowInfo.GetVersion(),
InitiatedID: row.InitiatedID,
InitiatedEventBatchID: rowInfo.GetInitiatedEventBatchID(),
CancelRequestID: rowInfo.GetCancelRequestID(),
}
}
return ret, nil
}
func deleteRequestCancelInfoMap(
ctx context.Context,
tx sqlplugin.Tx,
shardID int,
domainID serialization.UUID,
workflowID string,
runID serialization.UUID,
) error {
if _, err := tx.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
}); err != nil {
return convertCommonErrors(tx, "deleteRequestCancelInfoMap", "", err)
}
return nil
}
func updateSignalInfos(
ctx context.Context,
tx sqlplugin.Tx,
signalInfos []*persistence.SignalInfo,
deleteInfos []int64,
shardID int,
domainID serialization.UUID,
workflowID string,
runID serialization.UUID,
parser serialization.Parser,
) error {
if len(signalInfos) > 0 {
rows := make([]sqlplugin.SignalInfoMapsRow, len(signalInfos))
for i, signalInfo := range signalInfos {
blob, err := parser.SignalInfoToBlob(&serialization.SignalInfo{
Version: signalInfo.Version,
InitiatedEventBatchID: signalInfo.InitiatedEventBatchID,
RequestID: signalInfo.SignalRequestID,
Name: signalInfo.SignalName,
Input: signalInfo.Input,
Control: signalInfo.Control,
})
if err != nil {
return err
}
rows[i] = sqlplugin.SignalInfoMapsRow{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
InitiatedID: signalInfo.InitiatedID,
Data: blob.Data,
DataEncoding: string(blob.Encoding),
}
}
if _, err := tx.ReplaceIntoSignalInfoMaps(ctx, rows); err != nil {
return convertCommonErrors(tx, "updateSignalInfos", "Failed to execute update query.", err)
}
}
if len(deleteInfos) > 0 {
if _, err := tx.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
InitiatedIDs: deleteInfos,
}); err != nil {
return convertCommonErrors(tx, "updateSignalInfos", "Failed to execute delete query.", err)
}
}
return nil
}
func getSignalInfoMap(
ctx context.Context,
db sqlplugin.DB,
shardID int,
domainID serialization.UUID,
workflowID string,
runID serialization.UUID,
parser serialization.Parser,
) (map[int64]*persistence.SignalInfo, error) {
rows, err := db.SelectFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
})
if err != nil && err != sql.ErrNoRows {
return nil, convertCommonErrors(db, "getSignalInfoMap", "", err)
}
ret := make(map[int64]*persistence.SignalInfo)
for _, row := range rows {
rowInfo, err := parser.SignalInfoFromBlob(row.Data, row.DataEncoding)
if err != nil {
return nil, err
}
ret[row.InitiatedID] = &persistence.SignalInfo{
Version: rowInfo.GetVersion(),
InitiatedID: row.InitiatedID,
InitiatedEventBatchID: rowInfo.GetInitiatedEventBatchID(),
SignalRequestID: rowInfo.GetRequestID(),
SignalName: rowInfo.GetName(),
Input: rowInfo.GetInput(),
Control: rowInfo.GetControl(),
}
}
return ret, nil
}
func deleteSignalInfoMap(
ctx context.Context,
tx sqlplugin.Tx,
shardID int,
domainID serialization.UUID,
workflowID string,
runID serialization.UUID,
) error {
if _, err := tx.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
ShardID: int64(shardID),
DomainID: domainID,
WorkflowID: workflowID,
RunID: runID,
}); err != nil {
return convertCommonErrors(tx, "deleteSignalInfoMap", "", err)
}
return nil
}