in service/history/engine/engineimpl/historyEngine.go [2513:2657]
// SignalWithStartWorkflowExecution signals the workflow identified by the
// request's WorkflowID, or starts a new run carrying the signal when there is
// no running execution to signal.
//
// Flow:
//   - The domain is resolved through getActiveDomainByID and must be in the
//     registered status, otherwise errDomainDeprecated is returned.
//   - If the execution can be loaded and is running, the signal is added to it
//     (scheduling a decision task when none is pending) and persisted. A
//     conflict error on the update reloads the state and retries, up to
//     workflow.ConditionalRetryCount attempts; running out of attempts yields
//     workflow.ErrMaxAttemptsExceeded.
//   - If the reuse policy is TerminateIfRunning, the running execution is
//     handed to terminateAndStartWorkflow instead.
//   - If the execution does not exist, is not running, or
//     checkForHistoryCorruptions reports corruption, the call falls through to
//     startWorkflowHelper. In the latter two cases the loaded mutable state is
//     passed along as prevMutableState.
//
// On the signal-only path the response carries the RunID held by the cached
// workflow context. The named result retError is read by the deferred release
// of that context, so every return path reports its outcome to the cache.
func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
	ctx context.Context,
	signalWithStartRequest *types.HistorySignalWithStartWorkflowExecutionRequest,
) (retResp *types.StartWorkflowExecutionResponse, retError error) {
	domainEntry, err := e.getActiveDomainByID(signalWithStartRequest.DomainUUID)
	if err != nil {
		return nil, err
	}
	if domainEntry.GetInfo().Status != persistence.DomainStatusRegistered {
		return nil, errDomainDeprecated
	}
	domainID := domainEntry.GetInfo().ID

	sRequest := signalWithStartRequest.SignalWithStartRequest
	requestID := sRequest.GetRequestID()
	// Only the workflow ID is known up front; no RunID is set here.
	workflowExecution := types.WorkflowExecution{
		WorkflowID: sRequest.WorkflowID,
	}

	var prevMutableState execution.MutableState
	attempt := 0

	wfContext, release, err0 := e.executionCache.GetOrCreateWorkflowExecution(ctx, domainID, workflowExecution)
	if err0 == nil {
		// retError is evaluated when the deferred call runs, i.e. with the
		// value actually being returned.
		defer func() { release(retError) }()
	Just_Signal_Loop:
		for ; attempt < workflow.ConditionalRetryCount; attempt++ {
			// workflow not exist, will create workflow then signal
			mutableState, err1 := wfContext.LoadWorkflowExecution(ctx)
			if err1 != nil {
				if _, ok := err1.(*types.EntityNotExistsError); ok {
					break
				}
				return nil, err1
			}

			// This request ID is already recorded as signaled on the loaded
			// state: answer with the current run without adding the signal again.
			if mutableState.IsSignalRequested(requestID) {
				return &types.StartWorkflowExecutionResponse{RunID: wfContext.GetExecution().RunID}, nil
			}

			// workflow exist but not running, will restart workflow then signal
			if !mutableState.IsWorkflowExecutionRunning() {
				prevMutableState = mutableState
				break
			}

			// workflow exists but history is corrupted, will restart workflow then signal
			corrupted, errCorrupt := e.checkForHistoryCorruptions(ctx, mutableState)
			if errCorrupt != nil {
				return nil, errCorrupt
			}
			if corrupted {
				prevMutableState = mutableState
				break
			}

			// workflow is running, if policy is TerminateIfRunning, terminate current run then signalWithStart
			if sRequest.GetWorkflowIDReusePolicy() == types.WorkflowIDReusePolicyTerminateIfRunning {
				// NOTE(review): the fresh UUID is presumably the RunID of the
				// replacement run created by terminateAndStartWorkflow - confirm.
				workflowExecution.RunID = uuid.New()
				runningWFCtx := workflow.NewContext(wfContext, release, mutableState)
				resp, errTerm := e.terminateAndStartWorkflow(
					ctx,
					runningWFCtx,
					workflowExecution,
					domainEntry,
					domainID,
					nil,
					signalWithStartRequest,
				)
				// By the time we try to terminate the workflow, it was already terminated
				// So continue as if we didn't need to terminate it in the first place
				if _, ok := errTerm.(*types.WorkflowExecutionAlreadyCompletedError); !ok {
					return resp, errTerm
				}
			}

			executionInfo := mutableState.GetExecutionInfo()
			maxAllowedSignals := e.config.MaximumSignalsPerExecution(domainEntry.GetInfo().Name)
			// A non-positive limit disables the check.
			if maxAllowedSignals > 0 && int(executionInfo.SignalCount) >= maxAllowedSignals {
				// workflowExecution never holds the RunID of the execution being
				// signaled (it is empty, or a newly generated ID from the terminate
				// path above), so tag the log with the run held by wfContext.
				e.logger.Info("Execution limit reached for maximum signals", tag.WorkflowSignalCount(executionInfo.SignalCount),
					tag.WorkflowID(workflowExecution.GetWorkflowID()),
					tag.WorkflowRunID(wfContext.GetExecution().RunID),
					tag.WorkflowDomainID(domainID))
				return nil, workflow.ErrSignalsLimitExceeded
			}

			// Record the request ID so the IsSignalRequested check above can
			// recognise a repeat of this request.
			if requestID != "" {
				mutableState.AddSignalRequested(requestID)
			}

			if _, err := mutableState.AddWorkflowExecutionSignaled(
				sRequest.GetSignalName(),
				sRequest.GetSignalInput(),
				sRequest.GetIdentity(),
				requestID,
			); err != nil {
				return nil, &types.InternalServiceError{Message: "Unable to signal workflow execution."}
			}

			// Create a transfer task to schedule a decision task
			if !mutableState.HasPendingDecision() {
				if _, err := mutableState.AddDecisionTaskScheduledEvent(false); err != nil {
					return nil, &types.InternalServiceError{Message: "Failed to add decision scheduled event."}
				}
			}

			// We apply the update to execution using optimistic concurrency. If it fails due to a conflict then reload
			// the history and try the operation again.
			if err := wfContext.UpdateWorkflowExecutionAsActive(ctx, e.shard.GetTimeSource().Now()); err != nil {
				if execution.IsConflictError(err) {
					continue Just_Signal_Loop
				}
				return nil, err
			}
			return &types.StartWorkflowExecutionResponse{RunID: wfContext.GetExecution().RunID}, nil
		} // end for Just_Signal_Loop

		// A break leaves attempt below the limit; reaching the limit means every
		// attempt ended in a conflict.
		if attempt == workflow.ConditionalRetryCount {
			return nil, workflow.ErrMaxAttemptsExceeded
		}
	} else {
		if _, ok := err0.(*types.EntityNotExistsError); !ok {
			return nil, err0
		}
		// workflow not exist, will create workflow then signal
	}

	// Start workflow and signal
	startRequest, err := getStartRequest(domainID, sRequest, signalWithStartRequest.PartitionConfig)
	if err != nil {
		return nil, err
	}

	sigWithStartArg := &signalWithStartArg{
		signalWithStartRequest: signalWithStartRequest,
		prevMutableState:       prevMutableState,
	}
	return e.startWorkflowHelper(
		ctx,
		startRequest,
		domainEntry,
		metrics.HistorySignalWithStartWorkflowExecutionScope,
		sigWithStartArg,
	)
}