func()

in service/history/engine/engineimpl/historyEngine.go [633:865]


// startWorkflowHelper creates a new workflow run for a start request. When
// signalWithStartArg is non-nil the call is treated as the "start" half of a
// signal-with-start request, and the previous mutable state and signal request
// it carries are used.
//
// The steps, in order:
//   - reject the request unless the domain status is DomainStatusRegistered,
//     then validate the start request and apply overrides to it;
//   - obtain the current-execution entry for (domainID, workflowID) from the
//     execution cache, used only as a lock and released on return;
//   - build mutable state for a freshly generated run ID;
//   - for signal-with-start with a previous run, check the last write version
//     and the workflow ID reuse policy; otherwise optionally record an
//     "uninitialized" visibility entry (best effort);
//   - add the start events and tasks, close the transaction as a snapshot,
//     persist the first event batch and create the workflow execution;
//   - if creation reports WorkflowExecutionAlreadyStartedError, deduplicate by
//     request ID, optionally terminate the running workflow and start anew,
//     or retry creation in workflow-ID-reuse mode.
//
// On success the response carries the run ID of the created run, or the run ID
// of the existing run when that run was started with the same request ID.
// The named result retError is read by the deferred release functions, so every
// return path hands its final error to them.
func (e *historyEngineImpl) startWorkflowHelper(
	ctx context.Context,
	startRequest *types.HistoryStartWorkflowExecutionRequest,
	domainEntry *cache.DomainCacheEntry,
	metricsScope int,
	signalWithStartArg *signalWithStartArg,
) (resp *types.StartWorkflowExecutionResponse, retError error) {

	if domainEntry.GetInfo().Status != persistence.DomainStatusRegistered {
		return nil, errDomainDeprecated
	}

	request := startRequest.StartRequest
	err := e.validateStartWorkflowExecutionRequest(request, metricsScope)
	if err != nil {
		return nil, err
	}
	e.overrideStartWorkflowExecutionRequest(domainEntry, request, metricsScope)

	workflowID := request.GetWorkflowID()
	domainID := domainEntry.GetInfo().ID
	domain := domainEntry.GetInfo().Name

	// grab the current context as a lock, nothing more
	// use a smaller context timeout to get the lock
	childCtx, childCancel := e.newChildContext(ctx)
	defer childCancel()

	_, currentRelease, err := e.executionCache.GetOrCreateCurrentWorkflowExecution(
		childCtx,
		domainID,
		workflowID,
	)
	if err != nil {
		// A deadline hit while waiting for the lock is reported as a concurrent
		// start request. This is a direct comparison, so only the bare
		// context.DeadlineExceeded value matches.
		if err == context.DeadlineExceeded {
			return nil, workflow.ErrConcurrentStartRequest
		}
		return nil, err
	}
	// Release the lock with whatever error this call ends up returning.
	defer func() { currentRelease(retError) }()

	workflowExecution := types.WorkflowExecution{
		WorkflowID: workflowID,
		RunID:      uuid.New(),
	}
	curMutableState, err := e.createMutableState(domainEntry, workflowExecution.GetRunID())
	if err != nil {
		return nil, err
	}

	// preprocess for signalWithStart
	var prevMutableState execution.MutableState
	var signalWithStartRequest *types.HistorySignalWithStartWorkflowExecutionRequest
	isSignalWithStart := signalWithStartArg != nil
	if isSignalWithStart {
		prevMutableState = signalWithStartArg.prevMutableState
		signalWithStartRequest = signalWithStartArg.signalWithStartRequest
	}
	if prevMutableState != nil {
		// err is shadowed inside this branch; every failure returns immediately.
		prevLastWriteVersion, err := prevMutableState.GetLastWriteVersion()
		if err != nil {
			return nil, err
		}
		// The previous run was last written with a newer version than the one
		// the new run would use: report the domain as not active.
		if prevLastWriteVersion > curMutableState.GetCurrentVersion() {
			return nil, e.newDomainNotActiveError(
				domainEntry.GetInfo().Name,
				prevLastWriteVersion,
			)
		}
		err = e.applyWorkflowIDReusePolicyForSigWithStart(
			prevMutableState.GetExecutionInfo(),
			workflowExecution,
			request.GetWorkflowIDReusePolicy(),
		)
		if err != nil {
			return nil, err
		}
	} else if e.shard.GetConfig().EnableRecordWorkflowExecutionUninitialized(domainEntry.GetInfo().Name) && e.visibilityMgr != nil {
		// No previous run was supplied: record an "uninitialized" visibility
		// entry for the new run before any start events are added.
		uninitializedRequest := &persistence.RecordWorkflowExecutionUninitializedRequest{
			DomainUUID: domainID,
			Domain:     domain,
			Execution: types.WorkflowExecution{
				WorkflowID: workflowID,
				RunID:      workflowExecution.RunID,
			},
			WorkflowTypeName: request.WorkflowType.Name,
			UpdateTimestamp:  e.shard.GetTimeSource().Now().UnixNano(),
			ShardID:          int64(e.shard.GetShardID()),
		}

		// Best effort: a failure here is logged and does not fail the start.
		if err := e.visibilityMgr.RecordWorkflowExecutionUninitialized(ctx, uninitializedRequest); err != nil {
			e.logger.Error("Failed to record uninitialized workflow execution", tag.Error(err))
		}
	}

	err = e.addStartEventsAndTasks(
		curMutableState,
		workflowExecution,
		startRequest,
		signalWithStartRequest,
	)
	if err != nil {
		if e.shard.GetConfig().EnableRecordWorkflowExecutionUninitialized(domainEntry.GetInfo().Name) && e.visibilityMgr != nil {
			// delete the uninitialized workflow execution record since it failed to start the workflow
			// uninitialized record is used to find wfs that didn't make a progress or stuck during the start process
			if errVisibility := e.visibilityMgr.DeleteWorkflowExecution(ctx, &persistence.VisibilityDeleteWorkflowExecutionRequest{
				DomainID:   domainID,
				Domain:     domain,
				RunID:      workflowExecution.RunID,
				WorkflowID: workflowID,
			}); errVisibility != nil {
				e.logger.Error("Failed to delete uninitialized workflow execution record", tag.Error(errVisibility))
			}
		}

		return nil, err
	}
	wfContext := execution.NewContext(domainID, workflowExecution, e.shard, e.executionManager, e.logger)

	newWorkflow, newWorkflowEventsSeq, err := curMutableState.CloseTransactionAsSnapshot(
		e.timeSource.Now(),
		execution.TransactionPolicyActive,
	)
	if err != nil {
		return nil, err
	}
	// The first event batch is indexed below; return an error instead of
	// panicking if the snapshot unexpectedly carries no batch at all.
	if len(newWorkflowEventsSeq) == 0 {
		return nil, &types.InternalServiceError{
			Message: "unable to create first event batch for workflow start",
		}
	}
	historyBlob, err := wfContext.PersistStartWorkflowBatchEvents(ctx, newWorkflowEventsSeq[0])
	if err != nil {
		return nil, err
	}

	// create as brand new
	createMode := persistence.CreateWorkflowModeBrandNew
	prevRunID := ""
	prevLastWriteVersion := int64(0)
	// overwrite in case of signalWithStart
	if prevMutableState != nil {
		createMode = persistence.CreateWorkflowModeWorkflowIDReuse
		info := prevMutableState.GetExecutionInfo()
		// For corrupted workflows use ContinueAsNew mode.
		// WorkflowIDReuse mode require workflows to be in completed state, which is not necessarily true for corrupted workflows.
		if info.State == persistence.WorkflowStateCorrupted {
			createMode = persistence.CreateWorkflowModeContinueAsNew
		}
		prevRunID = info.RunID
		prevLastWriteVersion, err = prevMutableState.GetLastWriteVersion()
		if err != nil {
			return nil, err
		}
	}
	err = wfContext.CreateWorkflowExecution(
		ctx,
		newWorkflow,
		historyBlob,
		createMode,
		prevRunID,
		prevLastWriteVersion,
	)
	// handle already started error
	if t, ok := err.(*persistence.WorkflowExecutionAlreadyStartedError); ok {

		// Same request ID as the existing run: treat this call as a retry of
		// that start and return the existing run ID.
		if t.StartRequestID == request.GetRequestID() {
			return &types.StartWorkflowExecutionResponse{
				RunID: t.RunID,
			}, nil
		}

		// Signal-with-start does not retry here; the error goes back to the caller.
		if isSignalWithStart {
			return nil, err
		}

		if curMutableState.GetCurrentVersion() < t.LastWriteVersion {
			return nil, e.newDomainNotActiveError(
				domainEntry.GetInfo().Name,
				t.LastWriteVersion,
			)
		}

		prevRunID = t.RunID
		if shouldTerminateAndStart(startRequest, t.State) {
			// err is shadowed for the rest of this branch; resp is the named result.
			runningWFCtx, err := workflow.LoadOnce(ctx, e.executionCache, domainID, workflowID, prevRunID)
			if err != nil {
				return nil, err
			}
			defer func() { runningWFCtx.GetReleaseFn()(retError) }()

			resp, err = e.terminateAndStartWorkflow(
				ctx,
				runningWFCtx,
				workflowExecution,
				domainEntry,
				domainID,
				startRequest,
				nil,
			)
			switch err.(type) {
			// By the time we try to terminate the workflow, it was already terminated
			// So continue as if we didn't need to terminate it in the first place
			case *types.WorkflowExecutionAlreadyCompletedError:
				e.shard.GetLogger().Warn("Workflow completed while trying to terminate, will continue starting workflow", tag.Error(err))
			default:
				// Covers both success (nil error) and every other failure.
				return resp, err
			}
		}
		// NOTE(review): t.State and t.CloseStatus come from the original
		// already-started error and are not refreshed after the terminate
		// attempt above - confirm the helper behaves as intended on that path.
		if err = e.applyWorkflowIDReusePolicyHelper(
			t.StartRequestID,
			prevRunID,
			t.State,
			t.CloseStatus,
			workflowExecution,
			startRequest.StartRequest.GetWorkflowIDReusePolicy(),
		); err != nil {
			return nil, err
		}
		// create as ID reuse
		createMode = persistence.CreateWorkflowModeWorkflowIDReuse
		err = wfContext.CreateWorkflowExecution(
			ctx,
			newWorkflow,
			historyBlob,
			createMode,
			prevRunID,
			t.LastWriteVersion,
		)
	}
	// Covers the first create attempt (non already-started errors) and the
	// ID-reuse retry.
	if err != nil {
		return nil, err
	}

	return &types.StartWorkflowExecutionResponse{
		RunID: workflowExecution.RunID,
	}, nil
}