func()

in service/history/engine/engineimpl/historyEngine.go [2513:2657]


// SignalWithStartWorkflowExecution delivers a signal to the workflow identified
// by the request's workflow ID, starting a new run first when there is no
// running workflow to signal.
//
// Flow, as implemented below:
//   - The domain is resolved with getActiveDomainByID; a domain whose status is
//     not DomainStatusRegistered is rejected with errDomainDeprecated.
//   - When the execution cache yields a workflow context, the signal is applied
//     inside a retry loop: on a conflict error from the update the mutable state
//     is loaded again and the operation retried, at most
//     workflow.ConditionalRetryCount times, after which
//     workflow.ErrMaxAttemptsExceeded is returned.
//   - When the workflow does not exist, is not running, or
//     checkForHistoryCorruptions reports corruption, the loop is left and
//     startWorkflowHelper is called with the request and, where available, the
//     previously loaded mutable state.
//   - When the workflow is running and the reuse policy is
//     WorkflowIDReusePolicyTerminateIfRunning, terminateAndStartWorkflow is
//     called and its result returned (see the review note at that call).
//
// A request ID that the mutable state already reports as signaled
// short-circuits and returns the context's run ID without further changes.
//
// The named result retError is read by the deferred release call, so every
// return path hands its final error to the cache release function.
func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
	ctx context.Context,
	signalWithStartRequest *types.HistorySignalWithStartWorkflowExecutionRequest,
) (retResp *types.StartWorkflowExecutionResponse, retError error) {

	domainEntry, err := e.getActiveDomainByID(signalWithStartRequest.DomainUUID)
	if err != nil {
		return nil, err
	}
	if domainEntry.GetInfo().Status != persistence.DomainStatusRegistered {
		return nil, errDomainDeprecated
	}
	domainID := domainEntry.GetInfo().ID

	sRequest := signalWithStartRequest.SignalWithStartRequest
	// Only the workflow ID is set here; RunID stays empty unless the
	// TerminateIfRunning branch below assigns a newly generated one.
	workflowExecution := types.WorkflowExecution{
		WorkflowID: sRequest.WorkflowID,
	}

	// prevMutableState is handed to the start path when an existing (closed or
	// corrupted) execution was found; it stays nil when no execution exists.
	var prevMutableState execution.MutableState
	attempt := 0

	wfContext, release, err0 := e.executionCache.GetOrCreateWorkflowExecution(ctx, domainID, workflowExecution)

	if err0 == nil {
		// Deferred through a closure so release observes the final value of
		// retError rather than its value at this point.
		defer func() { release(retError) }()
	Just_Signal_Loop:
		for ; attempt < workflow.ConditionalRetryCount; attempt++ {
			// workflow not exist, will create workflow then signal
			mutableState, err1 := wfContext.LoadWorkflowExecution(ctx)
			if err1 != nil {
				if _, ok := err1.(*types.EntityNotExistsError); ok {
					break
				}
				return nil, err1
			}

			// Duplicate request: this request ID was already recorded as signaled.
			if mutableState.IsSignalRequested(sRequest.GetRequestID()) {
				return &types.StartWorkflowExecutionResponse{RunID: wfContext.GetExecution().RunID}, nil
			}

			// workflow exist but not running, will restart workflow then signal
			if !mutableState.IsWorkflowExecutionRunning() {
				prevMutableState = mutableState
				break
			}

			// workflow exists but history is corrupted, will restart workflow then signal
			if corrupted, err := e.checkForHistoryCorruptions(ctx, mutableState); err != nil {
				return nil, err
			} else if corrupted {
				prevMutableState = mutableState
				break
			}

			// workflow is running, if policy is TerminateIfRunning, terminate current run then signalWithStart
			if sRequest.GetWorkflowIDReusePolicy() == types.WorkflowIDReusePolicyTerminateIfRunning {
				workflowExecution.RunID = uuid.New()
				runningWFCtx := workflow.NewContext(wfContext, release, mutableState)
				resp, errTerm := e.terminateAndStartWorkflow(
					ctx,
					runningWFCtx,
					workflowExecution,
					domainEntry,
					domainID,
					nil,
					signalWithStartRequest,
				)
				// By the time we try to terminate the workflow, it was already terminated
				// So continue as if we didn't need to terminate it in the first place
				//
				// NOTE(review): on this path execution falls through to the signal
				// logic below, still using the mutableState loaded at the top of
				// this iteration, and workflowExecution.RunID keeps the UUID
				// generated above. Confirm this is the intended handling for an
				// already-completed run.
				if _, ok := errTerm.(*types.WorkflowExecutionAlreadyCompletedError); !ok {
					return resp, errTerm
				}
			}

			executionInfo := mutableState.GetExecutionInfo()
			maxAllowedSignals := e.config.MaximumSignalsPerExecution(domainEntry.GetInfo().Name)
			if maxAllowedSignals > 0 && int(executionInfo.SignalCount) >= maxAllowedSignals {
				// Tag the log with the run held by the workflow context (the same
				// run ID this method returns on success). workflowExecution.RunID
				// is either empty or a freshly generated UUID at this point and
				// does not identify the run that hit the limit.
				e.logger.Info("Execution limit reached for maximum signals", tag.WorkflowSignalCount(executionInfo.SignalCount),
					tag.WorkflowID(workflowExecution.GetWorkflowID()),
					tag.WorkflowRunID(wfContext.GetExecution().RunID),
					tag.WorkflowDomainID(domainID))
				return nil, workflow.ErrSignalsLimitExceeded
			}

			// Record the request ID (when one was supplied) so a later call with
			// the same ID is caught by the IsSignalRequested check above.
			requestID := sRequest.GetRequestID()
			if requestID != "" {
				mutableState.AddSignalRequested(requestID)
			}

			if _, err := mutableState.AddWorkflowExecutionSignaled(
				sRequest.GetSignalName(),
				sRequest.GetSignalInput(),
				sRequest.GetIdentity(),
				requestID,
			); err != nil {
				return nil, &types.InternalServiceError{Message: "Unable to signal workflow execution."}
			}

			// Create a transfer task to schedule a decision task
			if !mutableState.HasPendingDecision() {
				_, err := mutableState.AddDecisionTaskScheduledEvent(false)
				if err != nil {
					return nil, &types.InternalServiceError{Message: "Failed to add decision scheduled event."}
				}
			}

			// We apply the update to execution using optimistic concurrency.  If it fails due to a conflict then reload
			// the history and try the operation again.
			if err := wfContext.UpdateWorkflowExecutionAsActive(ctx, e.shard.GetTimeSource().Now()); err != nil {
				if execution.IsConflictError(err) {
					continue Just_Signal_Loop
				}
				return nil, err
			}
			return &types.StartWorkflowExecutionResponse{RunID: wfContext.GetExecution().RunID}, nil
		} // end for Just_Signal_Loop
		// The loop exits with attempt == ConditionalRetryCount only when every
		// iteration ended in a conflict; a break leaves attempt below the limit
		// and continues to the start path.
		if attempt == workflow.ConditionalRetryCount {
			return nil, workflow.ErrMaxAttemptsExceeded
		}
	} else {
		if _, ok := err0.(*types.EntityNotExistsError); !ok {
			return nil, err0
		}
		// workflow not exist, will create workflow then signal
	}

	// Start workflow and signal
	startRequest, err := getStartRequest(domainID, sRequest, signalWithStartRequest.PartitionConfig)
	if err != nil {
		return nil, err
	}

	sigWithStartArg := &signalWithStartArg{
		signalWithStartRequest: signalWithStartRequest,
		prevMutableState:       prevMutableState,
	}
	return e.startWorkflowHelper(
		ctx,
		startRequest,
		domainEntry,
		metrics.HistorySignalWithStartWorkflowExecutionScope,
		sigWithStartArg,
	)
}