service/history/execution/context.go (1,207 lines of code) (raw):
// Copyright (c) 2020 Uber Technologies, Inc.
// Portions of the Software are attributed to Copyright (c) 2020 Temporal Technologies Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination context_mock.go -self_package github.com/uber/cadence/service/history/execution
package execution
import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/locks"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
hcommon "github.com/uber/cadence/service/history/common"
"github.com/uber/cadence/service/history/engine"
"github.com/uber/cadence/service/history/events"
"github.com/uber/cadence/service/history/shard"
)
// Tunables used by the execution context implementation.
const (
	// checksumErrorRetryCount bounds how many times the mutable state is re-read
	// from persistence when loading it fails with a checksum mismatch.
	checksumErrorRetryCount = 3
	// defaultRemoteCallTimeout is a 30 second timeout; presumably used for
	// remote calls elsewhere in this file (usage not visible in this section).
	defaultRemoteCallTimeout = time.Second * 30
)
// conflictError marks a failed conditional update of a workflow execution and
// carries the underlying cause.
type conflictError struct {
	Cause error
}

// Error describes the failed conditional update together with its cause.
func (e *conflictError) Error() string {
	return "conditional update failed: " + fmt.Sprint(e.Cause)
}

// Unwrap exposes the cause so errors.Is / errors.As can inspect it.
func (e *conflictError) Unwrap() error { return e.Cause }

// NewConflictError wraps cause in a conflict error. It is only public because
// workflow/util_test.go uses it; the *testing.T argument is ignored.
// TODO: refactor those tests
func NewConflictError(_ *testing.T, cause error) error {
	return &conflictError{Cause: cause}
}

// IsConflictError reports whether err, or any error in its wrap chain, signals
// a conflict while updating a workflow execution.
func IsConflictError(err error) bool {
	var target *conflictError
	return errors.As(err, &target)
}
type (
	// Context is the processing context for all operations on workflow execution.
	// The implementation in this file caches the workflow's mutable state and
	// history size, exposes a lock, and persists history events and execution
	// records through the shard.
	Context interface {
		// GetDomainName resolves the workflow's domain name through the domain
		// cache; contextImpl returns "" when the lookup fails.
		GetDomainName() string
		// GetDomainID returns the ID of the domain this workflow belongs to.
		GetDomainID() string
		// GetExecution returns the workflow ID / run ID pair of this context.
		GetExecution() *types.WorkflowExecution
		// GetWorkflowExecution returns the cached mutable state (nil if none);
		// intended for tests only.
		GetWorkflowExecution() MutableState
		// SetWorkflowExecution replaces the cached mutable state; intended for
		// tests only.
		SetWorkflowExecution(mutableState MutableState)
		// LoadWorkflowExecution loads the mutable state if it is not cached and
		// starts a transaction on it without incoming task version validation.
		LoadWorkflowExecution(ctx context.Context) (MutableState, error)
		// LoadWorkflowExecutionWithTaskVersion loads the mutable state if it is
		// not cached and starts a transaction on it using incomingVersion.
		LoadWorkflowExecutionWithTaskVersion(ctx context.Context, incomingVersion int64) (MutableState, error)
		// LoadExecutionStats ensures the mutable state is loaded and returns the
		// cached execution stats.
		LoadExecutionStats(ctx context.Context) (*persistence.ExecutionStats, error)
		// Clear drops the cached mutable state and resets the cached stats.
		Clear()
		// Lock acquires the context's lock; ctx is forwarded to locks.Mutex.Lock.
		Lock(ctx context.Context) error
		// Unlock releases the lock taken by Lock.
		Unlock()
		// GetHistorySize returns the cached history size, accumulated from the
		// lengths of persisted history blobs.
		GetHistorySize() int64
		// SetHistorySize updates the cached history size (and the cached mutable
		// state's copy, if loaded).
		SetHistorySize(size int64)
		// ReapplyEvents presumably reapplies the given event batches to the
		// workflow (implementation not visible in this section).
		ReapplyEvents(
			eventBatches []*persistence.WorkflowEvents,
		) error
		// PersistStartWorkflowBatchEvents appends the first event batch of a
		// workflow on a new history branch; empty batches are rejected.
		PersistStartWorkflowBatchEvents(
			ctx context.Context,
			workflowEvents *persistence.WorkflowEvents,
		) (events.PersistedBlob, error)
		// PersistNonStartWorkflowBatchEvents appends an event batch to an existing
		// history branch; an empty batch is a no-op returning an empty blob.
		PersistNonStartWorkflowBatchEvents(
			ctx context.Context,
			workflowEvents *persistence.WorkflowEvents,
		) (events.PersistedBlob, error)
		// CreateWorkflowExecution persists a new workflow execution record whose
		// first history batch has already been persisted as persistedHistory.
		CreateWorkflowExecution(
			ctx context.Context,
			newWorkflow *persistence.WorkflowSnapshot,
			persistedHistory events.PersistedBlob,
			createMode persistence.CreateWorkflowMode,
			prevRunID string,
			prevLastWriteVersion int64,
		) error
		// ConflictResolveWorkflowExecution persists resetMutableState, optionally
		// together with a new workflow and a current-workflow mutation, in a
		// single conflict-resolve request.
		ConflictResolveWorkflowExecution(
			ctx context.Context,
			now time.Time,
			conflictResolveMode persistence.ConflictResolveWorkflowMode,
			resetMutableState MutableState,
			newContext Context,
			newMutableState MutableState,
			currentContext Context,
			currentMutableState MutableState,
			currentTransactionPolicy *TransactionPolicy,
		) error
		// UpdateWorkflowExecutionAsActive persists pending changes of the current
		// workflow using the active transaction policy.
		UpdateWorkflowExecutionAsActive(
			ctx context.Context,
			now time.Time,
		) error
		// UpdateWorkflowExecutionWithNewAsActive persists the current workflow and
		// a new workflow, both with the active transaction policy.
		UpdateWorkflowExecutionWithNewAsActive(
			ctx context.Context,
			now time.Time,
			newContext Context,
			newMutableState MutableState,
		) error
		// UpdateWorkflowExecutionAsPassive persists pending changes of the current
		// workflow using the passive transaction policy.
		UpdateWorkflowExecutionAsPassive(
			ctx context.Context,
			now time.Time,
		) error
		// UpdateWorkflowExecutionWithNewAsPassive persists the current workflow
		// and a new workflow, both with the passive transaction policy.
		UpdateWorkflowExecutionWithNewAsPassive(
			ctx context.Context,
			now time.Time,
			newContext Context,
			newMutableState MutableState,
		) error
		// UpdateWorkflowExecutionWithNew is the general update: it persists the
		// current workflow's mutation and, when all new-workflow arguments are
		// non-nil, a new workflow snapshot in the same request.
		UpdateWorkflowExecutionWithNew(
			ctx context.Context,
			now time.Time,
			updateMode persistence.UpdateWorkflowMode,
			newContext Context,
			newMutableState MutableState,
			currentWorkflowTransactionPolicy TransactionPolicy,
			newWorkflowTransactionPolicy *TransactionPolicy,
		) error
		// UpdateWorkflowExecutionTasks persists only newly generated tasks; it
		// fails if the transaction produced new history events.
		UpdateWorkflowExecutionTasks(
			ctx context.Context,
			now time.Time,
		) error
	}
)
// contextImpl is the shard-backed Context implementation. It caches the
// workflow's mutable state and execution stats between calls; the mutex is the
// lock exposed through Lock/Unlock.
type contextImpl struct {
	// identity of the workflow this context serves
	domainID          string
	workflowExecution types.WorkflowExecution
	// shard-scoped dependencies
	shard            shard.Context
	executionManager persistence.ExecutionManager
	logger           log.Logger
	metricsClient    metrics.Client
	// mutex backs Lock and Unlock.
	mutex locks.Mutex
	// mutableState is the cached workflow state; nil until loaded and after Clear.
	mutableState MutableState
	// stats caches the workflow's execution stats (history size).
	stats *persistence.ExecutionStats
	// updateCondition is the NextEventID recorded at the last load or successful update.
	updateCondition int64
	// Persistence / notification hooks. NewContext wires them to the shard;
	// presumably they exist so tests can substitute them.
	appendHistoryNodesFn              func(context.Context, string, types.WorkflowExecution, *persistence.AppendHistoryNodesRequest) (*persistence.AppendHistoryNodesResponse, error)
	createWorkflowExecutionFn         func(context.Context, *persistence.CreateWorkflowExecutionRequest) (*persistence.CreateWorkflowExecutionResponse, error)
	notifyTasksFromWorkflowSnapshotFn func(*persistence.WorkflowSnapshot, events.PersistedBlobs, bool)
	emitSessionUpdateStatsFn          func(string, *persistence.MutableStateUpdateSessionStats)
}

