common/persistence/sql/sql_execution_store.go (1,306 lines of code) (raw):

// Copyright (c) 2018 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
package sql

import (
	"bytes"
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"math"
	"runtime/debug"
	"time"

	"golang.org/x/sync/errgroup"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/collection"
	"github.com/uber/cadence/common/log"
	p "github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/persistence/serialization"
	"github.com/uber/cadence/common/persistence/sql/sqlplugin"
	"github.com/uber/cadence/common/types"
)

const (
	emptyWorkflowID string = ""
	// NOTE(review): presumably a sentinel run ID used for replication-related
	// rows with no real run — confirm against the call sites elsewhere in this file.
	emptyReplicationRunID string = "30000000-5000-f000-f000-000000000000"
)

// sqlExecutionStore implements p.ExecutionStore backed by a SQL database.
// The function-typed fields default to the package-level implementations
// (wired up in NewSQLExecutionStore) and exist as seams for unit testing.
type sqlExecutionStore struct {
	sqlStore
	shardID int

	txExecuteShardLockedFn                 func(context.Context, int, string, int64, func(sqlplugin.Tx) error) error
	lockCurrentExecutionIfExistsFn         func(context.Context, sqlplugin.Tx, int, serialization.UUID, string) (*sqlplugin.CurrentExecutionsRow, error)
	createOrUpdateCurrentExecutionFn       func(context.Context, sqlplugin.Tx, p.CreateWorkflowMode, int, serialization.UUID, string, serialization.UUID, int, int, string, int64, int64) error
	assertNotCurrentExecutionFn            func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID) error
	assertRunIDAndUpdateCurrentExecutionFn func(context.Context, sqlplugin.Tx, int, serialization.UUID, string, serialization.UUID, serialization.UUID, string, int, int, int64, int64) error
	applyWorkflowSnapshotTxAsNewFn         func(context.Context, sqlplugin.Tx, int, *p.InternalWorkflowSnapshot, serialization.Parser) error
	applyWorkflowMutationTxFn              func(context.Context, sqlplugin.Tx, int, *p.InternalWorkflowMutation, serialization.Parser) error
	applyWorkflowSnapshotTxAsResetFn       func(context.Context, sqlplugin.Tx, int, *p.InternalWorkflowSnapshot, serialization.Parser) error
}

// compile-time interface conformance check
var _ p.ExecutionStore = (*sqlExecutionStore)(nil)

// NewSQLExecutionStore creates an instance of ExecutionStore
func NewSQLExecutionStore(
	db sqlplugin.DB,
	logger log.Logger,
	shardID int,
	parser serialization.Parser,
	dc *p.DynamicConfiguration,
) (p.ExecutionStore,
error) {
	store := &sqlExecutionStore{
		shardID:                                shardID,
		lockCurrentExecutionIfExistsFn:         lockCurrentExecutionIfExists,
		createOrUpdateCurrentExecutionFn:       createOrUpdateCurrentExecution,
		assertNotCurrentExecutionFn:            assertNotCurrentExecution,
		assertRunIDAndUpdateCurrentExecutionFn: assertRunIDAndUpdateCurrentExecution,
		applyWorkflowSnapshotTxAsNewFn:         applyWorkflowSnapshotTxAsNew,
		applyWorkflowMutationTxFn:              applyWorkflowMutationTx,
		applyWorkflowSnapshotTxAsResetFn:       applyWorkflowSnapshotTxAsReset,
		sqlStore: sqlStore{
			db:     db,
			logger: logger,
			parser: parser,
			dc:     dc,
		},
	}
	// assigned after construction: the method value closes over store itself
	store.txExecuteShardLockedFn = store.txExecuteShardLocked
	return store, nil
}

// txExecuteShardLocked executes f under transaction and with read lock on shard row
func (m *sqlExecutionStore) txExecuteShardLocked(
	ctx context.Context,
	dbShardID int,
	operation string,
	rangeID int64,
	fn func(tx sqlplugin.Tx) error,
) error {
	return m.txExecute(ctx, dbShardID, operation, func(tx sqlplugin.Tx) error {
		// holding the shard read lock ensures rangeID ownership does not
		// change for the duration of the transaction
		if err := readLockShard(ctx, tx, m.shardID, rangeID); err != nil {
			return err
		}
		err := fn(tx)
		if err != nil {
			return err
		}
		return nil
	})
}

// GetShardID returns the history shard ID this store is bound to.
func (m *sqlExecutionStore) GetShardID() int {
	return m.shardID
}

// CreateWorkflowExecution creates a new workflow execution inside a
// shard-locked transaction; the heavy lifting is in createWorkflowExecutionTx.
func (m *sqlExecutionStore) CreateWorkflowExecution(
	ctx context.Context,
	request *p.InternalCreateWorkflowExecutionRequest,
) (response *p.CreateWorkflowExecutionResponse, err error) {
	dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
	err = m.txExecuteShardLockedFn(ctx, dbShardID, "CreateWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
		response, err = m.createWorkflowExecutionTx(ctx, tx, request)
		return err
	})
	return
}

// createWorkflowExecutionTx validates the requested create mode against the
// current-execution row (if any), then writes the current-execution record and
// the full workflow snapshot.
func (m *sqlExecutionStore) createWorkflowExecutionTx(
	ctx context.Context,
	tx sqlplugin.Tx,
	request *p.InternalCreateWorkflowExecutionRequest,
) (*p.CreateWorkflowExecutionResponse, error) {
	newWorkflow := request.NewWorkflowSnapshot
	executionInfo := newWorkflow.ExecutionInfo
	startVersion := newWorkflow.StartVersion
	lastWriteVersion :=
newWorkflow.LastWriteVersion
	shardID := m.shardID
	domainID := serialization.MustParseUUID(executionInfo.DomainID)
	workflowID := executionInfo.WorkflowID
	runID := serialization.MustParseUUID(executionInfo.RunID)
	if err := p.ValidateCreateWorkflowModeState(
		request.Mode,
		newWorkflow,
	); err != nil {
		return nil, err
	}
	var err error
	var row *sqlplugin.CurrentExecutionsRow
	if row, err = m.lockCurrentExecutionIfExistsFn(ctx, tx, m.shardID, domainID, workflowID); err != nil {
		return nil, err
	}
	// current workflow record check
	if row != nil {
		// current run ID, last write version, current workflow state check
		switch request.Mode {
		case p.CreateWorkflowModeBrandNew:
			return nil, &p.WorkflowExecutionAlreadyStartedError{
				Msg:              fmt.Sprintf("Workflow execution already running. WorkflowId: %v", row.WorkflowID),
				StartRequestID:   row.CreateRequestID,
				RunID:            row.RunID.String(),
				State:            int(row.State),
				CloseStatus:      int(row.CloseStatus),
				LastWriteVersion: row.LastWriteVersion,
			}
		case p.CreateWorkflowModeWorkflowIDReuse:
			// reuse is only allowed when the previous run is completed and the
			// caller's expectations about it (version and run ID) still hold
			if request.PreviousLastWriteVersion != row.LastWriteVersion {
				return nil, &p.CurrentWorkflowConditionFailedError{
					Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
						"LastWriteVersion: %v, PreviousLastWriteVersion: %v",
						workflowID, row.LastWriteVersion, request.PreviousLastWriteVersion),
				}
			}
			if row.State != p.WorkflowStateCompleted {
				return nil, &p.CurrentWorkflowConditionFailedError{
					Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
						"State: %v, Expected: %v",
						workflowID, row.State, p.WorkflowStateCompleted),
				}
			}
			runIDStr := row.RunID.String()
			if runIDStr != request.PreviousRunID {
				return nil, &p.CurrentWorkflowConditionFailedError{
					Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
						"RunID: %v, PreviousRunID: %v",
						workflowID, runIDStr, request.PreviousRunID),
				}
			}
		case p.CreateWorkflowModeZombie:
			// zombie workflow creation with existence of current record, this is a noop
			if err := assertRunIDMismatch(serialization.MustParseUUID(executionInfo.RunID), row.RunID); err != nil {
				return nil, err
			}
		case p.CreateWorkflowModeContinueAsNew:
			runIDStr := row.RunID.String()
			if runIDStr != request.PreviousRunID {
				return nil, &p.CurrentWorkflowConditionFailedError{
					Msg: fmt.Sprintf("Workflow execution creation condition failed. WorkflowId: %v, "+
						"RunID: %v, PreviousRunID: %v",
						workflowID, runIDStr, request.PreviousRunID),
				}
			}
		default:
			return nil, &types.InternalServiceError{
				// fixed typo: was "CreteWorkflowExecution"
				Message: fmt.Sprintf(
					"CreateWorkflowExecution: unknown mode: %v",
					request.Mode,
				),
			}
		}
	}
	if err := m.createOrUpdateCurrentExecutionFn(
		ctx, tx, request.Mode, m.shardID, domainID, workflowID, runID,
		executionInfo.State, executionInfo.CloseStatus, executionInfo.CreateRequestID,
		startVersion, lastWriteVersion); err != nil {
		return nil, err
	}
	if err := m.applyWorkflowSnapshotTxAsNewFn(ctx, tx, shardID, &request.NewWorkflowSnapshot, m.parser); err != nil {
		return nil, err
	}
	return &p.CreateWorkflowExecutionResponse{}, nil
}

