func()

in service/history/task/transfer_active_task_executor.go [354:569]


// processCloseExecutionTaskHelper processes a close-execution transfer task.
//
// The three flags select which follow-up actions are performed:
//   - recordWorkflowClosed: publish the workflow-closed visibility record.
//   - replyToParentWorkflow: report the completion to the parent workflow. This
//     is only done when the workflow has a parent and did not close as
//     continued-as-new. If isCrossClusterTask reports the parent domain as
//     cross cluster, a cross cluster task is generated instead of making the
//     RecordChildExecutionCompleted call directly.
//   - applyParentClosePolicy: run the parent close policy for the pending
//     child executions selected by filterPendingChildExecutions.
//
// The function returns nil without further work when the mutable state cannot
// be loaded (nil), when the workflow is still running, or when
// verifyTaskVersion reports the task as not ok without an error. It returns
// errWorkflowBusy when acquiring the workflow context hits
// context.DeadlineExceeded.
//
// All values needed from mutable state are copied into locals while the
// workflow context is held; the context is then released before the RPC-based
// steps run.
func (t *transferActiveTaskExecutor) processCloseExecutionTaskHelper(
	ctx context.Context,
	task *persistence.TransferTaskInfo,
	recordWorkflowClosed bool,
	replyToParentWorkflow bool,
	applyParentClosePolicy bool,
) (retError error) {

	wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
		task.DomainID,
		getWorkflowExecution(task),
		taskGetExecutionContextTimeout,
	)
	if err != nil {
		if err == context.DeadlineExceeded {
			return errWorkflowBusy
		}
		return err
	}
	// NOTE(review): release is also invoked explicitly further down, so this
	// deferred call relies on release tolerating a second invocation — confirm.
	defer func() { release(retError) }()

	mutableState, err := loadMutableStateForTransferTask(ctx, wfContext, task, t.metricsClient, t.logger)
	if err != nil {
		return err
	}
	if mutableState == nil || mutableState.IsWorkflowExecutionRunning() {
		return nil
	}

	lastWriteVersion, err := mutableState.GetLastWriteVersion()
	if err != nil {
		return err
	}
	ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, lastWriteVersion, task.Version, task)
	if err != nil || !ok {
		// err is nil when the version check simply did not pass.
		return err
	}

	domainEntry, err := t.shard.GetDomainCache().GetDomainByID(task.DomainID)
	if err != nil {
		return err
	}

	executionInfo := mutableState.GetExecutionInfo()
	completionEvent, err := mutableState.GetCompletionEvent(ctx)
	if err != nil {
		return err
	}

	// Snapshot everything read from executionInfo / mutable state here, while
	// the workflow context is still held. Nothing below the explicit release
	// may touch executionInfo or mutableState.
	parentDomainID := executionInfo.ParentDomainID
	parentWorkflowID := executionInfo.ParentWorkflowID
	parentRunID := executionInfo.ParentRunID
	initiatedID := executionInfo.InitiatedID

	workflowTypeName := executionInfo.WorkflowTypeName
	workflowCloseTimestamp := completionEvent.GetTimestamp()
	workflowCloseStatus := persistence.ToInternalWorkflowExecutionCloseStatus(executionInfo.CloseStatus)
	workflowHistoryLength := mutableState.GetNextEventID() - 1
	taskList := executionInfo.TaskList
	isCron := len(executionInfo.CronSchedule) > 0
	numClusters := int16(len(domainEntry.GetReplicationConfig().Clusters))
	updateTimestamp := t.shard.GetTimeSource().Now()

	startEvent, err := mutableState.GetStartEvent(ctx)
	if err != nil {
		return err
	}
	workflowStartTimestamp := startEvent.GetTimestamp()
	workflowExecutionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent)
	visibilityMemo := getWorkflowMemo(executionInfo.Memo)
	searchAttr := executionInfo.SearchAttributes
	domainName := mutableState.GetDomainEntry().GetInfo().Name
	children, err := filterPendingChildExecutions(
		task.TargetDomainIDs,
		mutableState.GetPendingChildExecutionInfos(),
		t.shard.GetDomainCache(),
		domainEntry,
	)
	if err != nil {
		return err
	}

	// generate cross cluster task for applying parent close policy
	var crossClusterTaskGenerators []generatorF
	var sameClusterChildDomainIDs map[int64]string
	var signalParentClosePolicyWorker bool
	if applyParentClosePolicy {
		crossClusterTaskGenerators,
			sameClusterChildDomainIDs,
			signalParentClosePolicyWorker,
			err = t.applyParentClosePolicyDomainActiveCheck(task, domainName, children, domainEntry)
		if err != nil {
			return err
		}
	}

	// Only reply to the parent when there is one and this run did not
	// continue as new.
	replyToParentWorkflow = replyToParentWorkflow &&
		mutableState.HasParentExecution() &&
		executionInfo.CloseStatus != persistence.WorkflowCloseStatusContinuedAsNew
	if replyToParentWorkflow {
		// generate cross cluster task for recording child completion
		targetDomainEntry, err := t.shard.GetDomainCache().GetDomainByID(parentDomainID)
		if err != nil {
			return err
		}
		if targetCluster, isCrossCluster := t.isCrossClusterTask(task.DomainID, targetDomainEntry); isCrossCluster {
			parentInfo := &types.ParentExecutionInfo{
				DomainUUID: parentDomainID,
				Domain:     targetDomainEntry.GetInfo().Name,
				Execution: &types.WorkflowExecution{
					WorkflowID: parentWorkflowID,
					RunID:      parentRunID,
				},
				InitiatedID: initiatedID,
			}
			crossClusterTaskGenerators = append(crossClusterTaskGenerators,
				func(taskGenerator execution.MutableStateTaskGenerator) error {
					return taskGenerator.GenerateCrossClusterRecordChildCompletedTask(task, targetCluster, parentInfo)
				})
			// The generated cross cluster task takes over the reply, so skip
			// the direct RPC below.
			replyToParentWorkflow = false
		}
	}

	// update workflow execution to persist generated cross cluster tasks
	if len(crossClusterTaskGenerators) > 0 {
		if err := t.generateCrossClusterTasks(
			ctx,
			wfContext,
			mutableState,
			task,
			crossClusterTaskGenerators,
		); err != nil {
			return err
		}
	}

	// we've gathered all necessary information from mutable state and
	// generated/persisted necessary cross cluster tasks.
	// release the context lock since we no longer need mutable state builder and
	// the rest of logic is making RPC call, which takes time.
	release(nil)

	// publish workflow closed visibility records
	if recordWorkflowClosed {
		if err := t.recordWorkflowClosed(
			ctx,
			task.DomainID,
			task.WorkflowID,
			task.RunID,
			workflowTypeName,
			workflowStartTimestamp,
			workflowExecutionTimestamp.UnixNano(),
			workflowCloseTimestamp,
			*workflowCloseStatus,
			workflowHistoryLength,
			task.GetTaskID(),
			visibilityMemo,
			taskList,
			isCron,
			numClusters,
			updateTimestamp.UnixNano(),
			searchAttr,
		); err != nil {
			return err
		}
	}

	// Communicate the result to parent execution if this is Child Workflow execution
	// and parent domain is in the same cluster
	if replyToParentWorkflow {
		recordChildCompletionCtx, cancel := context.WithTimeout(ctx, taskRPCCallTimeout)
		defer cancel()
		err := t.historyClient.RecordChildExecutionCompleted(recordChildCompletionCtx, &types.RecordChildExecutionCompletedRequest{
			DomainUUID: parentDomainID,
			WorkflowExecution: &types.WorkflowExecution{
				WorkflowID: parentWorkflowID,
				RunID:      parentRunID,
			},
			InitiatedID: initiatedID,
			CompletedExecution: &types.WorkflowExecution{
				WorkflowID: task.WorkflowID,
				RunID:      task.RunID,
			},
			CompletionEvent: completionEvent,
		})

		// Check to see if the error is non-transient, in which case reset the error and continue with processing
		switch err.(type) {
		case *types.EntityNotExistsError, *types.WorkflowExecutionAlreadyCompletedError:
			err = nil
		case *types.DomainNotActiveError:
			err = errTargetDomainNotActive
		}

		if err != nil {
			return err
		}
	}

	if applyParentClosePolicy {
		if err := t.processParentClosePolicy(
			ctx,
			wfContext,
			task,
			children,
			sameClusterChildDomainIDs,
			signalParentClosePolicyWorker,
			domainEntry,
		); err != nil {
			return err
		}
	}

	return nil
}