common/persistence/execution_manager.go (932 lines of code) (raw):

// Copyright (c) 2017-2020 Uber Technologies, Inc. // Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
package persistence

import (
	"context"
	"time"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/types"
)

type (
	// executionManagerImpl implements ExecutionManager based on ExecutionStore, statsComputer and PayloadSerializer
	executionManagerImpl struct {
		serializer    PayloadSerializer
		persistence   ExecutionStore
		statsComputer statsComputer
		logger        log.Logger
	}
)

var _ ExecutionManager = (*executionManagerImpl)(nil)

// NewExecutionManagerImpl returns new ExecutionManager
func NewExecutionManagerImpl(
	persistence ExecutionStore,
	logger log.Logger,
	serializer PayloadSerializer,
) ExecutionManager {

	return &executionManagerImpl{
		serializer:    serializer,
		persistence:   persistence,
		statsComputer: statsComputer{},
		logger:        logger,
	}
}

// GetName returns the name reported by the underlying ExecutionStore.
func (m *executionManagerImpl) GetName() string {
	return m.persistence.GetName()
}

// GetShardID returns the shard ID reported by the underlying ExecutionStore.
func (m *executionManagerImpl) GetShardID() int {
	return m.persistence.GetShardID()
}

// The below three APIs are related to serialization/deserialization

// GetWorkflowExecution loads the mutable state from the store and converts the
// internal (serialized) representation into the public one. Any deserialization
// failure aborts the call and is returned as-is.
func (m *executionManagerImpl) GetWorkflowExecution(
	ctx context.Context,
	request *GetWorkflowExecutionRequest,
) (*GetWorkflowExecutionResponse, error) {

	internalRequest := &InternalGetWorkflowExecutionRequest{
		DomainID:  request.DomainID,
		Execution: request.Execution,
		RangeID:   request.RangeID,
	}
	response, err := m.persistence.GetWorkflowExecution(ctx, internalRequest)
	if err != nil {
		return nil, err
	}

	// Fields that need no deserialization are copied over directly.
	newResponse := &GetWorkflowExecutionResponse{
		State: &WorkflowMutableState{
			TimerInfos:         response.State.TimerInfos,
			RequestCancelInfos: response.State.RequestCancelInfos,
			SignalInfos:        response.State.SignalInfos,
			SignalRequestedIDs: response.State.SignalRequestedIDs,
			ReplicationState:   response.State.ReplicationState, // TODO: remove this after all 2DC workflows complete
			Checksum:           response.State.Checksum,
		},
	}

	newResponse.State.ActivityInfos, err = m.DeserializeActivityInfos(response.State.ActivityInfos)
	if err != nil {
		return nil, err
	}
	newResponse.State.ChildExecutionInfos, err = m.DeserializeChildExecutionInfos(response.State.ChildExecutionInfos)
	if err != nil {
		return nil, err
	}
	newResponse.State.BufferedEvents, err = m.DeserializeBufferedEvents(response.State.BufferedEvents)
	if err != nil {
		return nil, err
	}
	newResponse.State.ExecutionInfo, newResponse.State.ExecutionStats, err = m.DeserializeExecutionInfo(response.State.ExecutionInfo)
	if err != nil {
		return nil, err
	}
	versionHistories, err := m.DeserializeVersionHistories(response.State.VersionHistories)
	if err != nil {
		return nil, err
	}
	newResponse.State.VersionHistories = versionHistories
	newResponse.MutableStateStats = m.statsComputer.computeMutableStateStats(response)

	// When the store did not populate Checksum directly, fall back to the
	// serialized ChecksumData blob.
	if len(newResponse.State.Checksum.Value) == 0 {
		newResponse.State.Checksum, err = m.serializer.DeserializeChecksum(response.State.ChecksumData)
		if err != nil {
			return nil, err
		}
	}

	return newResponse, nil
}

// DeserializeExecutionInfo converts an InternalWorkflowExecutionInfo into the
// public WorkflowExecutionInfo plus its ExecutionStats. Durations are truncated
// to whole seconds and decision timestamps are exposed as Unix nanoseconds.
func (m *executionManagerImpl) DeserializeExecutionInfo(
	info *InternalWorkflowExecutionInfo,
) (*WorkflowExecutionInfo, *ExecutionStats, error) {

	completionEvent, err := m.serializer.DeserializeEvent(info.CompletionEvent)
	if err != nil {
		return nil, nil, err
	}

	autoResetPoints, err := m.serializer.DeserializeResetPoints(info.AutoResetPoints)
	if err != nil {
		return nil, nil, err
	}

	newInfo := &WorkflowExecutionInfo{
		CompletionEvent:                    completionEvent,
		DomainID:                           info.DomainID,
		WorkflowID:                         info.WorkflowID,
		RunID:                              info.RunID,
		FirstExecutionRunID:                info.FirstExecutionRunID,
		ParentDomainID:                     info.ParentDomainID,
		ParentWorkflowID:                   info.ParentWorkflowID,
		ParentRunID:                        info.ParentRunID,
		InitiatedID:                        info.InitiatedID,
		CompletionEventBatchID:             info.CompletionEventBatchID,
		TaskList:                           info.TaskList,
		IsCron:                             len(info.CronSchedule) > 0,
		WorkflowTypeName:                   info.WorkflowTypeName,
		WorkflowTimeout:                    int32(info.WorkflowTimeout.Seconds()),
		DecisionStartToCloseTimeout:        int32(info.DecisionStartToCloseTimeout.Seconds()),
		ExecutionContext:                   info.ExecutionContext,
		State:                              info.State,
		CloseStatus:                        info.CloseStatus,
		LastFirstEventID:                   info.LastFirstEventID,
		LastEventTaskID:                    info.LastEventTaskID,
		NextEventID:                        info.NextEventID,
		LastProcessedEvent:                 info.LastProcessedEvent,
		StartTimestamp:                     info.StartTimestamp,
		LastUpdatedTimestamp:               info.LastUpdatedTimestamp,
		CreateRequestID:                    info.CreateRequestID,
		SignalCount:                        info.SignalCount,
		DecisionVersion:                    info.DecisionVersion,
		DecisionScheduleID:                 info.DecisionScheduleID,
		DecisionStartedID:                  info.DecisionStartedID,
		DecisionRequestID:                  info.DecisionRequestID,
		DecisionTimeout:                    int32(info.DecisionTimeout.Seconds()),
		DecisionAttempt:                    info.DecisionAttempt,
		DecisionStartedTimestamp:           info.DecisionStartedTimestamp.UnixNano(),
		DecisionScheduledTimestamp:         info.DecisionScheduledTimestamp.UnixNano(),
		DecisionOriginalScheduledTimestamp: info.DecisionOriginalScheduledTimestamp.UnixNano(),
		CancelRequested:                    info.CancelRequested,
		CancelRequestID:                    info.CancelRequestID,
		StickyTaskList:                     info.StickyTaskList,
		StickyScheduleToStartTimeout:       int32(info.StickyScheduleToStartTimeout.Seconds()),
		ClientLibraryVersion:               info.ClientLibraryVersion,
		ClientFeatureVersion:               info.ClientFeatureVersion,
		ClientImpl:                         info.ClientImpl,
		Attempt:                            info.Attempt,
		HasRetryPolicy:                     info.HasRetryPolicy,
		InitialInterval:                    int32(info.InitialInterval.Seconds()),
		BackoffCoefficient:                 info.BackoffCoefficient,
		MaximumInterval:                    int32(info.MaximumInterval.Seconds()),
		ExpirationTime:                     info.ExpirationTime,
		MaximumAttempts:                    info.MaximumAttempts,
		NonRetriableErrors:                 info.NonRetriableErrors,
		BranchToken:                        info.BranchToken,
		CronSchedule:                       info.CronSchedule,
		ExpirationSeconds:                  int32(info.ExpirationInterval.Seconds()),
		AutoResetPoints:                    autoResetPoints,
		SearchAttributes:                   info.SearchAttributes,
		Memo:                               info.Memo,
		PartitionConfig:                    info.PartitionConfig,
	}
	newStats := &ExecutionStats{
		HistorySize: info.HistorySize,
	}
	return newInfo, newStats, nil
}

