in service/history/task/transfer_active_task_executor.go [354:569]
// processCloseExecutionTaskHelper processes a transfer task for a workflow
// execution that has closed.
//
// While holding the workflow execution lock it loads the mutable state,
// verifies the task version against the last write version, snapshots every
// value that is needed later, and persists any cross cluster tasks that have
// to be generated. The lock is then released and the remaining (RPC heavy)
// steps run without it:
//
//   - recordWorkflowClosed: publish the workflow closed visibility record.
//   - replyToParentWorkflow: report the child completion to the parent
//     execution. This is only attempted when the workflow has a parent and was
//     not closed as continued-as-new; when the parent domain is handled by
//     another cluster, a cross cluster task is generated instead of making the
//     call directly.
//   - applyParentClosePolicy: apply the parent close policy to the pending
//     child executions.
//
// It returns errWorkflowBusy when acquiring the execution context times out,
// and nil (nothing to do) when the mutable state is missing, the workflow is
// still running, or the task version check does not pass.
func (t *transferActiveTaskExecutor) processCloseExecutionTaskHelper(
	ctx context.Context,
	task *persistence.TransferTaskInfo,
	recordWorkflowClosed bool,
	replyToParentWorkflow bool,
	applyParentClosePolicy bool,
) (retError error) {

	wfContext, release, err := t.executionCache.GetOrCreateWorkflowExecutionWithTimeout(
		task.DomainID,
		getWorkflowExecution(task),
		taskGetExecutionContextTimeout,
	)
	if err != nil {
		if err == context.DeadlineExceeded {
			return errWorkflowBusy
		}
		return err
	}
	// retError is a named result so the deferred release observes the final
	// error returned by this function.
	defer func() { release(retError) }()

	mutableState, err := loadMutableStateForTransferTask(ctx, wfContext, task, t.metricsClient, t.logger)
	if err != nil {
		return err
	}
	if mutableState == nil || mutableState.IsWorkflowExecutionRunning() {
		return nil
	}

	lastWriteVersion, err := mutableState.GetLastWriteVersion()
	if err != nil {
		return err
	}
	// When the check does not pass without an error, err is nil and the task
	// is simply dropped.
	ok, err := verifyTaskVersion(t.shard, t.logger, task.DomainID, lastWriteVersion, task.Version, task)
	if err != nil || !ok {
		return err
	}

	domainEntry, err := t.shard.GetDomainCache().GetDomainByID(task.DomainID)
	if err != nil {
		return err
	}

	executionInfo := mutableState.GetExecutionInfo()
	completionEvent, err := mutableState.GetCompletionEvent(ctx)
	if err != nil {
		return err
	}

	// Snapshot everything needed after the lock is released. Nothing below the
	// release(nil) call should read from executionInfo or mutableState.
	parentDomainID := executionInfo.ParentDomainID
	parentWorkflowID := executionInfo.ParentWorkflowID
	parentRunID := executionInfo.ParentRunID
	initiatedID := executionInfo.InitiatedID

	workflowTypeName := executionInfo.WorkflowTypeName
	workflowTaskList := executionInfo.TaskList
	workflowCloseTimestamp := completionEvent.GetTimestamp()
	workflowCloseStatus := persistence.ToInternalWorkflowExecutionCloseStatus(executionInfo.CloseStatus)
	workflowHistoryLength := mutableState.GetNextEventID() - 1
	isCron := len(executionInfo.CronSchedule) > 0
	numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))
	updateTimestamp := t.shard.GetTimeSource().Now()

	startEvent, err := mutableState.GetStartEvent(ctx)
	if err != nil {
		return err
	}
	workflowStartTimestamp := startEvent.GetTimestamp()
	workflowExecutionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent)
	visibilityMemo := getWorkflowMemo(executionInfo.Memo)
	searchAttr := executionInfo.SearchAttributes
	domainName := mutableState.GetDomainEntry().GetInfo().Name
	children, err := filterPendingChildExecutions(
		task.TargetDomainIDs,
		mutableState.GetPendingChildExecutionInfos(),
		t.shard.GetDomainCache(),
		domainEntry,
	)
	if err != nil {
		return err
	}

	// generate cross cluster task for applying parent close policy
	var crossClusterTaskGenerators []generatorF
	var sameClusterChildDomainIDs map[int64]string
	var signalParentClosePolicyWorker bool
	if applyParentClosePolicy {
		crossClusterTaskGenerators,
			sameClusterChildDomainIDs,
			signalParentClosePolicyWorker,
			err = t.applyParentClosePolicyDomainActiveCheck(task, domainName, children, domainEntry)
		if err != nil {
			return err
		}
	}

	// generate cross cluster task for record child execution completed
	replyToParentWorkflow = replyToParentWorkflow &&
		mutableState.HasParentExecution() &&
		executionInfo.CloseStatus != persistence.WorkflowCloseStatusContinuedAsNew
	if replyToParentWorkflow {
		// generate cross cluster task for recording child completion
		targetDomainEntry, err := t.shard.GetDomainCache().GetDomainByID(parentDomainID)
		if err != nil {
			return err
		}
		if targetCluster, isCrossCluster := t.isCrossClusterTask(task.DomainID, targetDomainEntry); isCrossCluster {
			parentInfo := &types.ParentExecutionInfo{
				DomainUUID: parentDomainID,
				Domain:     targetDomainEntry.GetInfo().Name,
				Execution: &types.WorkflowExecution{
					WorkflowID: parentWorkflowID,
					RunID:      parentRunID,
				},
				InitiatedID: initiatedID,
			}
			crossClusterTaskGenerators = append(crossClusterTaskGenerators,
				func(taskGenerator execution.MutableStateTaskGenerator) error {
					return taskGenerator.GenerateCrossClusterRecordChildCompletedTask(task, targetCluster, parentInfo)
				})
			// The cross cluster task takes over notifying the parent, so skip
			// the direct RecordChildExecutionCompleted call further down.
			replyToParentWorkflow = false
		}
	}

	// update workflow execution to persist generated cross cluster tasks
	if len(crossClusterTaskGenerators) > 0 {
		if err := t.generateCrossClusterTasks(
			ctx,
			wfContext,
			mutableState,
			task,
			crossClusterTaskGenerators,
		); err != nil {
			return err
		}
	}

	// we've gathered all necessary information from mutable state and
	// generated/persisted necessary cross cluster tasks.
	// release the context lock since we no longer need mutable state builder and
	// the rest of logic is making RPC call, which takes time.
	// NOTE(review): the deferred release above will run again on return;
	// presumably release is safe to invoke twice — confirm.
	release(nil)

	// publish workflow closed visibility records
	if recordWorkflowClosed {
		if err := t.recordWorkflowClosed(
			ctx,
			task.DomainID,
			task.WorkflowID,
			task.RunID,
			workflowTypeName,
			workflowStartTimestamp,
			workflowExecutionTimestamp.UnixNano(),
			workflowCloseTimestamp,
			*workflowCloseStatus,
			workflowHistoryLength,
			task.GetTaskID(),
			visibilityMemo,
			workflowTaskList,
			isCron,
			numClusters,
			updateTimestamp.UnixNano(),
			searchAttr,
		); err != nil {
			return err
		}
	}

	// Communicate the result to parent execution if this is Child Workflow execution
	// and parent domain is in the same cluster
	if replyToParentWorkflow {
		recordChildCompletionCtx, cancel := context.WithTimeout(ctx, taskRPCCallTimeout)
		defer cancel()
		err := t.historyClient.RecordChildExecutionCompleted(recordChildCompletionCtx, &types.RecordChildExecutionCompletedRequest{
			DomainUUID: parentDomainID,
			WorkflowExecution: &types.WorkflowExecution{
				WorkflowID: parentWorkflowID,
				RunID:      parentRunID,
			},
			InitiatedID: initiatedID,
			CompletedExecution: &types.WorkflowExecution{
				WorkflowID: task.WorkflowID,
				RunID:      task.RunID,
			},
			CompletionEvent: completionEvent,
		})

		// Check to see if the error is non-transient, in which case reset the error and continue with processing
		switch err.(type) {
		case *types.EntityNotExistsError, *types.WorkflowExecutionAlreadyCompletedError:
			// The parent is gone or already closed; nothing left to report.
			err = nil
		case *types.DomainNotActiveError:
			err = errTargetDomainNotActive
		}

		if err != nil {
			return err
		}
	}

	if applyParentClosePolicy {
		if err := t.processParentClosePolicy(
			ctx,
			wfContext,
			task,
			children,
			sameClusterChildDomainIDs,
			signalParentClosePolicyWorker,
			domainEntry,
		); err != nil {
			return err
		}
	}

	return nil
}