// Compile-time check that contextImpl satisfies Context.
var _ Context = (*contextImpl)(nil)
// NewContext creates a new workflow execution context for one execution.
//
// The returned context has no cached mutable state and a zero history size.
// Its logger is tagged with the domain ID, workflow ID and run ID. Its
// persistence and notification hooks are wired to the given shard.
func NewContext(
	domainID string,
	execution types.WorkflowExecution,
	shard shard.Context,
	executionManager persistence.ExecutionManager,
	logger log.Logger,
) Context {
	taggedLogger := logger.WithTags(
		tag.WorkflowDomainID(domainID),
		tag.WorkflowID(execution.GetWorkflowID()),
		tag.WorkflowRunID(execution.GetRunID()),
	)

	impl := &contextImpl{
		domainID:          domainID,
		workflowExecution: execution,
		shard:             shard,
		executionManager:  executionManager,
		logger:            taggedLogger,
		metricsClient:     shard.GetMetricsClient(),
		mutex:             locks.NewMutex(),
		stats:             &persistence.ExecutionStats{HistorySize: 0},
	}

	impl.appendHistoryNodesFn = func(
		ctx context.Context,
		domainID string,
		workflowExecution types.WorkflowExecution,
		request *persistence.AppendHistoryNodesRequest,
	) (*persistence.AppendHistoryNodesResponse, error) {
		return appendHistoryV2EventsWithRetry(ctx, shard, common.CreatePersistenceRetryPolicy(), domainID, workflowExecution, request)
	}
	impl.createWorkflowExecutionFn = func(
		ctx context.Context,
		request *persistence.CreateWorkflowExecutionRequest,
	) (*persistence.CreateWorkflowExecutionResponse, error) {
		return createWorkflowExecutionWithRetry(ctx, shard, taggedLogger, common.CreatePersistenceRetryPolicy(), request)
	}
	impl.notifyTasksFromWorkflowSnapshotFn = func(
		snapshot *persistence.WorkflowSnapshot,
		blobs events.PersistedBlobs,
		persistentError bool,
	) {
		notifyTasksFromWorkflowSnapshot(shard.GetEngine(), snapshot, blobs, persistentError)
	}
	impl.emitSessionUpdateStatsFn = func(domainName string, stats *persistence.MutableStateUpdateSessionStats) {
		emitSessionUpdateStats(shard.GetMetricsClient(), domainName, stats)
	}
	return impl
}
// Lock acquires this workflow's lock; ctx is forwarded to locks.Mutex.Lock,
// whose error (if any) is returned unchanged.
func (c *contextImpl) Lock(ctx context.Context) error {
	if err := c.mutex.Lock(ctx); err != nil {
		return err
	}
	return nil
}

// Unlock releases the lock acquired by Lock.
func (c *contextImpl) Unlock() {
	c.mutex.Unlock()
}
// Clear discards the cached mutable state and resets the cached execution
// stats, so the next load re-reads the workflow from persistence. Every call
// increments the WorkflowContextCleared counter.
func (c *contextImpl) Clear() {
	c.metricsClient.IncCounter(metrics.WorkflowContextScope, metrics.WorkflowContextCleared)
	c.mutableState = nil
	c.stats = &persistence.ExecutionStats{HistorySize: 0}
}
// GetDomainID returns the ID of the domain that owns this workflow.
func (c *contextImpl) GetDomainID() string { return c.domainID }

// GetExecution returns a pointer to the context's own WorkflowExecution (not a
// copy); callers should treat it as read-only.
func (c *contextImpl) GetExecution() *types.WorkflowExecution { return &c.workflowExecution }
// GetDomainName resolves the workflow's domain name through the shard's domain
// cache. A failed lookup is swallowed and reported as the empty string.
func (c *contextImpl) GetDomainName() string {
	if name, err := c.shard.GetDomainCache().GetDomainName(c.domainID); err == nil {
		return name
	}
	return ""
}
// GetHistorySize returns the cached history size, i.e. the accumulated length
// of the history blobs persisted for this workflow.
func (c *contextImpl) GetHistorySize() int64 {
	return c.stats.HistorySize
}