// DeserializeBufferedEvents decodes every blob as a batch of history events and
// returns all events concatenated in blob order. The result is never nil.
func (m *executionManagerImpl) DeserializeBufferedEvents(
	blobs []*DataBlob,
) ([]*types.HistoryEvent, error) {

	// Each blob yields a batch; len(blobs) is only a starting capacity hint.
	events := make([]*types.HistoryEvent, 0, len(blobs))
	for _, b := range blobs {
		history, err := m.serializer.DeserializeBatchEvents(b)
		if err != nil {
			return nil, err
		}
		events = append(events, history...)
	}
	return events, nil
}

// DeserializeChildExecutionInfos converts the internal child execution infos,
// keyed identically, into their public form by decoding the initiated and
// started events.
func (m *executionManagerImpl) DeserializeChildExecutionInfos(
	infos map[int64]*InternalChildExecutionInfo,
) (map[int64]*ChildExecutionInfo, error) {

	newInfos := make(map[int64]*ChildExecutionInfo, len(infos))
	for k, v := range infos {
		initiatedEvent, err := m.serializer.DeserializeEvent(v.InitiatedEvent)
		if err != nil {
			return nil, err
		}
		startedEvent, err := m.serializer.DeserializeEvent(v.StartedEvent)
		if err != nil {
			return nil, err
		}
		c := &ChildExecutionInfo{
			InitiatedEvent:        initiatedEvent,
			StartedEvent:          startedEvent,
			Version:               v.Version,
			InitiatedID:           v.InitiatedID,
			InitiatedEventBatchID: v.InitiatedEventBatchID,
			StartedID:             v.StartedID,
			StartedWorkflowID:     v.StartedWorkflowID,
			StartedRunID:          v.StartedRunID,
			CreateRequestID:       v.CreateRequestID,
			DomainID:              v.DomainID,
			DomainNameDEPRECATED:  v.DomainNameDEPRECATED,
			WorkflowTypeName:      v.WorkflowTypeName,
			ParentClosePolicy:     v.ParentClosePolicy,
		}

		// Needed for backward compatibility reason.
		// ChildWorkflowExecutionStartedEvent was only used by transfer queue processing of StartChildWorkflow.
		// Updated the code to instead directly read WorkflowId and RunId from mutable state
		// Existing mutable state won't have those values set so instead use started event to set StartedWorkflowID and
		// StartedRunID on the mutable state before passing it to application
		if startedEvent != nil && startedEvent.ChildWorkflowExecutionStartedEventAttributes != nil &&
			startedEvent.ChildWorkflowExecutionStartedEventAttributes.WorkflowExecution != nil {
			startedExecution := startedEvent.ChildWorkflowExecutionStartedEventAttributes.WorkflowExecution
			c.StartedWorkflowID = startedExecution.GetWorkflowID()
			c.StartedRunID = startedExecution.GetRunID()
		}
		newInfos[k] = c
	}
	return newInfos, nil
}