// getExecutions fetches the single concrete-execution row for the given
// (domainID, workflowID, runID); it is an error for more than one row to exist.
func (m *sqlExecutionStore) getExecutions(
	ctx context.Context,
	request *p.InternalGetWorkflowExecutionRequest,
	domainID serialization.UUID,
	wfID string,
	runID serialization.UUID,
) ([]sqlplugin.ExecutionsRow, error) {
	executions, err := m.db.SelectFromExecutions(ctx, &sqlplugin.ExecutionsFilter{
		ShardID: m.shardID, DomainID: domainID, WorkflowID: wfID, RunID: runID})
	if err != nil {
		if err == sql.ErrNoRows {
			return nil, &types.EntityNotExistsError{
				Message: fmt.Sprintf(
					"Workflow execution not found. WorkflowId: %v, RunId: %v",
					request.Execution.GetWorkflowID(),
					request.Execution.GetRunID(),
				),
			}
		}
		return nil, convertCommonErrors(m.db, "GetWorkflowExecution", "", err)
	}
	if len(executions) == 0 {
		return nil, &types.EntityNotExistsError{
			Message: fmt.Sprintf(
				"Workflow execution not found. WorkflowId: %v, RunId: %v",
				request.Execution.GetWorkflowID(),
				request.Execution.GetRunID(),
			),
		}
	}
	if len(executions) != 1 {
		return nil, &types.InternalServiceError{
			Message: "GetWorkflowExecution return more than one results.",
		}
	}
	return executions, nil
}

// GetWorkflowExecution loads the full mutable state of a workflow execution,
// fanning out the per-table reads concurrently via errgroup. Panics in the DB
// goroutines are converted into errors so a bad driver cannot crash the host.
func (m *sqlExecutionStore) GetWorkflowExecution(
	ctx context.Context,
	request *p.InternalGetWorkflowExecutionRequest,
) (resp *p.InternalGetWorkflowExecutionResponse, e error) {
	recoverPanic := func(recovered interface{}, err *error) {
		if recovered != nil {
			*err = fmt.Errorf("DB operation panicked: %v %s", recovered, debug.Stack())
		}
	}
	domainID := serialization.MustParseUUID(request.DomainID)
	runID := serialization.MustParseUUID(request.Execution.RunID)
	wfID := request.Execution.WorkflowID
	var executions []sqlplugin.ExecutionsRow
	var activityInfos map[int64]*p.InternalActivityInfo
	var timerInfos map[string]*p.TimerInfo
	var childExecutionInfos map[int64]*p.InternalChildExecutionInfo
	var requestCancelInfos map[int64]*p.RequestCancelInfo
	var signalInfos map[int64]*p.SignalInfo
	var bufferedEvents []*p.DataBlob
	var signalsRequested map[string]struct{}
	g, childCtx := errgroup.WithContext(ctx)
	g.Go(func() (e error) {
		defer func() { recoverPanic(recover(), &e) }()
		executions, e = m.getExecutions(childCtx, request, domainID, wfID, runID)
		return e
	})
	g.Go(func() (e error) {
		defer func() { recoverPanic(recover(), &e) }()
		activityInfos, e = getActivityInfoMap(
			childCtx, m.db, m.shardID, domainID, wfID, runID, m.parser)
		return e
	})
	g.Go(func() (e error) {
		defer func() { recoverPanic(recover(), &e) }()
		timerInfos, e = getTimerInfoMap(
			childCtx, m.db, m.shardID, domainID, wfID, runID, m.parser)
		return e
	})
	g.Go(func() (e error) {
		defer
func() { recoverPanic(recover(), &e) }()
		childExecutionInfos, e = getChildExecutionInfoMap(
			childCtx, m.db, m.shardID, domainID, wfID, runID, m.parser)
		return e
	})
	g.Go(func() (e error) {
		defer func() { recoverPanic(recover(), &e) }()
		requestCancelInfos, e = getRequestCancelInfoMap(
			childCtx, m.db, m.shardID, domainID, wfID, runID, m.parser)
		return e
	})
	g.Go(func() (e error) {
		defer func() { recoverPanic(recover(), &e) }()
		signalInfos, e = getSignalInfoMap(
			childCtx, m.db, m.shardID, domainID, wfID, runID, m.parser)
		return e
	})
	g.Go(func() (e error) {
		defer func() { recoverPanic(recover(), &e) }()
		bufferedEvents, e = getBufferedEvents(
			childCtx, m.db, m.shardID, domainID, wfID, runID)
		return e
	})
	g.Go(func() (e error) {
		defer func() { recoverPanic(recover(), &e) }()
		signalsRequested, e = getSignalsRequested(
			childCtx, m.db, m.shardID, domainID, wfID, runID)
		return e
	})
	err := g.Wait()
	if err != nil {
		return nil, err
	}
	// executions is guaranteed non-empty here: getExecutions errors otherwise
	state, err := m.populateWorkflowMutableState(executions[0])
	if err != nil {
		return nil, &types.InternalServiceError{
			Message: fmt.Sprintf("GetWorkflowExecution: failed. Error: %v", err),
		}
	}
	// if we have checksum, we need to make sure the rangeID did not change
	// if the rangeID changed, it means the shard ownership might have changed
	// and the workflow might have been updated when we read the data, so the data
	// we read might not be from a consistent view, the checksum validation might fail
	// in that case, we clear the checksum data so that we will not perform the validation
	if state.ChecksumData != nil {
		row, err := m.db.SelectFromShards(ctx, &sqlplugin.ShardsFilter{ShardID: int64(m.shardID)})
		if err != nil {
			return nil, convertCommonErrors(m.db, "GetWorkflowExecution", "", err)
		}
		if row.RangeID != request.RangeID {
			// The GetWorkflowExecution operation will not be impacted by this. ChecksumData is purely for validation purposes.
			m.logger.Warn("GetWorkflowExecution's checksum is discarded. The shard might have changed owner.")
			state.ChecksumData = nil
		}
	}
	state.ActivityInfos = activityInfos
	state.TimerInfos = timerInfos
	state.ChildExecutionInfos = childExecutionInfos
	state.RequestCancelInfos = requestCancelInfos
	state.SignalInfos = signalInfos
	state.BufferedEvents = bufferedEvents
	state.SignalRequestedIDs = signalsRequested
	return &p.InternalGetWorkflowExecutionResponse{State: state}, nil
}