// SetHistorySize stores size in the cached stats and mirrors it into the cached
// mutable state when one is loaded.
func (c *contextImpl) SetHistorySize(size int64) {
	c.stats.HistorySize = size
	if ms := c.mutableState; ms != nil {
		ms.SetHistorySize(size)
	}
}
// LoadExecutionStats makes sure the mutable state is loaded (via
// LoadWorkflowExecution, which also starts a transaction on it) and returns the
// cached execution stats.
func (c *contextImpl) LoadExecutionStats(
	ctx context.Context,
) (*persistence.ExecutionStats, error) {
	if _, err := c.LoadWorkflowExecution(ctx); err != nil {
		return nil, err
	}
	return c.stats, nil
}
// isChecksumError reports whether err is a mutable state checksum mismatch.
// Detection is by message text, so any error whose message contains
// "checksum mismatch error" (including wrapped ones) matches.
func isChecksumError(err error) bool {
	return err != nil && strings.Contains(err.Error(), "checksum mismatch error")
}
// LoadWorkflowExecutionWithTaskVersion returns this workflow's mutable state with
// a transaction started on it using incomingVersion.
//
// Loading from persistence:
//   - It happens only when no mutable state is cached.
//   - Checksum mismatches are retried up to checksumErrorRetryCount times, with a
//     100ms pause between attempts.
//   - Other load errors are logged, and the state built from the response is still
//     used (best effort).
//   - A state that remains stale after all retries is counted and logged.
//
// If StartTransaction reports that buffered changes must be flushed first, the
// state is persisted with UpdateWorkflowExecutionAsActive and the transaction is
// restarted. A second flush request is reported as an InternalServiceError.
func (c *contextImpl) LoadWorkflowExecutionWithTaskVersion(
	ctx context.Context,
	incomingVersion int64,
) (MutableState, error) {
	domainEntry, err := c.shard.GetDomainCache().GetDomainByID(c.domainID)
	if err != nil {
		return nil, err
	}

	if c.mutableState == nil {
		var response *persistence.GetWorkflowExecutionResponse
		// Build into a local and publish it only after the loop. Otherwise a fetch
		// failure on a retry attempt would leave the previous attempt's builder
		// cached on the context, and later calls would skip loading.
		var mutableState MutableState
		for attempt := 0; attempt < checksumErrorRetryCount; attempt++ {
			response, err = c.getWorkflowExecutionWithRetry(ctx, &persistence.GetWorkflowExecutionRequest{
				DomainID:   c.domainID,
				Execution:  c.workflowExecution,
				DomainName: domainEntry.GetInfo().Name,
			})
			if err != nil {
				return nil, err
			}
			mutableState = NewMutableStateBuilder(
				c.shard,
				c.logger,
				domainEntry,
			)
			err = mutableState.Load(response.State)
			if err == nil {
				break
			}
			if !isChecksumError(err) {
				// Logged but not returned: the state built above is still used.
				c.logger.Error("failed to load mutable state", tag.Error(err))
				break
			}
			// backoff before retry; skip the pause after the final attempt since
			// no retry follows it
			if attempt < checksumErrorRetryCount-1 {
				c.shard.GetTimeSource().Sleep(time.Millisecond * 100)
			}
		}
		if isChecksumError(err) {
			c.metricsClient.IncCounter(metrics.WorkflowContextScope, metrics.StaleMutableStateCounter)
			c.logger.Error("encounter stale mutable state after retry", tag.Error(err))
		}
		c.mutableState = mutableState
		c.stats = response.State.ExecutionStats
		c.updateCondition = response.State.ExecutionInfo.NextEventID
		// finally emit execution and session stats
		emitWorkflowExecutionStats(
			c.metricsClient,
			c.GetDomainName(),
			response.MutableStateStats,
			c.stats.HistorySize,
		)
	}

	flushBeforeReady, err := c.mutableState.StartTransaction(domainEntry, incomingVersion)
	if err != nil {
		return nil, err
	}
	if !flushBeforeReady {
		return c.mutableState, nil
	}

	// Persist the pending changes, then retry starting the transaction once.
	if err = c.UpdateWorkflowExecutionAsActive(
		ctx,
		c.shard.GetTimeSource().Now(),
	); err != nil {
		return nil, err
	}
	flushBeforeReady, err = c.mutableState.StartTransaction(domainEntry, incomingVersion)
	if err != nil {
		return nil, err
	}
	if flushBeforeReady {
		return nil, &types.InternalServiceError{
			Message: "workflowExecutionContext counter flushBeforeReady status after loading mutable state from DB",
		}
	}
	return c.mutableState, nil
}
// GetWorkflowExecution returns the cached mutable state, which is nil when
// nothing has been loaded. It should only be used in tests.
func (c *contextImpl) GetWorkflowExecution() MutableState { return c.mutableState }

// SetWorkflowExecution overrides the cached mutable state. It should only be
// used in tests.
func (c *contextImpl) SetWorkflowExecution(mutableState MutableState) { c.mutableState = mutableState }