// DeserializeActivityInfos converts the internal activity infos, keyed
// identically, into their public form. Scheduled/started events are decoded and
// duration fields are truncated to whole seconds.
func (m *executionManagerImpl) DeserializeActivityInfos(
	infos map[int64]*InternalActivityInfo,
) (map[int64]*ActivityInfo, error) {

	newInfos := make(map[int64]*ActivityInfo, len(infos))
	for k, v := range infos {
		scheduledEvent, err := m.serializer.DeserializeEvent(v.ScheduledEvent)
		if err != nil {
			return nil, err
		}
		startedEvent, err := m.serializer.DeserializeEvent(v.StartedEvent)
		if err != nil {
			return nil, err
		}
		a := &ActivityInfo{
			ScheduledEvent:                          scheduledEvent,
			StartedEvent:                            startedEvent,
			Version:                                 v.Version,
			ScheduleID:                              v.ScheduleID,
			ScheduledEventBatchID:                   v.ScheduledEventBatchID,
			ScheduledTime:                           v.ScheduledTime,
			StartedID:                               v.StartedID,
			StartedTime:                             v.StartedTime,
			ActivityID:                              v.ActivityID,
			RequestID:                               v.RequestID,
			Details:                                 v.Details,
			ScheduleToStartTimeout:                  int32(v.ScheduleToStartTimeout.Seconds()),
			ScheduleToCloseTimeout:                  int32(v.ScheduleToCloseTimeout.Seconds()),
			StartToCloseTimeout:                     int32(v.StartToCloseTimeout.Seconds()),
			HeartbeatTimeout:                        int32(v.HeartbeatTimeout.Seconds()),
			CancelRequested:                         v.CancelRequested,
			CancelRequestID:                         v.CancelRequestID,
			LastHeartBeatUpdatedTime:                v.LastHeartBeatUpdatedTime,
			TimerTaskStatus:                         v.TimerTaskStatus,
			Attempt:                                 v.Attempt,
			DomainID:                                v.DomainID,
			StartedIdentity:                         v.StartedIdentity,
			TaskList:                                v.TaskList,
			HasRetryPolicy:                          v.HasRetryPolicy,
			InitialInterval:                         int32(v.InitialInterval.Seconds()),
			BackoffCoefficient:                      v.BackoffCoefficient,
			MaximumInterval:                         int32(v.MaximumInterval.Seconds()),
			ExpirationTime:                          v.ExpirationTime,
			MaximumAttempts:                         v.MaximumAttempts,
			NonRetriableErrors:                      v.NonRetriableErrors,
			LastFailureReason:                       v.LastFailureReason,
			LastWorkerIdentity:                      v.LastWorkerIdentity,
			LastFailureDetails:                      v.LastFailureDetails,
			LastHeartbeatTimeoutVisibilityInSeconds: v.LastHeartbeatTimeoutVisibilityInSeconds,
		}
		newInfos[k] = a
	}
	return newInfos, nil
}

// UpdateWorkflowExecution serializes the mutation (and the optional new-run
// snapshot) with request.Encoding, forwards the internal request to the store,
// and returns the update stats computed from the serialized request.
func (m *executionManagerImpl) UpdateWorkflowExecution(
	ctx context.Context,
	request *UpdateWorkflowExecutionRequest,
) (*UpdateWorkflowExecutionResponse, error) {

	serializedWorkflowMutation, err := m.SerializeWorkflowMutation(&request.UpdateWorkflowMutation, request.Encoding)
	if err != nil {
		return nil, err
	}
	var serializedNewWorkflowSnapshot *InternalWorkflowSnapshot
	if request.NewWorkflowSnapshot != nil {
		serializedNewWorkflowSnapshot, err = m.SerializeWorkflowSnapshot(request.NewWorkflowSnapshot, request.Encoding)
		if err != nil {
			return nil, err
		}
	}

	newRequest := &InternalUpdateWorkflowExecutionRequest{
		RangeID: request.RangeID,

		Mode: request.Mode,

		UpdateWorkflowMutation: *serializedWorkflowMutation,
		NewWorkflowSnapshot:    serializedNewWorkflowSnapshot,
	}
	// Stats are computed before the store call, from the serialized request.
	msuss := m.statsComputer.computeMutableStateUpdateStats(newRequest)
	err = m.persistence.UpdateWorkflowExecution(ctx, newRequest)
	if err != nil {
		return nil, err
	}
	return &UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
}

// SerializeUpsertChildExecutionInfos converts child execution infos into their
// internal form, encoding the initiated and started events with the given
// encoding. The returned slice is never nil and preserves input order.
func (m *executionManagerImpl) SerializeUpsertChildExecutionInfos(
	infos []*ChildExecutionInfo,
	encoding common.EncodingType,
) ([]*InternalChildExecutionInfo, error) {

	newInfos := make([]*InternalChildExecutionInfo, 0, len(infos))
	for _, v := range infos {
		initiatedEvent, err := m.serializer.SerializeEvent(v.InitiatedEvent, encoding)
		if err != nil {
			return nil, err
		}
		startedEvent, err := m.serializer.SerializeEvent(v.StartedEvent, encoding)
		if err != nil {
			return nil, err
		}
		i := &InternalChildExecutionInfo{
			InitiatedEvent:        initiatedEvent,
			StartedEvent:          startedEvent,
			Version:               v.Version,
			InitiatedID:           v.InitiatedID,
			InitiatedEventBatchID: v.InitiatedEventBatchID,
			CreateRequestID:       v.CreateRequestID,
			StartedID:             v.StartedID,
			StartedWorkflowID:     v.StartedWorkflowID,
			StartedRunID:          v.StartedRunID,
			DomainID:              v.DomainID,
			DomainNameDEPRECATED:  v.DomainNameDEPRECATED,
			WorkflowTypeName:      v.WorkflowTypeName,
			ParentClosePolicy:     v.ParentClosePolicy,
		}
		newInfos = append(newInfos, i)
	}
	return newInfos, nil
}