// UpdateWorkflowExecution applies a workflow mutation (and optionally a
// continue-as-new snapshot) inside a shard-locked transaction.
func (m *sqlExecutionStore) UpdateWorkflowExecution(
	ctx context.Context,
	request *p.InternalUpdateWorkflowExecutionRequest,
) error {
	dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
	return m.txExecuteShardLockedFn(ctx, dbShardID, "UpdateWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
		return m.updateWorkflowExecutionTx(ctx, tx, request)
	})
}

// updateWorkflowExecutionTx validates the update mode against the
// current-execution row and applies the mutation (plus the optional
// continue-as-new snapshot) to the execution tables.
func (m *sqlExecutionStore) updateWorkflowExecutionTx(
	ctx context.Context,
	tx sqlplugin.Tx,
	request *p.InternalUpdateWorkflowExecutionRequest,
) error {
	updateWorkflow := request.UpdateWorkflowMutation
	newWorkflow := request.NewWorkflowSnapshot
	executionInfo := updateWorkflow.ExecutionInfo
	domainID := serialization.MustParseUUID(executionInfo.DomainID)
	workflowID := executionInfo.WorkflowID
	runID := serialization.MustParseUUID(executionInfo.RunID)
	shardID := m.shardID
	if err := p.ValidateUpdateWorkflowModeState(
		request.Mode,
		updateWorkflow,
		newWorkflow,
	); err != nil {
		return err
	}
	switch request.Mode {
	case p.UpdateWorkflowModeIgnoreCurrent:
		// no-op
	case p.UpdateWorkflowModeBypassCurrent:
		if err := m.assertNotCurrentExecutionFn(
			ctx,
			tx,
			shardID,
			domainID,
			workflowID,
			runID); err != nil {
			return err
		}
	case p.UpdateWorkflowModeUpdateCurrent:
		if newWorkflow != nil {
			// continue-as-new: point the current record at the new run
			newExecutionInfo := newWorkflow.ExecutionInfo
			startVersion := newWorkflow.StartVersion
			lastWriteVersion := newWorkflow.LastWriteVersion
			newDomainID := serialization.MustParseUUID(newExecutionInfo.DomainID)
			newRunID := serialization.MustParseUUID(newExecutionInfo.RunID)
			if !bytes.Equal(domainID,
newDomainID) {
				return &types.InternalServiceError{
					Message: "UpdateWorkflowExecution: cannot continue as new to another domain",
				}
			}
			if err := m.assertRunIDAndUpdateCurrentExecutionFn(
				ctx, tx, shardID, domainID, workflowID, newRunID, runID,
				newWorkflow.ExecutionInfo.CreateRequestID, newWorkflow.ExecutionInfo.State,
				newWorkflow.ExecutionInfo.CloseStatus, startVersion, lastWriteVersion); err != nil {
				return err
			}
		} else {
			startVersion := updateWorkflow.StartVersion
			lastWriteVersion := updateWorkflow.LastWriteVersion
			// this is only to update the current record
			if err := m.assertRunIDAndUpdateCurrentExecutionFn(
				ctx, tx, shardID, domainID, workflowID, runID, runID,
				executionInfo.CreateRequestID, executionInfo.State,
				executionInfo.CloseStatus, startVersion, lastWriteVersion); err != nil {
				return err
			}
		}
	default:
		return &types.InternalServiceError{
			Message: fmt.Sprintf("UpdateWorkflowExecution: unknown mode: %v", request.Mode),
		}
	}
	if err := m.applyWorkflowMutationTxFn(ctx, tx, shardID, &updateWorkflow, m.parser); err != nil {
		return err
	}
	if newWorkflow != nil {
		if err := m.applyWorkflowSnapshotTxAsNewFn(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
			return err
		}
	}
	return nil
}

// ConflictResolveWorkflowExecution resolves a workflow conflict (e.g. after a
// reset) inside a shard-locked transaction.
func (m *sqlExecutionStore) ConflictResolveWorkflowExecution(
	ctx context.Context,
	request *p.InternalConflictResolveWorkflowExecutionRequest,
) error {
	dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
	return m.txExecuteShardLockedFn(ctx, dbShardID, "ConflictResolveWorkflowExecution", request.RangeID, func(tx sqlplugin.Tx) error {
		return m.conflictResolveWorkflowExecutionTx(ctx, tx, request)
	})
}

// conflictResolveWorkflowExecutionTx writes the reset snapshot, and optionally
// a current-workflow mutation and a new-workflow snapshot, after validating the
// conflict-resolve mode against the current-execution row.
func (m *sqlExecutionStore) conflictResolveWorkflowExecutionTx(
	ctx context.Context,
	tx sqlplugin.Tx,
	request *p.InternalConflictResolveWorkflowExecutionRequest,
) error {
	currentWorkflow := request.CurrentWorkflowMutation
	resetWorkflow := request.ResetWorkflowSnapshot
	newWorkflow := request.NewWorkflowSnapshot
	shardID := m.shardID
	domainID :=
serialization.MustParseUUID(resetWorkflow.ExecutionInfo.DomainID)
	workflowID := resetWorkflow.ExecutionInfo.WorkflowID
	if err := p.ValidateConflictResolveWorkflowModeState(
		request.Mode,
		resetWorkflow,
		newWorkflow,
		currentWorkflow,
	); err != nil {
		return err
	}
	switch request.Mode {
	case p.ConflictResolveWorkflowModeBypassCurrent:
		if err := m.assertNotCurrentExecutionFn(
			ctx,
			tx,
			shardID,
			domainID,
			workflowID,
			serialization.MustParseUUID(resetWorkflow.ExecutionInfo.RunID)); err != nil {
			return err
		}
	case p.ConflictResolveWorkflowModeUpdateCurrent:
		// the run that becomes current is the new workflow if present,
		// otherwise the reset workflow
		executionInfo := resetWorkflow.ExecutionInfo
		startVersion := resetWorkflow.StartVersion
		lastWriteVersion := resetWorkflow.LastWriteVersion
		if newWorkflow != nil {
			executionInfo = newWorkflow.ExecutionInfo
			startVersion = newWorkflow.StartVersion
			lastWriteVersion = newWorkflow.LastWriteVersion
		}
		runID := serialization.MustParseUUID(executionInfo.RunID)
		createRequestID := executionInfo.CreateRequestID
		state := executionInfo.State
		closeStatus := executionInfo.CloseStatus
		if currentWorkflow != nil {
			prevRunID := serialization.MustParseUUID(currentWorkflow.ExecutionInfo.RunID)
			if err := m.assertRunIDAndUpdateCurrentExecutionFn(
				ctx, tx, m.shardID, domainID, workflowID, runID, prevRunID,
				createRequestID, state, closeStatus,
				startVersion, lastWriteVersion); err != nil {
				return err
			}
		} else {
			// reset workflow is current
			prevRunID := serialization.MustParseUUID(resetWorkflow.ExecutionInfo.RunID)
			if err := m.assertRunIDAndUpdateCurrentExecutionFn(
				ctx, tx, m.shardID, domainID, workflowID, runID, prevRunID,
				createRequestID, state, closeStatus,
				startVersion, lastWriteVersion); err != nil {
				return err
			}
		}
	default:
		return &types.InternalServiceError{
			Message: fmt.Sprintf("ConflictResolveWorkflowExecution: unknown mode: %v", request.Mode),
		}
	}
	if err := m.applyWorkflowSnapshotTxAsResetFn(ctx, tx, shardID, &resetWorkflow, m.parser); err != nil {
		return err
	}
	if currentWorkflow != nil {
		if err := m.applyWorkflowMutationTxFn(ctx, tx,
shardID, currentWorkflow, m.parser); err != nil {
			return err
		}
	}
	if newWorkflow != nil {
		if err := m.applyWorkflowSnapshotTxAsNewFn(ctx, tx, shardID, newWorkflow, m.parser); err != nil {
			return err
		}
	}
	return nil
}