// LoadWorkflowExecution is LoadWorkflowExecutionWithTaskVersion with
// common.EmptyVersion, which skips incoming task version validation.
func (c *contextImpl) LoadWorkflowExecution(
	ctx context.Context,
) (MutableState, error) {
	return c.LoadWorkflowExecutionWithTaskVersion(ctx, common.EmptyVersion)
}
// CreateWorkflowExecution persists a brand-new workflow execution whose first
// history batch was already persisted as persistedHistory.
//
// The context's history size becomes the previously cached size plus
// len(persistedHistory.Data), and the same value is sent as the request's
// ExecutionStats.
//
// On success, newWorkflow's tasks are announced through the notify hook and
// session stats are emitted. On failure the context is cleared; if persistence
// may still have applied the write, tasks are announced with the
// persistence-error flag set.
func (c *contextImpl) CreateWorkflowExecution(
	ctx context.Context,
	newWorkflow *persistence.WorkflowSnapshot,
	persistedHistory events.PersistedBlob,
	createMode persistence.CreateWorkflowMode,
	prevRunID string,
	prevLastWriteVersion int64,
) (retError error) {
	defer func() {
		if retError != nil {
			c.Clear()
		}
	}()

	domainName := c.GetDomainName()
	request := &persistence.CreateWorkflowExecutionRequest{
		Mode:                     createMode,
		PreviousRunID:            prevRunID,
		PreviousLastWriteVersion: prevLastWriteVersion,
		NewWorkflowSnapshot:      *newWorkflow,
		DomainName:               domainName,
	}

	totalHistorySize := c.GetHistorySize() + int64(len(persistedHistory.Data))
	c.SetHistorySize(totalHistorySize)
	request.NewWorkflowSnapshot.ExecutionStats = &persistence.ExecutionStats{HistorySize: totalHistorySize}

	blobs := events.PersistedBlobs{persistedHistory}
	resp, err := c.createWorkflowExecutionFn(ctx, request)
	if err != nil {
		if isOperationPossiblySuccessfulError(err) {
			c.notifyTasksFromWorkflowSnapshotFn(newWorkflow, blobs, true)
		}
		return err
	}

	c.notifyTasksFromWorkflowSnapshotFn(newWorkflow, blobs, false)
	c.emitSessionUpdateStatsFn(domainName, resp.MutableStateUpdateSessionStats)
	return nil
}
// ConflictResolveWorkflowExecution persists resetMutableState as this workflow's
// new state. When supplied, it also creates a new workflow (newContext /
// newMutableState) and updates the current workflow (currentContext /
// currentMutableState / currentTransactionPolicy), all in one
// conflict-resolve request.
//
// Steps:
//  1. Close each participating transaction and append its history events.
//  2. Update the cached history sizes.
//  3. Reapply events as required by conflictResolveMode.
//  4. Issue the request through the shard.
//
// On success, watchers are notified of the reset branch, all participating
// workflows' tasks are announced to the engine, and history, session and
// completion stats are emitted.
//
// On error every participating context is cleared. When persistence may still
// have applied the request, tasks are announced with the persistence-error
// flag set.
func (c *contextImpl) ConflictResolveWorkflowExecution(
	ctx context.Context,
	now time.Time,
	conflictResolveMode persistence.ConflictResolveWorkflowMode,
	resetMutableState MutableState,
	newContext Context,
	newMutableState MutableState,
	currentContext Context,
	currentMutableState MutableState,
	currentTransactionPolicy *TransactionPolicy,
) (retError error) {
	defer func() {
		if retError != nil {
			c.Clear()
		}
	}()

	resetWorkflow, resetWorkflowEventsSeq, err := resetMutableState.CloseTransactionAsSnapshot(
		now,
		TransactionPolicyPassive,
	)
	if err != nil {
		return err
	}

	var persistedBlobs events.PersistedBlobs
	resetHistorySize := c.GetHistorySize()
	for _, workflowEvents := range resetWorkflowEventsSeq {
		blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents)
		if err != nil {
			return err
		}
		resetHistorySize += int64(len(blob.Data))
		persistedBlobs = append(persistedBlobs, blob)
	}
	c.SetHistorySize(resetHistorySize)
	resetWorkflow.ExecutionStats = &persistence.ExecutionStats{
		HistorySize: resetHistorySize,
	}

	var newWorkflow *persistence.WorkflowSnapshot
	var newWorkflowEventsSeq []*persistence.WorkflowEvents
	if newContext != nil && newMutableState != nil {
		defer func() {
			if retError != nil {
				newContext.Clear()
			}
		}()

		newWorkflow, newWorkflowEventsSeq, err = newMutableState.CloseTransactionAsSnapshot(
			now,
			TransactionPolicyPassive,
		)
		if err != nil {
			return err
		}
		// The first batch starts the new workflow's history. Guard the index so an
		// empty sequence surfaces as an error instead of a panic.
		if len(newWorkflowEventsSeq) == 0 {
			return &types.InternalServiceError{
				Message: "unable to find start events of new workflow for conflict resolution",
			}
		}
		newWorkflowSize := newContext.GetHistorySize()
		blob, err := c.PersistStartWorkflowBatchEvents(ctx, newWorkflowEventsSeq[0])
		if err != nil {
			return err
		}
		newWorkflowSize += int64(len(blob.Data))
		newContext.SetHistorySize(newWorkflowSize)
		newWorkflow.ExecutionStats = &persistence.ExecutionStats{
			HistorySize: newWorkflowSize,
		}
		persistedBlobs = append(persistedBlobs, blob)
	}

	var currentWorkflow *persistence.WorkflowMutation
	var currentWorkflowEventsSeq []*persistence.WorkflowEvents
	if currentContext != nil && currentMutableState != nil && currentTransactionPolicy != nil {
		defer func() {
			if retError != nil {
				currentContext.Clear()
			}
		}()

		currentWorkflow, currentWorkflowEventsSeq, err = currentMutableState.CloseTransactionAsMutation(
			now,
			*currentTransactionPolicy,
		)
		if err != nil {
			return err
		}
		currentWorkflowSize := currentContext.GetHistorySize()
		for _, workflowEvents := range currentWorkflowEventsSeq {
			blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents)
			if err != nil {
				return err
			}
			currentWorkflowSize += int64(len(blob.Data))
			persistedBlobs = append(persistedBlobs, blob)
		}
		currentContext.SetHistorySize(currentWorkflowSize)
		currentWorkflow.ExecutionStats = &persistence.ExecutionStats{
			HistorySize: currentWorkflowSize,
		}
	}

	if err := c.conflictResolveEventReapply(
		conflictResolveMode,
		resetWorkflowEventsSeq,
		newWorkflowEventsSeq,
		// current workflow events will not participate in the events reapplication
	); err != nil {
		return err
	}

	// Resolve the domain name once (with error handling) and reuse it for the
	// request and for metrics below.
	domainName, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
	if errorDomainName != nil {
		return errorDomainName
	}

	resp, err := c.shard.ConflictResolveWorkflowExecution(ctx, &persistence.ConflictResolveWorkflowExecutionRequest{
		// RangeID , this is set by shard context
		Mode:                    conflictResolveMode,
		ResetWorkflowSnapshot:   *resetWorkflow,
		NewWorkflowSnapshot:     newWorkflow,
		CurrentWorkflowMutation: currentWorkflow,
		// Encoding, this is set by shard context
		DomainName: domainName,
	})
	if err != nil {
		if isOperationPossiblySuccessfulError(err) {
			notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), resetWorkflow, persistedBlobs, true)
			notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), newWorkflow, persistedBlobs, true)
			notifyTasksFromWorkflowMutation(c.shard.GetEngine(), currentWorkflow, persistedBlobs, true)
		}
		return err
	}

	currentBranchToken, err := resetMutableState.GetCurrentBranchToken()
	if err != nil {
		return err
	}

	workflowState, workflowCloseState := resetMutableState.GetWorkflowStateCloseStatus()
	// Current branch changed and notify the watchers
	c.shard.GetEngine().NotifyNewHistoryEvent(events.NewNotification(
		c.domainID,
		&c.workflowExecution,
		resetMutableState.GetLastFirstEventID(),
		resetMutableState.GetNextEventID(),
		resetMutableState.GetPreviousStartedEventID(),
		currentBranchToken,
		workflowState,
		workflowCloseState,
	))

	notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), resetWorkflow, persistedBlobs, false)
	notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), newWorkflow, persistedBlobs, false)
	notifyTasksFromWorkflowMutation(c.shard.GetEngine(), currentWorkflow, persistedBlobs, false)

	// finally emit session stats
	emitWorkflowHistoryStats(
		c.metricsClient,
		domainName,
		int(c.stats.HistorySize),
		int(resetMutableState.GetNextEventID()-1),
	)
	emitSessionUpdateStats(
		c.metricsClient,
		domainName,
		resp.MutableStateUpdateSessionStats,
	)

	// emit workflow completion stats if any
	if resetWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted {
		if event, err := resetMutableState.GetCompletionEvent(ctx); err == nil {
			workflowType := resetWorkflow.ExecutionInfo.WorkflowTypeName
			taskList := resetWorkflow.ExecutionInfo.TaskList
			emitWorkflowCompletionStats(c.metricsClient, c.logger,
				domainName, workflowType, c.workflowExecution.GetWorkflowID(), c.workflowExecution.GetRunID(),
				taskList, event)
		}
	}
	return nil
}
// UpdateWorkflowExecutionAsActive persists the current workflow's pending
// changes in UpdateCurrent mode under the active transaction policy, without a
// new workflow.
func (c *contextImpl) UpdateWorkflowExecutionAsActive(
	ctx context.Context,
	now time.Time,
) error {
	return c.UpdateWorkflowExecutionWithNew(
		ctx, now,
		persistence.UpdateWorkflowModeUpdateCurrent,
		nil, // no new workflow context
		nil, // no new workflow mutable state
		TransactionPolicyActive,
		nil, // no new workflow transaction policy
	)
}

// UpdateWorkflowExecutionWithNewAsActive persists the current workflow together
// with a new workflow in UpdateCurrent mode, both under the active transaction
// policy.
func (c *contextImpl) UpdateWorkflowExecutionWithNewAsActive(
	ctx context.Context,
	now time.Time,
	newContext Context,
	newMutableState MutableState,
) error {
	return c.UpdateWorkflowExecutionWithNew(
		ctx, now,
		persistence.UpdateWorkflowModeUpdateCurrent,
		newContext,
		newMutableState,
		TransactionPolicyActive,       // current workflow
		TransactionPolicyActive.Ptr(), // new workflow
	)
}
// UpdateWorkflowExecutionAsPassive persists the current workflow's pending
// changes in UpdateCurrent mode under the passive transaction policy, without
// a new workflow.
func (c *contextImpl) UpdateWorkflowExecutionAsPassive(
	ctx context.Context,
	now time.Time,
) error {
	return c.UpdateWorkflowExecutionWithNew(
		ctx, now,
		persistence.UpdateWorkflowModeUpdateCurrent,
		nil, // no new workflow context
		nil, // no new workflow mutable state
		TransactionPolicyPassive,
		nil, // no new workflow transaction policy
	)
}