// SerializeUpsertActivityInfos converts activity infos into their internal
// form, encoding the scheduled and started events with the given encoding and
// turning second-valued timeouts into durations. The returned slice is never
// nil and preserves input order.
func (m *executionManagerImpl) SerializeUpsertActivityInfos(
	infos []*ActivityInfo,
	encoding common.EncodingType,
) ([]*InternalActivityInfo, error) {

	newInfos := make([]*InternalActivityInfo, 0, len(infos))
	for _, v := range infos {
		scheduledEvent, err := m.serializer.SerializeEvent(v.ScheduledEvent, encoding)
		if err != nil {
			return nil, err
		}
		startedEvent, err := m.serializer.SerializeEvent(v.StartedEvent, encoding)
		if err != nil {
			return nil, err
		}
		i := &InternalActivityInfo{
			Version:                                 v.Version,
			ScheduleID:                              v.ScheduleID,
			ScheduledEventBatchID:                   v.ScheduledEventBatchID,
			ScheduledEvent:                          scheduledEvent,
			ScheduledTime:                           v.ScheduledTime,
			StartedID:                               v.StartedID,
			StartedEvent:                            startedEvent,
			StartedTime:                             v.StartedTime,
			ActivityID:                              v.ActivityID,
			RequestID:                               v.RequestID,
			Details:                                 v.Details,
			ScheduleToStartTimeout:                  common.SecondsToDuration(int64(v.ScheduleToStartTimeout)),
			ScheduleToCloseTimeout:                  common.SecondsToDuration(int64(v.ScheduleToCloseTimeout)),
			StartToCloseTimeout:                     common.SecondsToDuration(int64(v.StartToCloseTimeout)),
			HeartbeatTimeout:                        common.SecondsToDuration(int64(v.HeartbeatTimeout)),
			CancelRequested:                         v.CancelRequested,
			CancelRequestID:                         v.CancelRequestID,
			LastHeartBeatUpdatedTime:                v.LastHeartBeatUpdatedTime,
			TimerTaskStatus:                         v.TimerTaskStatus,
			Attempt:                                 v.Attempt,
			DomainID:                                v.DomainID,
			StartedIdentity:                         v.StartedIdentity,
			TaskList:                                v.TaskList,
			HasRetryPolicy:                          v.HasRetryPolicy,
			InitialInterval:                         common.SecondsToDuration(int64(v.InitialInterval)),
			BackoffCoefficient:                      v.BackoffCoefficient,
			MaximumInterval:                         common.SecondsToDuration(int64(v.MaximumInterval)),
			ExpirationTime:                          v.ExpirationTime,
			MaximumAttempts:                         v.MaximumAttempts,
			NonRetriableErrors:                      v.NonRetriableErrors,
			LastFailureReason:                       v.LastFailureReason,
			LastWorkerIdentity:                      v.LastWorkerIdentity,
			LastFailureDetails:                      v.LastFailureDetails,
			LastHeartbeatTimeoutVisibilityInSeconds: v.LastHeartbeatTimeoutVisibilityInSeconds,
		}
		newInfos = append(newInfos, i)
	}
	return newInfos, nil
}

// SerializeExecutionInfo converts a WorkflowExecutionInfo and its stats into
// the internal representation. A nil info yields an empty
// InternalWorkflowExecutionInfo; a nil stats is treated as zero-valued stats
// rather than causing a nil-pointer dereference.
func (m *executionManagerImpl) SerializeExecutionInfo(
	info *WorkflowExecutionInfo,
	stats *ExecutionStats,
	encoding common.EncodingType,
) (*InternalWorkflowExecutionInfo, error) {

	if info == nil {
		return &InternalWorkflowExecutionInfo{}, nil
	}
	if stats == nil {
		// Only the local pointer is replaced; the caller's data is untouched.
		stats = &ExecutionStats{}
	}
	completionEvent, err := m.serializer.SerializeEvent(info.CompletionEvent, encoding)
	if err != nil {
		return nil, err
	}

	resetPoints, err := m.serializer.SerializeResetPoints(info.AutoResetPoints, encoding)
	if err != nil {
		return nil, err
	}

	return &InternalWorkflowExecutionInfo{
		DomainID:                           info.DomainID,
		WorkflowID:                         info.WorkflowID,
		RunID:                              info.RunID,
		FirstExecutionRunID:                info.FirstExecutionRunID,
		ParentDomainID:                     info.ParentDomainID,
		ParentWorkflowID:                   info.ParentWorkflowID,
		ParentRunID:                        info.ParentRunID,
		InitiatedID:                        info.InitiatedID,
		CompletionEventBatchID:             info.CompletionEventBatchID,
		CompletionEvent:                    completionEvent,
		TaskList:                           info.TaskList,
		WorkflowTypeName:                   info.WorkflowTypeName,
		WorkflowTimeout:                    common.SecondsToDuration(int64(info.WorkflowTimeout)),
		DecisionStartToCloseTimeout:        common.SecondsToDuration(int64(info.DecisionStartToCloseTimeout)),
		ExecutionContext:                   info.ExecutionContext,
		State:                              info.State,
		CloseStatus:                        info.CloseStatus,
		LastFirstEventID:                   info.LastFirstEventID,
		LastEventTaskID:                    info.LastEventTaskID,
		NextEventID:                        info.NextEventID,
		LastProcessedEvent:                 info.LastProcessedEvent,
		StartTimestamp:                     info.StartTimestamp,
		LastUpdatedTimestamp:               info.LastUpdatedTimestamp,
		CreateRequestID:                    info.CreateRequestID,
		SignalCount:                        info.SignalCount,
		DecisionVersion:                    info.DecisionVersion,
		DecisionScheduleID:                 info.DecisionScheduleID,
		DecisionStartedID:                  info.DecisionStartedID,
		DecisionRequestID:                  info.DecisionRequestID,
		DecisionTimeout:                    common.SecondsToDuration(int64(info.DecisionTimeout)),
		DecisionAttempt:                    info.DecisionAttempt,
		DecisionStartedTimestamp:           time.Unix(0, info.DecisionStartedTimestamp).UTC(),
		DecisionScheduledTimestamp:         time.Unix(0, info.DecisionScheduledTimestamp).UTC(),
		DecisionOriginalScheduledTimestamp: time.Unix(0, info.DecisionOriginalScheduledTimestamp).UTC(),
		CancelRequested:                    info.CancelRequested,
		CancelRequestID:                    info.CancelRequestID,
		StickyTaskList:                     info.StickyTaskList,
		StickyScheduleToStartTimeout:       common.SecondsToDuration(int64(info.StickyScheduleToStartTimeout)),
		ClientLibraryVersion:               info.ClientLibraryVersion,
		ClientFeatureVersion:               info.ClientFeatureVersion,
		ClientImpl:                         info.ClientImpl,
		AutoResetPoints:                    resetPoints,
		Attempt:                            info.Attempt,
		HasRetryPolicy:                     info.HasRetryPolicy,
		InitialInterval:                    common.SecondsToDuration(int64(info.InitialInterval)),
		BackoffCoefficient:                 info.BackoffCoefficient,
		MaximumInterval:                    common.SecondsToDuration(int64(info.MaximumInterval)),
		ExpirationTime:                     info.ExpirationTime,
		MaximumAttempts:                    info.MaximumAttempts,
		NonRetriableErrors:                 info.NonRetriableErrors,
		BranchToken:                        info.BranchToken,
		CronSchedule:                       info.CronSchedule,
		ExpirationInterval:                 common.SecondsToDuration(int64(info.ExpirationSeconds)),
		Memo:                               info.Memo,
		SearchAttributes:                   info.SearchAttributes,
		PartitionConfig:                    info.PartitionConfig,

		// attributes which are not related to mutable state
		HistorySize: stats.HistorySize,
	}, nil
}