// DeleteWorkflowExecution removes the concrete execution row and all of its
// per-run child tables (activities, timers, children, cancels, signals,
// buffered events, signal-requested set) in a single transaction.
func (m *sqlExecutionStore) DeleteWorkflowExecution(
	ctx context.Context,
	request *p.DeleteWorkflowExecutionRequest,
) error {
	dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
	domainID := serialization.MustParseUUID(request.DomainID)
	runID := serialization.MustParseUUID(request.RunID)
	wfID := request.WorkflowID
	return m.txExecute(ctx, dbShardID, "DeleteWorkflowExecution", func(tx sqlplugin.Tx) error {
		if _, err := tx.DeleteFromExecutions(ctx, &sqlplugin.ExecutionsFilter{
			ShardID:    m.shardID,
			DomainID:   domainID,
			WorkflowID: wfID,
			RunID:      runID,
		}); err != nil {
			return convertCommonErrors(tx, "DeleteWorkflowExecution", "", err)
		}
		if _, err := tx.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
			ShardID:    int64(m.shardID),
			DomainID:   domainID,
			WorkflowID: wfID,
			RunID:      runID,
		}); err != nil {
			return convertCommonErrors(tx, "DeleteFromActivityInfoMaps", "", err)
		}
		if _, err := tx.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
			ShardID:    int64(m.shardID),
			DomainID:   domainID,
			WorkflowID: wfID,
			RunID:      runID,
		}); err != nil {
			return convertCommonErrors(tx, "DeleteFromTimerInfoMaps", "", err)
		}
		if _, err := tx.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
			ShardID:    int64(m.shardID),
			DomainID:   domainID,
			WorkflowID: wfID,
			RunID:      runID,
		}); err != nil {
			return convertCommonErrors(tx, "DeleteFromChildExecutionInfoMaps", "", err)
		}
		if _, err := tx.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
			ShardID:    int64(m.shardID),
			DomainID:   domainID,
			WorkflowID: wfID,
			RunID:      runID,
		}); err != nil {
			return convertCommonErrors(tx, "DeleteFromRequestCancelInfoMaps", "", err)
		}
		if _, err := tx.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
ShardID: int64(m.shardID),
			DomainID:   domainID,
			WorkflowID: wfID,
			RunID:      runID,
		}); err != nil {
			return convertCommonErrors(tx, "DeleteFromSignalInfoMaps", "", err)
		}
		if _, err := tx.DeleteFromBufferedEvents(ctx, &sqlplugin.BufferedEventsFilter{
			ShardID:    m.shardID,
			DomainID:   domainID,
			WorkflowID: wfID,
			RunID:      runID,
		}); err != nil {
			return convertCommonErrors(tx, "DeleteFromBufferedEvents", "", err)
		}
		if _, err := tx.DeleteFromSignalsRequestedSets(ctx, &sqlplugin.SignalsRequestedSetsFilter{
			ShardID:    int64(m.shardID),
			DomainID:   domainID,
			WorkflowID: wfID,
			RunID:      runID,
		}); err != nil {
			return convertCommonErrors(tx, "DeleteFromSignalsRequestedSets", "", err)
		}
		return nil
	})
}

// its possible for a new run of the same workflow to have started after the run we are deleting
// here was finished. In that case, current_executions table will have the same workflowID but different
// runID. The following code will delete the row from current_executions if and only if the runID is
// same as the one we are trying to delete here
func (m *sqlExecutionStore) DeleteCurrentWorkflowExecution(
	ctx context.Context,
	request *p.DeleteCurrentWorkflowExecutionRequest,
) error {
	domainID := serialization.MustParseUUID(request.DomainID)
	runID := serialization.MustParseUUID(request.RunID)
	_, err := m.db.DeleteFromCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
		ShardID:    int64(m.shardID),
		DomainID:   domainID,
		WorkflowID: request.WorkflowID,
		RunID:      runID,
	})
	if err != nil {
		return convertCommonErrors(m.db, "DeleteCurrentWorkflowExecution", "", err)
	}
	return nil
}

