in service/history/engine/engineimpl/historyEngine.go [633:865]
func (e *historyEngineImpl) startWorkflowHelper(
ctx context.Context,
startRequest *types.HistoryStartWorkflowExecutionRequest,
domainEntry *cache.DomainCacheEntry,
metricsScope int,
signalWithStartArg *signalWithStartArg,
) (resp *types.StartWorkflowExecutionResponse, retError error) {
if domainEntry.GetInfo().Status != persistence.DomainStatusRegistered {
return nil, errDomainDeprecated
}
request := startRequest.StartRequest
err := e.validateStartWorkflowExecutionRequest(request, metricsScope)
if err != nil {
return nil, err
}
e.overrideStartWorkflowExecutionRequest(domainEntry, request, metricsScope)
workflowID := request.GetWorkflowID()
domainID := domainEntry.GetInfo().ID
domain := domainEntry.GetInfo().Name
// grab the current context as a lock, nothing more
// use a smaller context timeout to get the lock
childCtx, childCancel := e.newChildContext(ctx)
defer childCancel()
_, currentRelease, err := e.executionCache.GetOrCreateCurrentWorkflowExecution(
childCtx,
domainID,
workflowID,
)
if err != nil {
if err == context.DeadlineExceeded {
return nil, workflow.ErrConcurrentStartRequest
}
return nil, err
}
defer func() { currentRelease(retError) }()
workflowExecution := types.WorkflowExecution{
WorkflowID: workflowID,
RunID: uuid.New(),
}
curMutableState, err := e.createMutableState(domainEntry, workflowExecution.GetRunID())
if err != nil {
return nil, err
}
// preprocess for signalWithStart
var prevMutableState execution.MutableState
var signalWithStartRequest *types.HistorySignalWithStartWorkflowExecutionRequest
isSignalWithStart := signalWithStartArg != nil
if isSignalWithStart {
prevMutableState = signalWithStartArg.prevMutableState
signalWithStartRequest = signalWithStartArg.signalWithStartRequest
}
if prevMutableState != nil {
prevLastWriteVersion, err := prevMutableState.GetLastWriteVersion()
if err != nil {
return nil, err
}
if prevLastWriteVersion > curMutableState.GetCurrentVersion() {
return nil, e.newDomainNotActiveError(
domainEntry.GetInfo().Name,
prevLastWriteVersion,
)
}
err = e.applyWorkflowIDReusePolicyForSigWithStart(
prevMutableState.GetExecutionInfo(),
workflowExecution,
request.GetWorkflowIDReusePolicy(),
)
if err != nil {
return nil, err
}
} else if e.shard.GetConfig().EnableRecordWorkflowExecutionUninitialized(domainEntry.GetInfo().Name) && e.visibilityMgr != nil {
uninitializedRequest := &persistence.RecordWorkflowExecutionUninitializedRequest{
DomainUUID: domainID,
Domain: domain,
Execution: types.WorkflowExecution{
WorkflowID: workflowID,
RunID: workflowExecution.RunID,
},
WorkflowTypeName: request.WorkflowType.Name,
UpdateTimestamp: e.shard.GetTimeSource().Now().UnixNano(),
ShardID: int64(e.shard.GetShardID()),
}
if err := e.visibilityMgr.RecordWorkflowExecutionUninitialized(ctx, uninitializedRequest); err != nil {
e.logger.Error("Failed to record uninitialized workflow execution", tag.Error(err))
}
}
err = e.addStartEventsAndTasks(
curMutableState,
workflowExecution,
startRequest,
signalWithStartRequest,
)
if err != nil {
if e.shard.GetConfig().EnableRecordWorkflowExecutionUninitialized(domainEntry.GetInfo().Name) && e.visibilityMgr != nil {
// delete the uninitialized workflow execution record since it failed to start the workflow
// uninitialized record is used to find wfs that didn't make a progress or stuck during the start process
if errVisibility := e.visibilityMgr.DeleteWorkflowExecution(ctx, &persistence.VisibilityDeleteWorkflowExecutionRequest{
DomainID: domainID,
Domain: domain,
RunID: workflowExecution.RunID,
WorkflowID: workflowID,
}); errVisibility != nil {
e.logger.Error("Failed to delete uninitialized workflow execution record", tag.Error(errVisibility))
}
}
return nil, err
}
wfContext := execution.NewContext(domainID, workflowExecution, e.shard, e.executionManager, e.logger)
newWorkflow, newWorkflowEventsSeq, err := curMutableState.CloseTransactionAsSnapshot(
e.timeSource.Now(),
execution.TransactionPolicyActive,
)
if err != nil {
return nil, err
}
historyBlob, err := wfContext.PersistStartWorkflowBatchEvents(ctx, newWorkflowEventsSeq[0])
if err != nil {
return nil, err
}
// create as brand new
createMode := persistence.CreateWorkflowModeBrandNew
prevRunID := ""
prevLastWriteVersion := int64(0)
// overwrite in case of signalWithStart
if prevMutableState != nil {
createMode = persistence.CreateWorkflowModeWorkflowIDReuse
info := prevMutableState.GetExecutionInfo()
// For corrupted workflows use ContinueAsNew mode.
// WorkflowIDReuse mode require workflows to be in completed state, which is not necessarily true for corrupted workflows.
if info.State == persistence.WorkflowStateCorrupted {
createMode = persistence.CreateWorkflowModeContinueAsNew
}
prevRunID = info.RunID
prevLastWriteVersion, err = prevMutableState.GetLastWriteVersion()
if err != nil {
return nil, err
}
}
err = wfContext.CreateWorkflowExecution(
ctx,
newWorkflow,
historyBlob,
createMode,
prevRunID,
prevLastWriteVersion,
)
// handle already started error
if t, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok {
if t.StartRequestID == request.GetRequestID() {
return &types.StartWorkflowExecutionResponse{
RunID: t.RunID,
}, nil
}
if isSignalWithStart {
return nil, err
}
if curMutableState.GetCurrentVersion() < t.LastWriteVersion {
return nil, e.newDomainNotActiveError(
domainEntry.GetInfo().Name,
t.LastWriteVersion,
)
}
prevRunID = t.RunID
if shouldTerminateAndStart(startRequest, t.State) {
runningWFCtx, err := workflow.LoadOnce(ctx, e.executionCache, domainID, workflowID, prevRunID)
if err != nil {
return nil, err
}
defer func() { runningWFCtx.GetReleaseFn()(retError) }()
resp, err = e.terminateAndStartWorkflow(
ctx,
runningWFCtx,
workflowExecution,
domainEntry,
domainID,
startRequest,
nil,
)
switch err.(type) {
// By the time we try to terminate the workflow, it was already terminated
// So continue as if we didn't need to terminate it in the first place
case *types.WorkflowExecutionAlreadyCompletedError:
e.shard.GetLogger().Warn("Workflow completed while trying to terminate, will continue starting workflow", tag.Error(err))
default:
return resp, err
}
}
if err = e.applyWorkflowIDReusePolicyHelper(
t.StartRequestID,
prevRunID,
t.State,
t.CloseStatus,
workflowExecution,
startRequest.StartRequest.GetWorkflowIDReusePolicy(),
); err != nil {
return nil, err
}
// create as ID reuse
createMode = persistence.CreateWorkflowModeWorkflowIDReuse
err = wfContext.CreateWorkflowExecution(
ctx,
newWorkflow,
historyBlob,
createMode,
prevRunID,
t.LastWriteVersion,
)
}
if err != nil {
return nil, err
}
return &types.StartWorkflowExecutionResponse{
RunID: workflowExecution.RunID,
}, nil
}