// ConflictResolveWorkflowExecution serializes the reset snapshot plus the
// optional current-run mutation and new-run snapshot with request.Encoding,
// forwards the internal request to the store, and returns the stats computed
// from the serialized request.
func (m *executionManagerImpl) ConflictResolveWorkflowExecution(
	ctx context.Context,
	request *ConflictResolveWorkflowExecutionRequest,
) (*ConflictResolveWorkflowExecutionResponse, error) {

	serializedResetWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&request.ResetWorkflowSnapshot, request.Encoding)
	if err != nil {
		return nil, err
	}
	var serializedCurrentWorkflowMutation *InternalWorkflowMutation
	if request.CurrentWorkflowMutation != nil {
		serializedCurrentWorkflowMutation, err = m.SerializeWorkflowMutation(request.CurrentWorkflowMutation, request.Encoding)
		if err != nil {
			return nil, err
		}
	}
	var serializedNewWorkflowSnapshot *InternalWorkflowSnapshot
	if request.NewWorkflowSnapshot != nil {
		serializedNewWorkflowSnapshot, err = m.SerializeWorkflowSnapshot(request.NewWorkflowSnapshot, request.Encoding)
		if err != nil {
			return nil, err
		}
	}

	newRequest := &InternalConflictResolveWorkflowExecutionRequest{
		RangeID: request.RangeID,

		Mode: request.Mode,

		ResetWorkflowSnapshot: *serializedResetWorkflowSnapshot,

		NewWorkflowSnapshot: serializedNewWorkflowSnapshot,

		CurrentWorkflowMutation: serializedCurrentWorkflowMutation,
	}
	// Stats are computed before the store call, from the serialized request.
	msuss := m.statsComputer.computeMutableStateConflictResolveStats(newRequest)
	err = m.persistence.ConflictResolveWorkflowExecution(ctx, newRequest)
	if err != nil {
		return nil, err
	}
	return &ConflictResolveWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
}

// CreateWorkflowExecution serializes the new workflow snapshot, forwards the
// internal request to the store, and returns the stats computed from the
// serialized request. The snapshot is always serialized with
// common.EncodingTypeThriftRW, and the store's response value is discarded.
func (m *executionManagerImpl) CreateWorkflowExecution(
	ctx context.Context,
	request *CreateWorkflowExecutionRequest,
) (*CreateWorkflowExecutionResponse, error) {

	encoding := common.EncodingTypeThriftRW

	serializedNewWorkflowSnapshot, err := m.SerializeWorkflowSnapshot(&request.NewWorkflowSnapshot, encoding)
	if err != nil {
		return nil, err
	}

	newRequest := &InternalCreateWorkflowExecutionRequest{
		RangeID: request.RangeID,

		Mode: request.Mode,

		PreviousRunID:            request.PreviousRunID,
		PreviousLastWriteVersion: request.PreviousLastWriteVersion,

		NewWorkflowSnapshot: *serializedNewWorkflowSnapshot,
	}

	msuss := m.statsComputer.computeMutableStateCreateStats(newRequest)
	_, err = m.persistence.CreateWorkflowExecution(ctx, newRequest)
	if err != nil {
		return nil, err
	}
	return &CreateWorkflowExecutionResponse{MutableStateUpdateSessionStats: msuss}, nil
}

// SerializeWorkflowMutation converts a WorkflowMutation into its internal
// form. Execution info, version histories, upserted activity/child infos and
// new buffered events use the given encoding; the checksum is always serialized
// with common.EncodingTypeJSON. Start and last-write versions are derived from
// the version histories.
func (m *executionManagerImpl) SerializeWorkflowMutation(
	input *WorkflowMutation,
	encoding common.EncodingType,
) (*InternalWorkflowMutation, error) {

	serializedExecutionInfo, err := m.SerializeExecutionInfo(
		input.ExecutionInfo,
		input.ExecutionStats,
		encoding,
	)
	if err != nil {
		return nil, err
	}
	serializedVersionHistories, err := m.SerializeVersionHistories(input.VersionHistories, encoding)
	if err != nil {
		return nil, err
	}
	serializedUpsertActivityInfos, err := m.SerializeUpsertActivityInfos(input.UpsertActivityInfos, encoding)
	if err != nil {
		return nil, err
	}
	serializedUpsertChildExecutionInfos, err := m.SerializeUpsertChildExecutionInfos(input.UpsertChildExecutionInfos, encoding)
	if err != nil {
		return nil, err
	}
	// Left nil when the mutation carries no new buffered events.
	var serializedNewBufferedEvents *DataBlob
	if input.NewBufferedEvents != nil {
		serializedNewBufferedEvents, err = m.serializer.SerializeBatchEvents(input.NewBufferedEvents, encoding)
		if err != nil {
			return nil, err
		}
	}

	startVersion, err := getStartVersion(input.VersionHistories)
	if err != nil {
		return nil, err
	}
	lastWriteVersion, err := getLastWriteVersion(input.VersionHistories)
	if err != nil {
		return nil, err
	}

	checksumData, err := m.serializer.SerializeChecksum(input.Checksum, common.EncodingTypeJSON)
	if err != nil {
		return nil, err
	}

	return &InternalWorkflowMutation{
		ExecutionInfo:    serializedExecutionInfo,
		VersionHistories: serializedVersionHistories,
		StartVersion:     startVersion,
		LastWriteVersion: lastWriteVersion,

		UpsertActivityInfos:       serializedUpsertActivityInfos,
		DeleteActivityInfos:       input.DeleteActivityInfos,
		UpsertTimerInfos:          input.UpsertTimerInfos,
		DeleteTimerInfos:          input.DeleteTimerInfos,
		UpsertChildExecutionInfos: serializedUpsertChildExecutionInfos,
		DeleteChildExecutionInfos: input.DeleteChildExecutionInfos,
		UpsertRequestCancelInfos:  input.UpsertRequestCancelInfos,
		DeleteRequestCancelInfos:  input.DeleteRequestCancelInfos,
		UpsertSignalInfos:         input.UpsertSignalInfos,
		DeleteSignalInfos:         input.DeleteSignalInfos,
		UpsertSignalRequestedIDs:  input.UpsertSignalRequestedIDs,
		DeleteSignalRequestedIDs:  input.DeleteSignalRequestedIDs,
		NewBufferedEvents:         serializedNewBufferedEvents,
		ClearBufferedEvents:       input.ClearBufferedEvents,

		TransferTasks:     input.TransferTasks,
		CrossClusterTasks: input.CrossClusterTasks,
		ReplicationTasks:  input.ReplicationTasks,
		TimerTasks:        input.TimerTasks,

		Condition:    input.Condition,
		Checksum:     input.Checksum,
		ChecksumData: checksumData,
	}, nil
}