// GetCurrentExecution returns the current (most recent) run for a workflow ID.
func (m *sqlExecutionStore) GetCurrentExecution(
	ctx context.Context,
	request *p.GetCurrentExecutionRequest,
) (*p.GetCurrentExecutionResponse, error) {
	row, err := m.db.SelectFromCurrentExecutions(ctx, &sqlplugin.CurrentExecutionsFilter{
		ShardID:    int64(m.shardID),
		DomainID:   serialization.MustParseUUID(request.DomainID),
		WorkflowID: request.WorkflowID,
	})
	if err != nil {
		return nil, convertCommonErrors(m.db,
"GetCurrentExecution", "", err) } return &p.GetCurrentExecutionResponse{ StartRequestID: row.CreateRequestID, RunID: row.RunID.String(), State: int(row.State), CloseStatus: int(row.CloseStatus), LastWriteVersion: row.LastWriteVersion, }, nil } func (m *sqlExecutionStore) ListCurrentExecutions( _ context.Context, _ *p.ListCurrentExecutionsRequest, ) (*p.ListCurrentExecutionsResponse, error) { return nil, &types.InternalServiceError{Message: "Not yet implemented"} } func (m *sqlExecutionStore) IsWorkflowExecutionExists( _ context.Context, _ *p.IsWorkflowExecutionExistsRequest, ) (*p.IsWorkflowExecutionExistsResponse, error) { return nil, &types.InternalServiceError{Message: "Not yet implemented"} } func (m *sqlExecutionStore) ListConcreteExecutions( ctx context.Context, request *p.ListConcreteExecutionsRequest, ) (*p.InternalListConcreteExecutionsResponse, error) { filter := &sqlplugin.ExecutionsFilter{} if len(request.PageToken) > 0 { err := gobDeserialize(request.PageToken, &filter) if err != nil { return nil, &types.InternalServiceError{ Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err), } } } else { filter = &sqlplugin.ExecutionsFilter{ ShardID: m.shardID, WorkflowID: "", } } filter.Size = request.PageSize executions, err := m.db.SelectFromExecutions(ctx, filter) if err != nil { if err == sql.ErrNoRows { return &p.InternalListConcreteExecutionsResponse{}, nil } return nil, convertCommonErrors(m.db, "ListConcreteExecutions", "", err) } if len(executions) == 0 { return &p.InternalListConcreteExecutionsResponse{}, nil } lastExecution := executions[len(executions)-1] nextFilter := &sqlplugin.ExecutionsFilter{ ShardID: m.shardID, WorkflowID: lastExecution.WorkflowID, } token, err := gobSerialize(nextFilter) if err != nil { return nil, &types.InternalServiceError{ Message: fmt.Sprintf("ListConcreteExecutions failed. 
Error: %v", err), } } concreteExecutions, err := m.populateInternalListConcreteExecutions(executions) if err != nil { return nil, &types.InternalServiceError{ Message: fmt.Sprintf("ListConcreteExecutions failed. Error: %v", err), } } return &p.InternalListConcreteExecutionsResponse{ Executions: concreteExecutions, NextPageToken: token, }, nil } func (m *sqlExecutionStore) GetTransferTasks( ctx context.Context, request *p.GetTransferTasksRequest, ) (*p.GetTransferTasksResponse, error) { minReadLevel := request.ReadLevel if len(request.NextPageToken) > 0 { readLevel, err := deserializePageToken(request.NextPageToken) if err != nil { return nil, convertCommonErrors(m.db, "GetTransferTasks", "failed to deserialize page token", err) } minReadLevel = readLevel } rows, err := m.db.SelectFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{ ShardID: m.shardID, MinTaskID: minReadLevel, MaxTaskID: request.MaxReadLevel, PageSize: request.BatchSize, }) if err != nil { if err != sql.ErrNoRows { return nil, convertCommonErrors(m.db, "GetTransferTasks", "", err) } } resp := &p.GetTransferTasksResponse{Tasks: make([]*p.TransferTaskInfo, len(rows))} for i, row := range rows { info, err := m.parser.TransferTaskInfoFromBlob(row.Data, row.DataEncoding) if err != nil { return nil, err } resp.Tasks[i] = &p.TransferTaskInfo{ TaskID: row.TaskID, DomainID: info.DomainID.String(), WorkflowID: info.GetWorkflowID(), RunID: info.RunID.String(), VisibilityTimestamp: info.GetVisibilityTimestamp(), TargetDomainID: info.TargetDomainID.String(), TargetDomainIDs: info.GetTargetDomainIDs(), TargetWorkflowID: info.GetTargetWorkflowID(), TargetRunID: info.TargetRunID.String(), TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(), TaskList: info.GetTaskList(), TaskType: int(info.GetTaskType()), ScheduleID: info.GetScheduleID(), Version: info.GetVersion(), } } if len(rows) > 0 { lastTaskID := rows[len(rows)-1].TaskID if lastTaskID < request.MaxReadLevel { resp.NextPageToken = 
serializePageToken(lastTaskID)
		}
	}
	return resp, nil
}

// CompleteTransferTask deletes a single transfer task by ID.
func (m *sqlExecutionStore) CompleteTransferTask(
	ctx context.Context,
	request *p.CompleteTransferTaskRequest,
) error {
	if _, err := m.db.DeleteFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
		ShardID: m.shardID,
		TaskID:  request.TaskID,
	}); err != nil {
		return convertCommonErrors(m.db, "CompleteTransferTask", "", err)
	}
	return nil
}

// RangeCompleteTransferTask deletes transfer tasks in
// (ExclusiveBeginTaskID, InclusiveEndTaskID], up to PageSize rows,
// and reports how many were removed.
func (m *sqlExecutionStore) RangeCompleteTransferTask(
	ctx context.Context,
	request *p.RangeCompleteTransferTaskRequest,
) (*p.RangeCompleteTransferTaskResponse, error) {
	result, err := m.db.RangeDeleteFromTransferTasks(ctx, &sqlplugin.TransferTasksFilter{
		ShardID:   m.shardID,
		MinTaskID: request.ExclusiveBeginTaskID,
		MaxTaskID: request.InclusiveEndTaskID,
		PageSize:  request.PageSize,
	})
	if err != nil {
		return nil, convertCommonErrors(m.db, "RangeCompleteTransferTask", "", err)
	}
	rowsDeleted, err := result.RowsAffected()
	if err != nil {
		return nil, convertCommonErrors(m.db, "RangeCompleteTransferTask", "", err)
	}
	return &p.RangeCompleteTransferTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
}

// GetCrossClusterTasks returns a page of cross-cluster tasks for the target
// cluster; paging mirrors GetTransferTasks.
func (m *sqlExecutionStore) GetCrossClusterTasks(
	ctx context.Context,
	request *p.GetCrossClusterTasksRequest,
) (*p.GetCrossClusterTasksResponse, error) {
	minReadLevel := request.ReadLevel
	if len(request.NextPageToken) > 0 {
		readLevel, err := deserializePageToken(request.NextPageToken)
		if err != nil {
			return nil, convertCommonErrors(m.db, "GetCrossClusterTasks", "failed to deserialize page token", err)
		}
		minReadLevel = readLevel
	}
	rows, err := m.db.SelectFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
		TargetCluster: request.TargetCluster,
		ShardID:       m.shardID,
		MinTaskID:     minReadLevel,
		MaxTaskID:     request.MaxReadLevel,
		PageSize:      request.BatchSize,
	})
	if err != nil {
		// ErrNoRows means an empty page, not a failure
		if err != sql.ErrNoRows {
			return nil, convertCommonErrors(m.db, "GetCrossClusterTasks", "", err)
		}
	}
	resp := &p.GetCrossClusterTasksResponse{Tasks: make([]*p.CrossClusterTaskInfo, len(rows))}
	for i, row := range
rows {
		info, err := m.parser.CrossClusterTaskInfoFromBlob(row.Data, row.DataEncoding)
		if err != nil {
			return nil, err
		}
		resp.Tasks[i] = &p.CrossClusterTaskInfo{
			TaskID:                  row.TaskID,
			DomainID:                info.DomainID.String(),
			WorkflowID:              info.GetWorkflowID(),
			RunID:                   info.RunID.String(),
			VisibilityTimestamp:     info.GetVisibilityTimestamp(),
			TargetDomainID:          info.TargetDomainID.String(),
			TargetDomainIDs:         info.GetTargetDomainIDs(),
			TargetWorkflowID:        info.GetTargetWorkflowID(),
			TargetRunID:             info.TargetRunID.String(),
			TargetChildWorkflowOnly: info.GetTargetChildWorkflowOnly(),
			TaskList:                info.GetTaskList(),
			TaskType:                int(info.GetTaskType()),
			ScheduleID:              info.GetScheduleID(),
			Version:                 info.GetVersion(),
		}
	}
	if len(rows) > 0 {
		lastTaskID := rows[len(rows)-1].TaskID
		if lastTaskID < request.MaxReadLevel {
			resp.NextPageToken = serializePageToken(lastTaskID)
		}
	}
	return resp, nil
}