// UpdateWorkflowExecutionWithNewAsPassive persists the current workflow
// together with a new workflow in UpdateCurrent mode, both under the passive
// transaction policy.
func (c *contextImpl) UpdateWorkflowExecutionWithNewAsPassive(
	ctx context.Context,
	now time.Time,
	newContext Context,
	newMutableState MutableState,
) error {
	return c.UpdateWorkflowExecutionWithNew(
		ctx, now,
		persistence.UpdateWorkflowModeUpdateCurrent,
		newContext,
		newMutableState,
		TransactionPolicyPassive,       // current workflow
		TransactionPolicyPassive.Ptr(), // new workflow
	)
}
// UpdateWorkflowExecutionTasks persists only the tasks produced by the current
// transaction. It uses UpdateWorkflowModeIgnoreCurrent and the passive
// transaction policy.
//
// It fails with an InternalServiceError if the transaction produced new history
// events, because this path does not persist history. The cached history size
// is sent unchanged.
//
// On success the update condition is advanced, tasks are announced to the
// engine and session stats are emitted. On any error the context is cleared.
func (c *contextImpl) UpdateWorkflowExecutionTasks(
	ctx context.Context,
	now time.Time,
) (retError error) {
	defer func() {
		if retError != nil {
			c.Clear()
		}
	}()

	currentWorkflow, currentWorkflowEventsSeq, err := c.mutableState.CloseTransactionAsMutation(
		now,
		TransactionPolicyPassive,
	)
	if err != nil {
		return err
	}

	if len(currentWorkflowEventsSeq) != 0 {
		// Return a pointer, like every other InternalServiceError in this file, so
		// callers matching on *types.InternalServiceError recognize it.
		return &types.InternalServiceError{
			Message: "UpdateWorkflowExecutionTask can only be used for persisting new workflow tasks, but found new history events",
		}
	}
	currentWorkflow.ExecutionStats = &persistence.ExecutionStats{
		HistorySize: c.GetHistorySize(),
	}

	// Resolved once with error handling and reused for metrics below.
	domainName, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
	if errorDomainName != nil {
		return errorDomainName
	}
	resp, err := c.updateWorkflowExecutionWithRetry(ctx, &persistence.UpdateWorkflowExecutionRequest{
		// RangeID , this is set by shard context
		Mode:                   persistence.UpdateWorkflowModeIgnoreCurrent,
		UpdateWorkflowMutation: *currentWorkflow,
		// Encoding, this is set by shard context
		DomainName: domainName,
	})
	if err != nil {
		if isOperationPossiblySuccessfulError(err) {
			notifyTasksFromWorkflowMutation(c.shard.GetEngine(), currentWorkflow, nil, true)
		}
		return err
	}

	// TODO remove updateCondition in favor of condition in mutable state
	c.updateCondition = currentWorkflow.ExecutionInfo.NextEventID

	// notify current workflow tasks
	notifyTasksFromWorkflowMutation(c.shard.GetEngine(), currentWorkflow, nil, false)

	emitSessionUpdateStats(
		c.metricsClient,
		domainName,
		resp.MutableStateUpdateSessionStats,
	)
	return nil
}
// UpdateWorkflowExecutionWithNew persists the current workflow's pending changes
// and, when newContext, newMutableState and newWorkflowTransactionPolicy are
// all non-nil, a new workflow snapshot in the same update request.
//
// Steps:
//  1. Append history events for both workflows and update the cached history
//     sizes. The new workflow's first batch goes on a new branch unless it does
//     not start at common.FirstEventID (the reset case, whose branch already
//     exists).
//  2. Merge continue-as-new replication tasks and reapply events as required.
//  3. Persist the update.
//
// On success watchers and the engine are notified, and history, session,
// large-workflow and completion stats are emitted. On error the participating
// contexts are cleared.
func (c *contextImpl) UpdateWorkflowExecutionWithNew(
	ctx context.Context,
	now time.Time,
	updateMode persistence.UpdateWorkflowMode,
	newContext Context,
	newMutableState MutableState,
	currentWorkflowTransactionPolicy TransactionPolicy,
	newWorkflowTransactionPolicy *TransactionPolicy,
) (retError error) {
	defer func() {
		if retError != nil {
			c.Clear()
		}
	}()

	currentWorkflow, currentWorkflowEventsSeq, err := c.mutableState.CloseTransactionAsMutation(
		now,
		currentWorkflowTransactionPolicy,
	)
	if err != nil {
		return err
	}

	var persistedBlobs events.PersistedBlobs
	currentWorkflowSize := c.GetHistorySize()
	oldWorkflowSize := currentWorkflowSize
	currentWorkflowHistoryCount := c.mutableState.GetNextEventID() - 1
	oldWorkflowHistoryCount := currentWorkflowHistoryCount
	for _, workflowEvents := range currentWorkflowEventsSeq {
		blob, err := c.PersistNonStartWorkflowBatchEvents(ctx, workflowEvents)
		if err != nil {
			return err
		}
		currentWorkflowHistoryCount += int64(len(workflowEvents.Events))
		currentWorkflowSize += int64(len(blob.Data))
		persistedBlobs = append(persistedBlobs, blob)
	}
	c.SetHistorySize(currentWorkflowSize)
	currentWorkflow.ExecutionStats = &persistence.ExecutionStats{
		HistorySize: currentWorkflowSize,
	}

	var newWorkflow *persistence.WorkflowSnapshot
	var newWorkflowEventsSeq []*persistence.WorkflowEvents
	if newContext != nil && newMutableState != nil && newWorkflowTransactionPolicy != nil {
		defer func() {
			if retError != nil {
				newContext.Clear()
			}
		}()

		newWorkflow, newWorkflowEventsSeq, err = newMutableState.CloseTransactionAsSnapshot(
			now,
			*newWorkflowTransactionPolicy,
		)
		if err != nil {
			return err
		}
		// Guard the indexing below so a new workflow without events surfaces as
		// an error instead of a panic.
		if len(newWorkflowEventsSeq) == 0 || len(newWorkflowEventsSeq[0].Events) == 0 {
			return &types.InternalServiceError{
				Message: "unable to find start events of new workflow",
			}
		}
		newWorkflowSize := newContext.GetHistorySize()
		startEvents := newWorkflowEventsSeq[0]
		var blob events.PersistedBlob
		if startEvents.Events[0].ID == common.FirstEventID {
			blob, err = c.PersistStartWorkflowBatchEvents(ctx, startEvents)
		} else {
			// NOTE: This is the case for reset workflow, reset workflow already inserted a branch record
			blob, err = c.PersistNonStartWorkflowBatchEvents(ctx, startEvents)
		}
		if err != nil {
			return err
		}
		persistedBlobs = append(persistedBlobs, blob)
		newWorkflowSize += int64(len(blob.Data))
		newContext.SetHistorySize(newWorkflowSize)
		newWorkflow.ExecutionStats = &persistence.ExecutionStats{
			HistorySize: newWorkflowSize,
		}
	}

	if err := mergeContinueAsNewReplicationTasks(
		updateMode,
		currentWorkflow,
		newWorkflow,
	); err != nil {
		return err
	}

	if err := c.updateWorkflowExecutionEventReapply(
		updateMode,
		currentWorkflowEventsSeq,
		newWorkflowEventsSeq,
	); err != nil {
		return err
	}

	// Resolved once with error handling and reused for metrics below.
	domainName, errorDomainName := c.shard.GetDomainCache().GetDomainName(c.domainID)
	if errorDomainName != nil {
		return errorDomainName
	}
	resp, err := c.updateWorkflowExecutionWithRetry(ctx, &persistence.UpdateWorkflowExecutionRequest{
		// RangeID , this is set by shard context
		Mode:                   updateMode,
		UpdateWorkflowMutation: *currentWorkflow,
		NewWorkflowSnapshot:    newWorkflow,
		// Encoding, this is set by shard context
		DomainName: domainName,
	})
	if err != nil {
		if isOperationPossiblySuccessfulError(err) {
			notifyTasksFromWorkflowMutation(c.shard.GetEngine(), currentWorkflow, persistedBlobs, true)
			notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), newWorkflow, persistedBlobs, true)
		}
		return err
	}

	// TODO remove updateCondition in favor of condition in mutable state
	c.updateCondition = currentWorkflow.ExecutionInfo.NextEventID

	// for any change in the workflow, send a event
	currentBranchToken, err := c.mutableState.GetCurrentBranchToken()
	if err != nil {
		return err
	}
	workflowState, workflowCloseState := c.mutableState.GetWorkflowStateCloseStatus()
	c.shard.GetEngine().NotifyNewHistoryEvent(events.NewNotification(
		c.domainID,
		&c.workflowExecution,
		c.mutableState.GetLastFirstEventID(),
		c.mutableState.GetNextEventID(),
		c.mutableState.GetPreviousStartedEventID(),
		currentBranchToken,
		workflowState,
		workflowCloseState,
	))

	// notify current workflow tasks
	notifyTasksFromWorkflowMutation(c.shard.GetEngine(), currentWorkflow, persistedBlobs, false)
	// notify new workflow tasks
	notifyTasksFromWorkflowSnapshot(c.shard.GetEngine(), newWorkflow, persistedBlobs, false)

	// finally emit session stats
	emitWorkflowHistoryStats(
		c.metricsClient,
		domainName,
		int(c.stats.HistorySize),
		int(c.mutableState.GetNextEventID()-1),
	)
	emitSessionUpdateStats(
		c.metricsClient,
		domainName,
		resp.MutableStateUpdateSessionStats,
	)
	c.emitLargeWorkflowShardIDStats(currentWorkflowSize-oldWorkflowSize, oldWorkflowHistoryCount, oldWorkflowSize, currentWorkflowHistoryCount)

	// emit workflow completion stats if any
	if currentWorkflow.ExecutionInfo.State == persistence.WorkflowStateCompleted {
		if event, err := c.mutableState.GetCompletionEvent(ctx); err == nil {
			workflowType := currentWorkflow.ExecutionInfo.WorkflowTypeName
			taskList := currentWorkflow.ExecutionInfo.TaskList
			emitWorkflowCompletionStats(c.metricsClient, c.logger,
				domainName, workflowType, c.workflowExecution.GetWorkflowID(), c.workflowExecution.GetRunID(),
				taskList, event)
		}
	}
	return nil
}
// notifyTasksFromWorkflowSnapshot announces the tasks carried by a workflow
// snapshot to the engine. A nil snapshot is ignored.
func notifyTasksFromWorkflowSnapshot(
	engine engine.Engine,
	workflowSnapShot *persistence.WorkflowSnapshot,
	history events.PersistedBlobs,
	persistenceError bool,
) {
	if workflowSnapShot != nil {
		notifyTasks(
			engine,
			workflowSnapShot.ExecutionInfo,
			workflowSnapShot.VersionHistories,
			workflowSnapShot.ActivityInfos,
			workflowSnapShot.TransferTasks,
			workflowSnapShot.TimerTasks,
			workflowSnapShot.CrossClusterTasks,
			workflowSnapShot.ReplicationTasks,
			history,
			persistenceError,
		)
	}
}
// notifyTasksFromWorkflowMutation announces the tasks carried by a workflow
// mutation to the engine; only upserted activity infos are passed along. A nil
// mutation is ignored.
func notifyTasksFromWorkflowMutation(
	engine engine.Engine,
	workflowMutation *persistence.WorkflowMutation,
	history events.PersistedBlobs,
	persistenceError bool,
) {
	if workflowMutation != nil {
		notifyTasks(
			engine,
			workflowMutation.ExecutionInfo,
			workflowMutation.VersionHistories,
			workflowMutation.UpsertActivityInfos,
			workflowMutation.TransferTasks,
			workflowMutation.TimerTasks,
			workflowMutation.CrossClusterTasks,
			workflowMutation.ReplicationTasks,
			history,
			persistenceError,
		)
	}
}
// activityInfosToMap indexes activity infos by ScheduleID; when two entries
// share a ScheduleID, the later one wins.
func activityInfosToMap(ais []*persistence.ActivityInfo) map[int64]*persistence.ActivityInfo {
	byScheduleID := make(map[int64]*persistence.ActivityInfo, len(ais))
	for i := range ais {
		byScheduleID[ais[i].ScheduleID] = ais[i]
	}
	return byScheduleID
}
// notifyTasks hands each task category to the engine in the fixed order:
// transfer, timer, cross-cluster, replication.
//
// Every notification carries the execution info and the persistence-error
// flag. The replication notification additionally carries the version
// histories, the activities indexed by ScheduleID, and the persisted history.
func notifyTasks(
	engine engine.Engine,
	executionInfo *persistence.WorkflowExecutionInfo,
	versionHistories *persistence.VersionHistories,
	activities []*persistence.ActivityInfo,
	transferTasks []persistence.Task,
	timerTasks []persistence.Task,
	crossClusterTasks []persistence.Task,
	replicationTasks []persistence.Task,
	history events.PersistedBlobs,
	persistenceError bool,
) {
	newTaskInfo := func(tasks []persistence.Task) *hcommon.NotifyTaskInfo {
		return &hcommon.NotifyTaskInfo{
			ExecutionInfo:    executionInfo,
			Tasks:            tasks,
			PersistenceError: persistenceError,
		}
	}

	transferTaskInfo := newTaskInfo(transferTasks)
	timerTaskInfo := newTaskInfo(timerTasks)
	crossClusterTaskInfo := newTaskInfo(crossClusterTasks)
	replicationTaskInfo := newTaskInfo(replicationTasks)
	replicationTaskInfo.VersionHistories = versionHistories
	replicationTaskInfo.Activities = activityInfosToMap(activities)
	replicationTaskInfo.History = history

	engine.NotifyNewTransferTasks(transferTaskInfo)
	engine.NotifyNewTimerTasks(timerTaskInfo)
	engine.NotifyNewCrossClusterTasks(crossClusterTaskInfo)
	engine.NotifyNewReplicationTasks(replicationTaskInfo)
}
// mergeContinueAsNewReplicationTasks handles a current workflow that is closing
// as continued-as-new.
//
// It folds the new run's first-batch replication task into the current run's
// history replication tasks by stamping them with the new run's branch token,
// then drops the new run's own replication task.
//
// It is a no-op when:
//   - the current workflow is not continuing as new;
//   - the update bypasses the current record and no new snapshot is supplied;
//   - the current mutation has no replication tasks.
//
// It returns an InternalServiceError when the required tasks are missing or of
// an unexpected type. In that case neither input is modified.
func mergeContinueAsNewReplicationTasks(
	updateMode persistence.UpdateWorkflowMode,
	currentWorkflowMutation *persistence.WorkflowMutation,
	newWorkflowSnapshot *persistence.WorkflowSnapshot,
) error {
	if currentWorkflowMutation.ExecutionInfo.CloseStatus != persistence.WorkflowCloseStatusContinuedAsNew {
		return nil
	}
	if updateMode == persistence.UpdateWorkflowModeBypassCurrent && newWorkflowSnapshot == nil {
		// update current workflow as zombie & continue as new without new zombie workflow
		// this case can be valid if new workflow is already created by resend
		return nil
	}

	// current workflow is doing continue as new
	// it is possible that continue as new is done as part of passive logic
	if len(currentWorkflowMutation.ReplicationTasks) == 0 {
		return nil
	}

	if newWorkflowSnapshot == nil || len(newWorkflowSnapshot.ReplicationTasks) != 1 {
		return &types.InternalServiceError{
			Message: "unable to find replication task from new workflow for continue as new replication",
		}
	}

	// Checked assertion: an unexpected task type is reported instead of panicking.
	newRunTask, ok := newWorkflowSnapshot.ReplicationTasks[0].(*persistence.HistoryReplicationTask)
	if !ok {
		return &types.InternalServiceError{
			Message: fmt.Sprintf(
				"unexpected replication task type %T from new workflow for continue as new replication",
				newWorkflowSnapshot.ReplicationTasks[0],
			),
		}
	}

	// merge the new run first event batch replication task
	// to current event batch replication task
	newRunBranchToken := newRunTask.BranchToken
	taskUpdated := false
	for _, replicationTask := range currentWorkflowMutation.ReplicationTasks {
		if task, ok := replicationTask.(*persistence.HistoryReplicationTask); ok {
			taskUpdated = true
			task.NewRunBranchToken = newRunBranchToken
		}
	}
	if !taskUpdated {
		return &types.InternalServiceError{
			Message: "unable to find replication task from current workflow for continue as new replication",
		}
	}

	// Only drop the new run's own task once the merge has succeeded.
	newWorkflowSnapshot.ReplicationTasks = nil
	return nil
}
// PersistStartWorkflowBatchEvents appends a workflow's first event batch on a
// new history branch, attaching garbage-cleanup info built from the domain,
// workflow and run IDs. It returns the persisted blob.
//
// An empty batch is rejected with an InternalServiceError, and a failed domain
// name lookup is returned as-is.
func (c *contextImpl) PersistStartWorkflowBatchEvents(
	ctx context.Context,
	workflowEvents *persistence.WorkflowEvents,
) (events.PersistedBlob, error) {
	if len(workflowEvents.Events) == 0 {
		return events.PersistedBlob{}, &types.InternalServiceError{
			Message: "cannot persist first workflow events with empty events",
		}
	}

	domainID := workflowEvents.DomainID
	domainName, err := c.shard.GetDomainCache().GetDomainName(domainID)
	if err != nil {
		return events.PersistedBlob{}, err
	}

	execution := types.WorkflowExecution{
		WorkflowID: workflowEvents.WorkflowID,
		RunID:      workflowEvents.RunID,
	}
	request := &persistence.AppendHistoryNodesRequest{
		IsNewBranch: true,
		Info:        persistence.BuildHistoryGarbageCleanupInfo(domainID, execution.WorkflowID, execution.RunID),
		BranchToken: workflowEvents.BranchToken,
		Events:      workflowEvents.Events,
		DomainName:  domainName,
		// TransactionID is set by shard context
	}
	resp, err := c.appendHistoryNodesFn(ctx, domainID, execution, request)
	if err != nil {
		return events.PersistedBlob{}, err
	}

	return events.PersistedBlob{
		DataBlob:     resp.DataBlob,
		BranchToken:  workflowEvents.BranchToken,
		FirstEventID: workflowEvents.Events[0].ID,
	}, nil
}
// PersistNonStartWorkflowBatchEvents appends an event batch to an existing
// history branch and returns the persisted blob. An empty batch is allowed (an
// update without events) and yields an empty blob with no persistence call.
func (c *contextImpl) PersistNonStartWorkflowBatchEvents(
	ctx context.Context,
	workflowEvents *persistence.WorkflowEvents,
) (events.PersistedBlob, error) {
	if len(workflowEvents.Events) == 0 {
		return events.PersistedBlob{}, nil // allow update workflow without events
	}

	domainID := workflowEvents.DomainID
	domainName, err := c.shard.GetDomainCache().GetDomainName(domainID)
	if err != nil {
		return events.PersistedBlob{}, err
	}

	execution := types.WorkflowExecution{
		WorkflowID: workflowEvents.WorkflowID,
		RunID:      workflowEvents.RunID,
	}
	request := &persistence.AppendHistoryNodesRequest{
		IsNewBranch: false,
		BranchToken: workflowEvents.BranchToken,
		Events:      workflowEvents.Events,
		DomainName:  domainName,
		// TransactionID is set by shard context
	}
	resp, err := c.appendHistoryNodesFn(ctx, domainID, execution, request)
	if err != nil {
		return events.PersistedBlob{}, err
	}

	return events.PersistedBlob{
		DataBlob:     resp.DataBlob,
		BranchToken:  workflowEvents.BranchToken,
		FirstEventID: workflowEvents.Events[0].ID,
	}, nil
}
// appendHistoryV2EventsWithRetry appends history nodes through the shard. Errors
// classified by persistence.IsTransientError are retried under retryPolicy. The
// response of the last attempt is returned alongside the final error.
func appendHistoryV2EventsWithRetry(
	ctx context.Context,
	shardContext shard.Context,
	retryPolicy backoff.RetryPolicy,
	domainID string,
	execution types.WorkflowExecution,
	request *persistence.AppendHistoryNodesRequest,
) (*persistence.AppendHistoryNodesResponse, error) {
	var resp *persistence.AppendHistoryNodesResponse
	retrier := backoff.NewThrottleRetry(
		backoff.WithRetryPolicy(retryPolicy),
		backoff.WithRetryableError(persistence.IsTransientError),
	)
	err := retrier.Do(ctx, func() error {
		var opErr error
		resp, opErr = shardContext.AppendHistoryV2Events(ctx, request, domainID, execution)
		return opErr
	})
	return resp, err
}
// createWorkflowExecutionWithRetry creates a workflow execution through the
// shard context, retrying according to retryPolicy while the error is
// classified as transient by persistence.IsTransientError.
//
// A *persistence.TimeoutError is deliberately not retried: if it were, the
// user could receive a workflow-already-running error on the very first
// start-workflow request.
//
// On success the response is returned. A
// *persistence.WorkflowExecutionAlreadyStartedError is returned without
// logging, because the caller may need to apply the workflow ID reuse policy.
// Any other error is logged via logger and then returned.
func createWorkflowExecutionWithRetry(
	ctx context.Context,
	shardContext shard.Context,
	logger log.Logger,
	retryPolicy backoff.RetryPolicy,
	request *persistence.CreateWorkflowExecutionRequest,
) (*persistence.CreateWorkflowExecutionResponse, error) {
	var resp *persistence.CreateWorkflowExecutionResponse
	op := func() error {
		var err error
		resp, err = shardContext.CreateWorkflowExecution(ctx, request)
		return err
	}
	isRetryable := func(err error) bool {
		if _, ok := err.(*persistence.TimeoutError); ok {
			// TODO: is timeout error retryable for create workflow?
			// if we treat it as retryable, user may receive workflowAlreadyRunning error
			// on the first start workflow execution request.
			return false
		}
		return persistence.IsTransientError(err)
	}
	throttleRetry := backoff.NewThrottleRetry(
		backoff.WithRetryPolicy(retryPolicy),
		backoff.WithRetryableError(isRetryable),
	)
	err := throttleRetry.Do(ctx, op)
	switch err.(type) {
	case nil:
		return resp, nil
	case *persistence.WorkflowExecutionAlreadyStartedError:
		// it is possible that workflow already exists and caller need to apply
		// workflow ID reuse policy
		return nil, err
	default:
		logger.Error(
			"Persistent store operation failure",
			tag.StoreOperationCreateWorkflowExecution,
			tag.Error(err),
		)
		return nil, err
	}
}
// getWorkflowExecutionWithRetry loads a workflow execution through the shard
// context. Transient persistence errors are retried using the policy from
// common.CreatePersistenceRetryPolicy().
//
// A *types.EntityNotExistsError is returned as-is and not logged, since the
// workflow may legitimately not exist. Any other failure is logged and then
// returned.
func (c *contextImpl) getWorkflowExecutionWithRetry(
	ctx context.Context,
	request *persistence.GetWorkflowExecutionRequest,
) (*persistence.GetWorkflowExecutionResponse, error) {
	retrier := backoff.NewThrottleRetry(
		backoff.WithRetryPolicy(common.CreatePersistenceRetryPolicy()),
		backoff.WithRetryableError(persistence.IsTransientError),
	)

	var response *persistence.GetWorkflowExecutionResponse
	err := retrier.Do(ctx, func() error {
		var getErr error
		response, getErr = c.shard.GetWorkflowExecution(ctx, request)
		return getErr
	})
	if err == nil {
		return response, nil
	}

	// it is possible that workflow does not exists; only log other failures
	if _, notExists := err.(*types.EntityNotExistsError); !notExists {
		c.logger.Error(
			"Persistent fetch operation failure",
			tag.StoreOperationGetWorkflowExecution,
			tag.Error(err),
		)
	}
	return nil, err
}
// updateWorkflowExecutionWithRetry updates a workflow execution through the
// shard context, retrying with the policy from
// common.CreatePersistenceRetryPolicy().
//
// A *persistence.TimeoutError is never retried. Other errors are retried when
// persistence.IsTransientError reports them as transient. A
// *persistence.ConditionFailedError is wrapped in a conflictError. Any other
// failure is logged together with c.updateCondition and returned unchanged.
func (c *contextImpl) updateWorkflowExecutionWithRetry(
	ctx context.Context,
	request *persistence.UpdateWorkflowExecutionRequest,
) (*persistence.UpdateWorkflowExecutionResponse, error) {
	// timeout error is not retryable for update workflow execution
	retryable := func(err error) bool {
		if _, isTimeout := err.(*persistence.TimeoutError); isTimeout {
			return false
		}
		return persistence.IsTransientError(err)
	}
	retrier := backoff.NewThrottleRetry(
		backoff.WithRetryPolicy(common.CreatePersistenceRetryPolicy()),
		backoff.WithRetryableError(retryable),
	)

	var response *persistence.UpdateWorkflowExecutionResponse
	err := retrier.Do(ctx, func() error {
		var updateErr error
		response, updateErr = c.shard.UpdateWorkflowExecution(ctx, request)
		return updateErr
	})
	if err == nil {
		return response, nil
	}
	if _, conditionFailed := err.(*persistence.ConditionFailedError); conditionFailed {
		return nil, &conflictError{err}
	}

	c.logger.Error(
		"Persistent store operation failure",
		tag.StoreOperationUpdateWorkflowExecution,
		tag.Error(err),
		tag.Number(c.updateCondition),
	)
	// TODO: run task validation here (e.g. taskvalidator.NewWfChecker(...) and
	// checker.WorkflowCheckforValidation(...)) so it happens whenever an
	// update fails.
	return nil, err
}
// updateWorkflowExecutionEventReapply reapplies the events in eventBatch1
// followed by eventBatch2 via ReapplyEvents, but only when updateMode is
// persistence.UpdateWorkflowModeBypassCurrent. For every other mode it does
// nothing and returns nil.
func (c *contextImpl) updateWorkflowExecutionEventReapply(
	updateMode persistence.UpdateWorkflowMode,
	eventBatch1 []*persistence.WorkflowEvents,
	eventBatch2 []*persistence.WorkflowEvents,
) error {
	if updateMode == persistence.UpdateWorkflowModeBypassCurrent {
		combined := append([]*persistence.WorkflowEvents(nil), eventBatch1...)
		combined = append(combined, eventBatch2...)
		return c.ReapplyEvents(combined)
	}
	return nil
}
// conflictResolveEventReapply reapplies the events in eventBatch1 followed by
// eventBatch2 via ReapplyEvents, but only when conflictResolveMode is
// persistence.ConflictResolveWorkflowModeBypassCurrent. For every other mode
// it does nothing and returns nil.
func (c *contextImpl) conflictResolveEventReapply(
	conflictResolveMode persistence.ConflictResolveWorkflowMode,
	eventBatch1 []*persistence.WorkflowEvents,
	eventBatch2 []*persistence.WorkflowEvents,
) error {
	if conflictResolveMode == persistence.ConflictResolveWorkflowModeBypassCurrent {
		combined := append([]*persistence.WorkflowEvents(nil), eventBatch1...)
		combined = append(combined, eventBatch2...)
		return c.ReapplyEvents(combined)
	}
	return nil
}
// ReapplyEvents collects the WorkflowExecutionSignaled events from
// eventBatches and reapplies them to the workflow identified by the first
// batch's domain ID, workflow ID and run ID.
//
// Every batch must share the first batch's domain ID and workflow ID;
// otherwise a *types.InternalServiceError is returned. The call is a no-op
// (returns nil) in three cases:
//   - eventBatches is empty
//   - the domain is pending active
//   - none of the events are signals
//
// When the domain's active cluster is the current cluster, the events are
// reapplied through the local history engine. Otherwise they are serialized
// with ThriftRW and sent to the active cluster through its remote admin
// client. Remote calls are bounded by defaultRemoteCallTimeout.
//
// NOTE: this function should only be used on a workflow other than the
// caller's, otherwise a deadlock will occur.
func (c *contextImpl) ReapplyEvents(
	eventBatches []*persistence.WorkflowEvents,
) error {
	if len(eventBatches) == 0 {
		return nil
	}

	domainID := eventBatches[0].DomainID
	workflowID := eventBatches[0].WorkflowID
	runID := eventBatches[0].RunID
	domainCache := c.shard.GetDomainCache()
	clientBean := c.shard.GetService().GetClientBean()
	serializer := c.shard.GetService().GetPayloadSerializer()
	domainEntry, err := domainCache.GetDomainByID(domainID)
	if err != nil {
		return err
	}
	if domainEntry.IsDomainPendingActive() {
		return nil
	}

	var reapplyEvents []*types.HistoryEvent
	// The loop variable is named batch rather than events so that it does not
	// shadow the imported events package.
	for _, batch := range eventBatches {
		if batch.DomainID != domainID ||
			batch.WorkflowID != workflowID {
			return &types.InternalServiceError{
				Message: "workflowExecutionContext encounter mismatch domainID / workflowID in events reapplication.",
			}
		}
		for _, event := range batch.Events {
			if event.GetEventType() == types.EventTypeWorkflowExecutionSignaled {
				reapplyEvents = append(reapplyEvents, event)
			}
		}
	}
	if len(reapplyEvents) == 0 {
		return nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), defaultRemoteCallTimeout)
	defer cancel()

	activeCluster := domainEntry.GetReplicationConfig().ActiveClusterName
	if activeCluster == c.shard.GetClusterMetadata().GetCurrentClusterName() {
		// The active cluster of the domain is the current cluster:
		// reapply the events through the local history engine.
		return c.shard.GetEngine().ReapplyEvents(
			ctx,
			domainID,
			workflowID,
			runID,
			reapplyEvents,
		)
	}

	// The active cluster of the domain differs from the current cluster.
	// Reapplication only happens in the active cluster, so serialize the events
	// and route the request there through its remote admin client.
	reapplyEventsDataBlob, err := serializer.SerializeBatchEvents(
		reapplyEvents,
		common.EncodingTypeThriftRW,
	)
	if err != nil {
		return err
	}
	sourceCluster := clientBean.GetRemoteAdminClient(activeCluster)
	if sourceCluster == nil {
		return &types.InternalServiceError{
			Message: fmt.Sprintf("cannot find cluster config %v to do reapply", activeCluster),
		}
	}

	// Reapply events only reapply to the current run.
	// The run id is only used for reapply event de-duplication.
	execution := &types.WorkflowExecution{
		WorkflowID: workflowID,
		RunID:      runID,
	}
	return sourceCluster.ReapplyEvents(
		ctx,
		&types.ReapplyEventsRequest{
			DomainName:        domainEntry.GetInfo().Name,
			WorkflowExecution: execution,
			Events:            reapplyEventsDataBlob.ToInternal(),
		},
	)
}
func isOperationPossiblySuccessfulError(err error) bool {
switch err.(type) {
case nil:
return false
case *types.WorkflowExecutionAlreadyStartedError,
*persistence.WorkflowExecutionAlreadyStartedError,
*persistence.CurrentWorkflowConditionFailedError,
*persistence.ConditionFailedError,
*types.ServiceBusyError,
*types.LimitExceededError,
*persistence.ShardOwnershipLostError:
return false
case *persistence.TimeoutError:
return true
default:
return !IsConflictError(err)
}
}