// SerializeWorkflowSnapshot converts a WorkflowSnapshot into its internal
// form. Execution info, version histories and activity/child infos use the
// given encoding; the checksum is always serialized with
// common.EncodingTypeJSON. Start and last-write versions are derived from the
// version histories.
func (m *executionManagerImpl) SerializeWorkflowSnapshot(
	input *WorkflowSnapshot,
	encoding common.EncodingType,
) (*InternalWorkflowSnapshot, error) {

	serializedExecutionInfo, err := m.SerializeExecutionInfo(
		input.ExecutionInfo,
		input.ExecutionStats,
		encoding,
	)
	if err != nil {
		return nil, err
	}
	serializedVersionHistories, err := m.SerializeVersionHistories(input.VersionHistories, encoding)
	if err != nil {
		return nil, err
	}
	serializedActivityInfos, err := m.SerializeUpsertActivityInfos(input.ActivityInfos, encoding)
	if err != nil {
		return nil, err
	}
	serializedChildExecutionInfos, err := m.SerializeUpsertChildExecutionInfos(input.ChildExecutionInfos, encoding)
	if err != nil {
		return nil, err
	}

	startVersion, err := getStartVersion(input.VersionHistories)
	if err != nil {
		return nil, err
	}
	lastWriteVersion, err := getLastWriteVersion(input.VersionHistories)
	if err != nil {
		return nil, err
	}

	checksumData, err := m.serializer.SerializeChecksum(input.Checksum, common.EncodingTypeJSON)
	if err != nil {
		return nil, err
	}

	return &InternalWorkflowSnapshot{
		ExecutionInfo:    serializedExecutionInfo,
		VersionHistories: serializedVersionHistories,
		StartVersion:     startVersion,
		LastWriteVersion: lastWriteVersion,

		ActivityInfos:       serializedActivityInfos,
		TimerInfos:          input.TimerInfos,
		ChildExecutionInfos: serializedChildExecutionInfos,
		RequestCancelInfos:  input.RequestCancelInfos,
		SignalInfos:         input.SignalInfos,
		SignalRequestedIDs:  input.SignalRequestedIDs,

		TransferTasks:     input.TransferTasks,
		CrossClusterTasks: input.CrossClusterTasks,
		ReplicationTasks:  input.ReplicationTasks,
		TimerTasks:        input.TimerTasks,

		Condition:    input.Condition,
		Checksum:     input.Checksum,
		ChecksumData: checksumData,
	}, nil
}

// SerializeVersionHistories serializes the version histories with the given
// encoding. A nil input yields a nil blob and no error.
func (m *executionManagerImpl) SerializeVersionHistories(
	versionHistories *VersionHistories,
	encoding common.EncodingType,
) (*DataBlob, error) {

	if versionHistories == nil {
		return nil, nil
	}
	return m.serializer.SerializeVersionHistories(versionHistories.ToInternalType(), encoding)
}

// DeserializeVersionHistories decodes a version histories blob. A nil blob
// yields nil version histories and no error.
func (m *executionManagerImpl) DeserializeVersionHistories(
	blob *DataBlob,
) (*VersionHistories, error) {

	if blob == nil {
		return nil, nil
	}
	versionHistories, err := m.serializer.DeserializeVersionHistories(blob)
	if err != nil {
		return nil, err
	}
	return NewVersionHistoriesFromInternalType(versionHistories), nil
}

// DeleteWorkflowExecution forwards the request to the underlying store.
func (m *executionManagerImpl) DeleteWorkflowExecution(
	ctx context.Context,
	request *DeleteWorkflowExecutionRequest,
) error {
	return m.persistence.DeleteWorkflowExecution(ctx, request)
}

// DeleteCurrentWorkflowExecution forwards the request to the underlying store.
func (m *executionManagerImpl) DeleteCurrentWorkflowExecution(
	ctx context.Context,
	request *DeleteCurrentWorkflowExecutionRequest,
) error {
	return m.persistence.DeleteCurrentWorkflowExecution(ctx, request)
}

// GetCurrentExecution forwards the request to the underlying store.
func (m *executionManagerImpl) GetCurrentExecution(
	ctx context.Context,
	request *GetCurrentExecutionRequest,
) (*GetCurrentExecutionResponse, error) {
	return m.persistence.GetCurrentExecution(ctx, request)
}

// ListCurrentExecutions forwards the request to the underlying store.
func (m *executionManagerImpl) ListCurrentExecutions(
	ctx context.Context,
	request *ListCurrentExecutionsRequest,
) (*ListCurrentExecutionsResponse, error) {
	return m.persistence.ListCurrentExecutions(ctx, request)
}