// CompleteCrossClusterTask deletes a single cross-cluster task by ID.
func (m *sqlExecutionStore) CompleteCrossClusterTask(
	ctx context.Context,
	request *p.CompleteCrossClusterTaskRequest,
) error {
	if _, err := m.db.DeleteFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
		TargetCluster: request.TargetCluster,
		ShardID:       m.shardID,
		TaskID:        request.TaskID,
	}); err != nil {
		return convertCommonErrors(m.db, "CompleteCrossClusterTask", "", err)
	}
	return nil
}

// RangeCompleteCrossClusterTask deletes cross-cluster tasks in
// (ExclusiveBeginTaskID, InclusiveEndTaskID], up to PageSize rows.
func (m *sqlExecutionStore) RangeCompleteCrossClusterTask(
	ctx context.Context,
	request *p.RangeCompleteCrossClusterTaskRequest,
) (*p.RangeCompleteCrossClusterTaskResponse, error) {
	result, err := m.db.RangeDeleteFromCrossClusterTasks(ctx, &sqlplugin.CrossClusterTasksFilter{
		TargetCluster: request.TargetCluster,
		ShardID:       m.shardID,
		MinTaskID:     request.ExclusiveBeginTaskID,
		MaxTaskID:     request.InclusiveEndTaskID,
		PageSize:      request.PageSize,
	})
	if err != nil {
		return nil, convertCommonErrors(m.db, "RangeCompleteCrossClusterTask", "", err)
	}
	rowsDeleted, err := result.RowsAffected()
	if err != nil {
		return nil, convertCommonErrors(m.db, "RangeCompleteCrossClusterTask", "", err)
	}
	return &p.RangeCompleteCrossClusterTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
}

// GetReplicationTasks returns a page of replication tasks starting at the
// request's read level (or page token).
func (m *sqlExecutionStore) GetReplicationTasks(
	ctx context.Context,
	request *p.GetReplicationTasksRequest,
) (*p.InternalGetReplicationTasksResponse, error) {
	readLevel, maxReadLevelInclusive, err := getReadLevels(request)
	if err != nil {
		return nil, err
	}
	rows, err := m.db.SelectFromReplicationTasks(
		ctx,
		&sqlplugin.ReplicationTasksFilter{
			ShardID:   m.shardID,
			MinTaskID: readLevel,
			MaxTaskID: maxReadLevelInclusive,
			PageSize:  request.BatchSize,
		})
	switch err {
	case nil:
		return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel)
	case sql.ErrNoRows:
		return &p.InternalGetReplicationTasksResponse{}, nil
	default:
		return nil, convertCommonErrors(m.db, "GetReplicationTasks", "", err)
	}
}

// getReadLevels resolves the effective read level (page token overrides the
// request's ReadLevel) and the inclusive upper bound for the read.
func getReadLevels(request *p.GetReplicationTasksRequest) (readLevel int64, maxReadLevelInclusive int64, err error) {
	readLevel = request.ReadLevel
	if len(request.NextPageToken) > 0 {
		readLevel, err = deserializePageToken(request.NextPageToken)
		if err != nil {
			return 0, 0, err
		}
	}
	maxReadLevelInclusive = collection.MaxInt64(readLevel+int64(request.BatchSize), request.MaxReadLevel)
	return readLevel, maxReadLevelInclusive, nil
}

// populateGetReplicationTasksResponse decodes replication task rows and
// builds the paged response; requestMaxReadLevel bounds the next page token.
func (m *sqlExecutionStore) populateGetReplicationTasksResponse(
	rows []sqlplugin.ReplicationTasksRow,
	requestMaxReadLevel int64,
) (*p.InternalGetReplicationTasksResponse, error) {
	if len(rows) == 0 {
		return &p.InternalGetReplicationTasksResponse{}, nil
	}
	var tasks = make([]*p.InternalReplicationTaskInfo, len(rows))
	for i, row := range rows {
		info, err := m.parser.ReplicationTaskInfoFromBlob(row.Data, row.DataEncoding)
		if err != nil {
			return nil, err
		}
		tasks[i] = &p.InternalReplicationTaskInfo{
			TaskID:       row.TaskID,
			DomainID:     info.DomainID.String(),
			WorkflowID:   info.GetWorkflowID(),
			RunID:        info.RunID.String(),
			TaskType:     int(info.GetTaskType()),
			FirstEventID: info.GetFirstEventID(),
			NextEventID:  info.GetNextEventID(),
			Version:      info.GetVersion(),
			ScheduledID:
info.GetScheduledID(), BranchToken: info.GetBranchToken(), NewRunBranchToken: info.GetNewRunBranchToken(), CreationTime: info.GetCreationTimestamp(), } } var nextPageToken []byte lastTaskID := rows[len(rows)-1].TaskID if lastTaskID < requestMaxReadLevel { nextPageToken = serializePageToken(lastTaskID) } return &p.InternalGetReplicationTasksResponse{ Tasks: tasks, NextPageToken: nextPageToken, }, nil } func (m *sqlExecutionStore) CompleteReplicationTask( ctx context.Context, request *p.CompleteReplicationTaskRequest, ) error { if _, err := m.db.DeleteFromReplicationTasks(ctx, &sqlplugin.ReplicationTasksFilter{ ShardID: m.shardID, TaskID: request.TaskID, }); err != nil { return convertCommonErrors(m.db, "CompleteReplicationTask", "", err) } return nil } func (m *sqlExecutionStore) RangeCompleteReplicationTask( ctx context.Context, request *p.RangeCompleteReplicationTaskRequest, ) (*p.RangeCompleteReplicationTaskResponse, error) { result, err := m.db.RangeDeleteFromReplicationTasks(ctx, &sqlplugin.ReplicationTasksFilter{ ShardID: m.shardID, InclusiveEndTaskID: request.InclusiveEndTaskID, PageSize: request.PageSize, }) if err != nil { return nil, convertCommonErrors(m.db, "RangeCompleteReplicationTask", "", err) } rowsDeleted, err := result.RowsAffected() if err != nil { return nil, convertCommonErrors(m.db, "RangeCompleteReplicationTask", "", err) } return &p.RangeCompleteReplicationTaskResponse{TasksCompleted: int(rowsDeleted)}, nil } func (m *sqlExecutionStore) GetReplicationTasksFromDLQ( ctx context.Context, request *p.GetReplicationTasksFromDLQRequest, ) (*p.InternalGetReplicationTasksFromDLQResponse, error) { readLevel, maxReadLevelInclusive, err := getReadLevels(&request.GetReplicationTasksRequest) if err != nil { return nil, err } filter := sqlplugin.ReplicationTasksFilter{ ShardID: m.shardID, MinTaskID: readLevel, MaxTaskID: maxReadLevelInclusive, PageSize: request.BatchSize, } rows, err := m.db.SelectFromReplicationTasksDLQ(ctx, 
&sqlplugin.ReplicationTasksDLQFilter{ ReplicationTasksFilter: filter, SourceClusterName: request.SourceClusterName, }) switch err { case nil: return m.populateGetReplicationTasksResponse(rows, request.MaxReadLevel) case sql.ErrNoRows: return &p.InternalGetReplicationTasksResponse{}, nil default: return nil, convertCommonErrors(m.db, "GetReplicationTasksFromDLQ", "", err) } } func (m *sqlExecutionStore) GetReplicationDLQSize( ctx context.Context, request *p.GetReplicationDLQSizeRequest, ) (*p.GetReplicationDLQSizeResponse, error) { size, err := m.db.SelectFromReplicationDLQ(ctx, &sqlplugin.ReplicationTaskDLQFilter{ SourceClusterName: request.SourceClusterName, ShardID: m.shardID, }) switch err { case nil: return &p.GetReplicationDLQSizeResponse{ Size: size, }, nil case sql.ErrNoRows: return &p.GetReplicationDLQSizeResponse{ Size: 0, }, nil default: return nil, convertCommonErrors(m.db, "GetReplicationDLQSize", "", err) } } func (m *sqlExecutionStore) DeleteReplicationTaskFromDLQ( ctx context.Context, request *p.DeleteReplicationTaskFromDLQRequest, ) error { filter := sqlplugin.ReplicationTasksFilter{ ShardID: m.shardID, TaskID: request.TaskID, } if _, err := m.db.DeleteMessageFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{ ReplicationTasksFilter: filter, SourceClusterName: request.SourceClusterName, }); err != nil { return convertCommonErrors(m.db, "DeleteReplicationTaskFromDLQ", "", err) } return nil } func (m *sqlExecutionStore) RangeDeleteReplicationTaskFromDLQ( ctx context.Context, request *p.RangeDeleteReplicationTaskFromDLQRequest, ) (*p.RangeDeleteReplicationTaskFromDLQResponse, error) { filter := sqlplugin.ReplicationTasksFilter{ ShardID: m.shardID, TaskID: request.ExclusiveBeginTaskID, InclusiveEndTaskID: request.InclusiveEndTaskID, PageSize: request.PageSize, } result, err := m.db.RangeDeleteMessageFromReplicationTasksDLQ(ctx, &sqlplugin.ReplicationTasksDLQFilter{ ReplicationTasksFilter: filter, SourceClusterName: 
request.SourceClusterName,
	})
	if err != nil {
		return nil, convertCommonErrors(m.db, "RangeDeleteReplicationTaskFromDLQ", "", err)
	}
	rowsDeleted, err := result.RowsAffected()
	if err != nil {
		return nil, convertCommonErrors(m.db, "RangeDeleteReplicationTaskFromDLQ", "", err)
	}
	return &p.RangeDeleteReplicationTaskFromDLQResponse{TasksCompleted: int(rowsDeleted)}, nil
}

// CreateFailoverMarkerTasks persists one replication task per failover marker,
// all inside a single shard-locked transaction guarded by request.RangeID.
// Markers carry no workflow identity, so an empty workflow ID and a sentinel
// run ID are encoded into each task blob.
func (m *sqlExecutionStore) CreateFailoverMarkerTasks(
	ctx context.Context,
	request *p.CreateFailoverMarkersRequest,
) error {
	dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
	return m.txExecuteShardLockedFn(ctx, dbShardID, "CreateFailoverMarkerTasks", request.RangeID, func(tx sqlplugin.Tx) error {
		replicationTasksRows := make([]sqlplugin.ReplicationTasksRow, len(request.Markers))
		for i, task := range request.Markers {
			blob, err := m.parser.ReplicationTaskInfoToBlob(&serialization.ReplicationTaskInfo{
				DomainID:                serialization.MustParseUUID(task.DomainID),
				WorkflowID:              emptyWorkflowID,
				RunID:                   serialization.MustParseUUID(emptyReplicationRunID),
				TaskType:                int16(task.GetType()),
				FirstEventID:            common.EmptyEventID,
				NextEventID:             common.EmptyEventID,
				Version:                 task.GetVersion(),
				ScheduledID:             common.EmptyEventID,
				EventStoreVersion:       p.EventStoreVersion,
				NewRunEventStoreVersion: p.EventStoreVersion,
				BranchToken:             nil,
				NewRunBranchToken:       nil,
				CreationTimestamp:       task.GetVisibilityTimestamp(),
			})
			if err != nil {
				return err
			}
			replicationTasksRows[i].ShardID = m.shardID
			replicationTasksRows[i].TaskID = task.GetTaskID()
			replicationTasksRows[i].Data = blob.Data
			replicationTasksRows[i].DataEncoding = string(blob.Encoding)
		}
		result, err := tx.InsertIntoReplicationTasks(ctx, replicationTasksRows)
		if err != nil {
			return convertCommonErrors(tx, "CreateFailoverMarkerTasks", "", err)
		}
		rowsAffected, err := result.RowsAffected()
		if err != nil {
			return &types.InternalServiceError{Message: fmt.Sprintf("CreateFailoverMarkerTasks failed. Could not verify number of rows inserted. Error: %v", err)}
		}
		// Defensive check: the driver must report exactly one inserted row per marker.
		if int(rowsAffected) != len(replicationTasksRows) {
			return &types.InternalServiceError{Message: fmt.Sprintf("CreateFailoverMarkerTasks failed. Inserted %v instead of %v rows into replication_tasks.", rowsAffected, len(replicationTasksRows))}
		}
		return nil
	})
}

// timerTaskPageToken is the JSON-encoded resume point for timer-task scans:
// the (visibility timestamp, task ID) pair of the first row of the next page.
type timerTaskPageToken struct {
	TaskID    int64
	Timestamp time.Time
}

// serialize encodes the token as JSON.
func (t *timerTaskPageToken) serialize() ([]byte, error) {
	return json.Marshal(t)
}

// deserialize decodes a JSON page token in place.
func (t *timerTaskPageToken) deserialize(payload []byte) error {
	return json.Unmarshal(payload, t)
}

// GetTimerIndexTasks returns one page of timer tasks between MinTimestamp and
// MaxTimestamp; it reads BatchSize+1 rows so it can detect a further page.
func (m *sqlExecutionStore) GetTimerIndexTasks(
	ctx context.Context,
	request *p.GetTimerIndexTasksRequest,
) (*p.GetTimerIndexTasksResponse, error) {
	pageToken := &timerTaskPageToken{TaskID: math.MinInt64, Timestamp: request.MinTimestamp}
	if len(request.NextPageToken) > 0 {
		if err := pageToken.deserialize(request.NextPageToken); err != nil {
			return nil, &types.InternalServiceError{
				Message: fmt.Sprintf("error deserializing timerTaskPageToken: %v", err),
			}
		}
	}
	rows, err := m.db.SelectFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
		ShardID:                m.shardID,
		MinVisibilityTimestamp: pageToken.Timestamp,
		TaskID:                 pageToken.TaskID,
		MaxVisibilityTimestamp: request.MaxTimestamp,
		PageSize:               request.BatchSize + 1,
	})
	// sql.ErrNoRows is not an error here: it simply yields an empty page.
	if err != nil && err != sql.ErrNoRows {
		return nil, convertCommonErrors(m.db, "GetTimerIndexTasks", "", err)
	}
	resp := &p.GetTimerIndexTasksResponse{Timers: make([]*p.TimerTaskInfo, len(rows))}
	for i, row := range rows {
		info, err := m.parser.TimerTaskInfoFromBlob(row.Data, row.DataEncoding)
		if err != nil {
			return nil, err
		}
		resp.Timers[i] = &p.TimerTaskInfo{
			VisibilityTimestamp: row.VisibilityTimestamp,
			TaskID:              row.TaskID,
			DomainID:            info.DomainID.String(),
			WorkflowID:          info.GetWorkflowID(),
			RunID:               info.RunID.String(),
			TaskType:            int(info.GetTaskType()),
			TimeoutType:         int(info.GetTimeoutType()),
			EventID:             info.GetEventID(),
			ScheduleAttempt:     info.GetScheduleAttempt(),
			Version:             info.GetVersion(),
		}
	}
	// The (BatchSize+1)-th row, when present, proves another page exists and
	// seeds the next page token.
	if len(resp.Timers) > request.BatchSize {
		pageToken
= &timerTaskPageToken{
			TaskID:    resp.Timers[request.BatchSize].TaskID,
			Timestamp: resp.Timers[request.BatchSize].VisibilityTimestamp,
		}
		// Trim the probe row; it belongs to the next page.
		resp.Timers = resp.Timers[:request.BatchSize]
		nextToken, err := pageToken.serialize()
		if err != nil {
			return nil, &types.InternalServiceError{
				Message: fmt.Sprintf("GetTimerTasks: error serializing page token: %v", err),
			}
		}
		resp.NextPageToken = nextToken
	}
	return resp, nil
}