// IsWorkflowExecutionExists forwards the request to the underlying store.
func (m *executionManagerImpl) IsWorkflowExecutionExists(
	ctx context.Context,
	request *IsWorkflowExecutionExistsRequest,
) (*IsWorkflowExecutionExistsResponse, error) {
	return m.persistence.IsWorkflowExecutionExists(ctx, request)
}

// ListConcreteExecutions lists executions from the store and deserializes each
// entry's execution info and version histories. The store's NextPageToken is
// returned as PageToken; the per-entry execution stats are discarded.
func (m *executionManagerImpl) ListConcreteExecutions(
	ctx context.Context,
	request *ListConcreteExecutionsRequest,
) (*ListConcreteExecutionsResponse, error) {

	response, err := m.persistence.ListConcreteExecutions(ctx, request)
	if err != nil {
		return nil, err
	}
	newResponse := &ListConcreteExecutionsResponse{
		Executions: make([]*ListConcreteExecutionsEntity, len(response.Executions)),
		PageToken:  response.NextPageToken,
	}
	for i, e := range response.Executions {
		info, _, err := m.DeserializeExecutionInfo(e.ExecutionInfo)
		if err != nil {
			return nil, err
		}
		vh, err := m.DeserializeVersionHistories(e.VersionHistories)
		if err != nil {
			return nil, err
		}
		newResponse.Executions[i] = &ListConcreteExecutionsEntity{
			ExecutionInfo:    info,
			VersionHistories: vh,
		}
	}
	return newResponse, nil
}

// Transfer task related methods

// GetTransferTasks forwards the request to the underlying store.
func (m *executionManagerImpl) GetTransferTasks(
	ctx context.Context,
	request *GetTransferTasksRequest,
) (*GetTransferTasksResponse, error) {
	return m.persistence.GetTransferTasks(ctx, request)
}

// CompleteTransferTask forwards the request to the underlying store.
func (m *executionManagerImpl) CompleteTransferTask(
	ctx context.Context,
	request *CompleteTransferTaskRequest,
) error {
	return m.persistence.CompleteTransferTask(ctx, request)
}

// RangeCompleteTransferTask forwards the request to the underlying store.
func (m *executionManagerImpl) RangeCompleteTransferTask(
	ctx context.Context,
	request *RangeCompleteTransferTaskRequest,
) (*RangeCompleteTransferTaskResponse, error) {
	return m.persistence.RangeCompleteTransferTask(ctx, request)
}

// Cross-cluster task related methods

// GetCrossClusterTasks forwards the request to the underlying store.
func (m *executionManagerImpl) GetCrossClusterTasks(
	ctx context.Context,
	request *GetCrossClusterTasksRequest,
) (*GetCrossClusterTasksResponse, error) {
	return m.persistence.GetCrossClusterTasks(ctx, request)
}

// CompleteCrossClusterTask forwards the request to the underlying store.
func (m *executionManagerImpl) CompleteCrossClusterTask(
	ctx context.Context,
	request *CompleteCrossClusterTaskRequest,
) error {
	return m.persistence.CompleteCrossClusterTask(ctx, request)
}

// RangeCompleteCrossClusterTask forwards the request to the underlying store.
func (m *executionManagerImpl) RangeCompleteCrossClusterTask(
	ctx context.Context,
	request *RangeCompleteCrossClusterTaskRequest,
) (*RangeCompleteCrossClusterTaskResponse, error) {
	return m.persistence.RangeCompleteCrossClusterTask(ctx, request)
}

// Replication task related methods

// GetReplicationTasks fetches replication tasks from the store and converts
// them from the internal to the public representation.
func (m *executionManagerImpl) GetReplicationTasks(
	ctx context.Context,
	request *GetReplicationTasksRequest,
) (*GetReplicationTasksResponse, error) {
	resp, err := m.persistence.GetReplicationTasks(ctx, request)
	if err != nil {
		return nil, err
	}

	return &GetReplicationTasksResponse{
		Tasks:         m.fromInternalReplicationTaskInfos(resp.Tasks),
		NextPageToken: resp.NextPageToken,
	}, nil
}

// CompleteReplicationTask forwards the request to the underlying store.
func (m *executionManagerImpl) CompleteReplicationTask(
	ctx context.Context,
	request *CompleteReplicationTaskRequest,
) error {
	return m.persistence.CompleteReplicationTask(ctx, request)
}

// RangeCompleteReplicationTask forwards the request to the underlying store.
func (m *executionManagerImpl) RangeCompleteReplicationTask(
	ctx context.Context,
	request *RangeCompleteReplicationTaskRequest,
) (*RangeCompleteReplicationTaskResponse, error) {
	return m.persistence.RangeCompleteReplicationTask(ctx, request)
}

// PutReplicationTaskToDLQ converts the task info to its internal form and
// forwards it, together with the source cluster name, to the store.
func (m *executionManagerImpl) PutReplicationTaskToDLQ(
	ctx context.Context,
	request *PutReplicationTaskToDLQRequest,
) error {
	internalRequest := &InternalPutReplicationTaskToDLQRequest{
		SourceClusterName: request.SourceClusterName,
		TaskInfo:          m.toInternalReplicationTaskInfo(request.TaskInfo),
	}
	return m.persistence.PutReplicationTaskToDLQ(ctx, internalRequest)
}

// GetReplicationTasksFromDLQ fetches DLQ replication tasks from the store and
// converts them from the internal to the public representation.
func (m *executionManagerImpl) GetReplicationTasksFromDLQ(
	ctx context.Context,
	request *GetReplicationTasksFromDLQRequest,
) (*GetReplicationTasksFromDLQResponse, error) {
	resp, err := m.persistence.GetReplicationTasksFromDLQ(ctx, request)
	if err != nil {
		return nil, err
	}
	return &GetReplicationTasksFromDLQResponse{
		Tasks:         m.fromInternalReplicationTaskInfos(resp.Tasks),
		NextPageToken: resp.NextPageToken,
	}, nil
}

// GetReplicationDLQSize forwards the request to the underlying store.
func (m *executionManagerImpl) GetReplicationDLQSize(
	ctx context.Context,
	request *GetReplicationDLQSizeRequest,
) (*GetReplicationDLQSizeResponse, error) {
	return m.persistence.GetReplicationDLQSize(ctx, request)
}

// DeleteReplicationTaskFromDLQ forwards the request to the underlying store.
func (m *executionManagerImpl) DeleteReplicationTaskFromDLQ(
	ctx context.Context,
	request *DeleteReplicationTaskFromDLQRequest,
) error {
	return m.persistence.DeleteReplicationTaskFromDLQ(ctx, request)
}

// RangeDeleteReplicationTaskFromDLQ forwards the request to the underlying store.
func (m *executionManagerImpl) RangeDeleteReplicationTaskFromDLQ(
	ctx context.Context,
	request *RangeDeleteReplicationTaskFromDLQRequest,
) (*RangeDeleteReplicationTaskFromDLQResponse, error) {
	return m.persistence.RangeDeleteReplicationTaskFromDLQ(ctx, request)
}

// CreateFailoverMarkerTasks forwards the request to the underlying store.
func (m *executionManagerImpl) CreateFailoverMarkerTasks(
	ctx context.Context,
	request *CreateFailoverMarkersRequest,
) error {
	return m.persistence.CreateFailoverMarkerTasks(ctx, request)
}

// Timer related methods.

// GetTimerIndexTasks forwards the request to the underlying store.
func (m *executionManagerImpl) GetTimerIndexTasks(
	ctx context.Context,
	request *GetTimerIndexTasksRequest,
) (*GetTimerIndexTasksResponse, error) {
	return m.persistence.GetTimerIndexTasks(ctx, request)
}

// CompleteTimerTask forwards the request to the underlying store.
func (m *executionManagerImpl) CompleteTimerTask(
	ctx context.Context,
	request *CompleteTimerTaskRequest,
) error {
	return m.persistence.CompleteTimerTask(ctx, request)
}

// RangeCompleteTimerTask forwards the request to the underlying store.
func (m *executionManagerImpl) RangeCompleteTimerTask(
	ctx context.Context,
	request *RangeCompleteTimerTaskRequest,
) (*RangeCompleteTimerTaskResponse, error) {
	return m.persistence.RangeCompleteTimerTask(ctx, request)
}

// Close closes the underlying store.
func (m *executionManagerImpl) Close() {
	m.persistence.Close()
}

// fromInternalReplicationTaskInfos converts a slice of internal replication
// task infos element by element. A nil slice stays nil; nil elements stay nil.
func (m *executionManagerImpl) fromInternalReplicationTaskInfos(internalInfos []*InternalReplicationTaskInfo) []*ReplicationTaskInfo {
	if internalInfos == nil {
		return nil
	}
	infos := make([]*ReplicationTaskInfo, len(internalInfos))
	for i, internalInfo := range internalInfos {
		infos[i] = m.fromInternalReplicationTaskInfo(internalInfo)
	}
	return infos
}

// fromInternalReplicationTaskInfo converts one internal replication task info
// to the public form; CreationTime becomes Unix nanoseconds. Nil maps to nil.
func (m *executionManagerImpl) fromInternalReplicationTaskInfo(internalInfo *InternalReplicationTaskInfo) *ReplicationTaskInfo {
	if internalInfo == nil {
		return nil
	}
	return &ReplicationTaskInfo{
		DomainID:          internalInfo.DomainID,
		WorkflowID:        internalInfo.WorkflowID,
		RunID:             internalInfo.RunID,
		TaskID:            internalInfo.TaskID,
		TaskType:          internalInfo.TaskType,
		FirstEventID:      internalInfo.FirstEventID,
		NextEventID:       internalInfo.NextEventID,
		Version:           internalInfo.Version,
		ScheduledID:       internalInfo.ScheduledID,
		BranchToken:       internalInfo.BranchToken,
		NewRunBranchToken: internalInfo.NewRunBranchToken,
		CreationTime:      internalInfo.CreationTime.UnixNano(),
	}
}

// toInternalReplicationTaskInfo converts one public replication task info to
// the internal form; CreationTime (Unix nanoseconds) becomes a time.Time. Nil
// maps to nil.
func (m *executionManagerImpl) toInternalReplicationTaskInfo(info *ReplicationTaskInfo) *InternalReplicationTaskInfo {
	if info == nil {
		return nil
	}
	return &InternalReplicationTaskInfo{
		DomainID:          info.DomainID,
		WorkflowID:        info.WorkflowID,
		RunID:             info.RunID,
		TaskID:            info.TaskID,
		TaskType:          info.TaskType,
		FirstEventID:      info.FirstEventID,
		NextEventID:       info.NextEventID,
		Version:           info.Version,
		ScheduledID:       info.ScheduledID,
		BranchToken:       info.BranchToken,
		NewRunBranchToken: info.NewRunBranchToken,
		CreationTime:      time.Unix(0, info.CreationTime),
	}
}

// getStartVersion returns the version of the first item of the current version
// history, or common.EmptyVersion when versionHistories is nil.
func getStartVersion(
	versionHistories *VersionHistories,
) (int64, error) {

	if versionHistories == nil {
		return common.EmptyVersion, nil
	}

	versionHistory, err := versionHistories.GetCurrentVersionHistory()
	if err != nil {
		return 0, err
	}
	versionHistoryItem, err := versionHistory.GetFirstItem()
	if err != nil {
		return 0, err
	}
	return versionHistoryItem.Version, nil
}

// getLastWriteVersion returns the version of the last item of the current
// version history, or common.EmptyVersion when versionHistories is nil.
func getLastWriteVersion(
	versionHistories *VersionHistories,
) (int64, error) {

	if versionHistories == nil {
		return common.EmptyVersion, nil
	}

	versionHistory, err := versionHistories.GetCurrentVersionHistory()
	if err != nil {
		return 0, err
	}
	versionHistoryItem, err := versionHistory.GetLastItem()
	if err != nil {
		return 0, err
	}
	return versionHistoryItem.Version, nil
}