// CompleteTimerTask deletes a single timer task identified by its visibility
// timestamp and task ID.
func (m *sqlExecutionStore) CompleteTimerTask(
	ctx context.Context,
	request *p.CompleteTimerTaskRequest,
) error {
	if _, err := m.db.DeleteFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
		ShardID:             m.shardID,
		VisibilityTimestamp: request.VisibilityTimestamp,
		TaskID:              request.TaskID,
	}); err != nil {
		return convertCommonErrors(m.db, "CompleteTimerTask", "", err)
	}
	return nil
}

// RangeCompleteTimerTask deletes up to PageSize timer tasks with visibility in
// [InclusiveBeginTimestamp, ExclusiveEndTimestamp) and reports the count.
func (m *sqlExecutionStore) RangeCompleteTimerTask(
	ctx context.Context,
	request *p.RangeCompleteTimerTaskRequest,
) (*p.RangeCompleteTimerTaskResponse, error) {
	result, err := m.db.RangeDeleteFromTimerTasks(ctx, &sqlplugin.TimerTasksFilter{
		ShardID:                m.shardID,
		MinVisibilityTimestamp: request.InclusiveBeginTimestamp,
		MaxVisibilityTimestamp: request.ExclusiveEndTimestamp,
		PageSize:               request.PageSize,
	})
	if err != nil {
		return nil, convertCommonErrors(m.db, "RangeCompleteTimerTask", "", err)
	}
	rowsDeleted, err := result.RowsAffected()
	if err != nil {
		return nil, convertCommonErrors(m.db, "RangeCompleteTimerTask", "", err)
	}
	return &p.RangeCompleteTimerTaskResponse{TasksCompleted: int(rowsDeleted)}, nil
}

// PutReplicationTaskToDLQ re-encodes the given replication task and inserts it
// into the source cluster's DLQ table. Duplicate inserts are tolerated.
func (m *sqlExecutionStore) PutReplicationTaskToDLQ(
	ctx context.Context,
	request *p.InternalPutReplicationTaskToDLQRequest,
) error {
	replicationTask := request.TaskInfo
	blob, err := m.parser.ReplicationTaskInfoToBlob(&serialization.ReplicationTaskInfo{
		DomainID:                serialization.MustParseUUID(replicationTask.DomainID),
		WorkflowID:              replicationTask.WorkflowID,
		RunID:                   serialization.MustParseUUID(replicationTask.RunID),
		TaskType:                int16(replicationTask.TaskType),
		FirstEventID:            replicationTask.FirstEventID,
		NextEventID:             replicationTask.NextEventID,
		Version:                 replicationTask.Version,
		ScheduledID:             replicationTask.ScheduledID,
		EventStoreVersion:       p.EventStoreVersion,
		NewRunEventStoreVersion: p.EventStoreVersion,
		BranchToken:             replicationTask.BranchToken,
		NewRunBranchToken:       replicationTask.NewRunBranchToken,
		CreationTimestamp:       replicationTask.CreationTime,
	})
	if err != nil {
		return err
	}
	row := &sqlplugin.ReplicationTaskDLQRow{
		SourceClusterName: request.SourceClusterName,
		ShardID:           m.shardID,
		TaskID:            replicationTask.TaskID,
		Data:              blob.Data,
		DataEncoding:      string(blob.Encoding),
	}
	_, err = m.db.InsertIntoReplicationTasksDLQ(ctx, row)
	// Tasks are immutable. So it's fine if we already persisted it before.
	// This can happen when tasks are retried (ack and cleanup can have lag on source side).
	if err != nil && !m.db.IsDupEntryError(err) {
		return convertCommonErrors(m.db, "PutReplicationTaskToDLQ", "", err)
	}
	return nil
}

// populateWorkflowMutableState decodes an executions row into internal
// mutable state: execution info, optional 2DC replication state, version
// histories, and checksum.
func (m *sqlExecutionStore) populateWorkflowMutableState(
	execution sqlplugin.ExecutionsRow,
) (*p.InternalWorkflowMutableState, error) {
	info, err := m.parser.WorkflowExecutionInfoFromBlob(execution.Data, execution.DataEncoding)
	if err != nil {
		return nil, err
	}
	state := &p.InternalWorkflowMutableState{}
	state.ExecutionInfo = serialization.ToInternalWorkflowExecutionInfo(info)
	// Identity and next event ID come from the row columns, overriding the blob.
	state.ExecutionInfo.DomainID = execution.DomainID.String()
	state.ExecutionInfo.WorkflowID = execution.WorkflowID
	state.ExecutionInfo.RunID = execution.RunID.String()
	state.ExecutionInfo.NextEventID = execution.NextEventID
	// TODO: remove this after all 2DC workflows complete
	if info.LastWriteEventID != nil {
		state.ReplicationState = &p.ReplicationState{}
		state.ReplicationState.StartVersion = info.GetStartVersion()
		state.ReplicationState.LastWriteVersion = execution.LastWriteVersion
		state.ReplicationState.LastWriteEventID = info.GetLastWriteEventID()
	}
	if info.GetVersionHistories() != nil {
		state.VersionHistories = p.NewDataBlob(
			info.GetVersionHistories(),
common.EncodingType(info.GetVersionHistoriesEncoding()), ) } if info.GetChecksum() != nil { state.ChecksumData = p.NewDataBlob( info.GetChecksum(), common.EncodingType(info.GetChecksumEncoding()), ) } return state, nil } func (m *sqlExecutionStore) populateInternalListConcreteExecutions( executions []sqlplugin.ExecutionsRow, ) ([]*p.InternalListConcreteExecutionsEntity, error) { concreteExecutions := make([]*p.InternalListConcreteExecutionsEntity, 0, len(executions)) for _, execution := range executions { mutableState, err := m.populateWorkflowMutableState(execution) if err != nil { return nil, err } concreteExecution := &p.InternalListConcreteExecutionsEntity{ ExecutionInfo: mutableState.ExecutionInfo, VersionHistories: mutableState.VersionHistories, } concreteExecutions = append(concreteExecutions, concreteExecution) } return concreteExecutions, nil }