func()

in service/history/engine/engineimpl/historyEngine.go [1615:1793]


// DescribeWorkflowExecution builds a point-in-time description of a workflow
// execution from its mutable state.
//
// The response contains the execution configuration (task list and timeouts),
// general execution info (type, start/update/execution time, history length,
// memo, search attributes, parent linkage, and close status/time once the
// workflow state is completed), plus the pending activities, pending child
// executions and the pending decision, if any.
//
// An error is returned when the domain UUID fails validation, when the
// execution cannot be loaded, when the history corruption check fails or
// reports corruption (surfaced as an EntityNotExistsError), or when any event
// or domain lookup needed to populate the response fails.
func (e *historyEngineImpl) DescribeWorkflowExecution(
	ctx context.Context,
	request *types.HistoryDescribeWorkflowExecutionRequest,
) (retResp *types.DescribeWorkflowExecutionResponse, retError error) {

	if err := common.ValidateDomainUUID(request.DomainUUID); err != nil {
		return nil, err
	}

	domainID := request.DomainUUID
	// NOTE(review): this dereference assumes the caller has already verified that
	// request.Request and request.Request.Execution are non-nil — confirm.
	wfExecution := *request.Request.Execution

	wfContext, release, err := e.executionCache.GetOrCreateWorkflowExecution(ctx, domainID, wfExecution)
	if err != nil {
		return nil, err
	}
	// release observes the named return value, so it is handed whatever error
	// this method ends up returning.
	defer func() { release(retError) }()

	mutableState, err := wfContext.LoadWorkflowExecution(ctx)
	if err != nil {
		return nil, err
	}
	// If history is corrupted, return an error to the end user
	if corrupted, err := e.checkForHistoryCorruptions(ctx, mutableState); err != nil {
		return nil, err
	} else if corrupted {
		return nil, &types.EntityNotExistsError{Message: "Workflow execution corrupted."}
	}

	executionInfo := mutableState.GetExecutionInfo()

	result := &types.DescribeWorkflowExecutionResponse{
		ExecutionConfiguration: &types.WorkflowExecutionConfiguration{
			TaskList:                            &types.TaskList{Name: executionInfo.TaskList},
			ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(executionInfo.WorkflowTimeout),
			TaskStartToCloseTimeoutSeconds:      common.Int32Ptr(executionInfo.DecisionStartToCloseTimeout),
		},
		WorkflowExecutionInfo: &types.WorkflowExecutionInfo{
			Execution: &types.WorkflowExecution{
				WorkflowID: executionInfo.WorkflowID,
				RunID:      executionInfo.RunID,
			},
			Type:             &types.WorkflowType{Name: executionInfo.WorkflowTypeName},
			StartTime:        common.Int64Ptr(executionInfo.StartTimestamp.UnixNano()),
			HistoryLength:    mutableState.GetNextEventID() - common.FirstEventID,
			AutoResetPoints:  executionInfo.AutoResetPoints,
			Memo:             &types.Memo{Fields: executionInfo.Memo},
			IsCron:           len(executionInfo.CronSchedule) > 0,
			UpdateTime:       common.Int64Ptr(executionInfo.LastUpdatedTimestamp.UnixNano()),
			SearchAttributes: &types.SearchAttributes{IndexedFields: executionInfo.SearchAttributes},
			PartitionConfig:  executionInfo.PartitionConfig,
		},
	}

	// TODO: we need to consider adding execution time to mutable state
	// For now execution time will be calculated based on start time and cron schedule/retry policy
	// each time DescribeWorkflowExecution is called.
	startEvent, err := mutableState.GetStartEvent(ctx)
	if err != nil {
		return nil, err
	}
	// Execution time = start time + the first decision task backoff recorded on
	// the start event.
	backoffDuration := time.Duration(startEvent.GetWorkflowExecutionStartedEventAttributes().GetFirstDecisionTaskBackoffSeconds()) * time.Second
	result.WorkflowExecutionInfo.ExecutionTime = common.Int64Ptr(result.WorkflowExecutionInfo.GetStartTime() + backoffDuration.Nanoseconds())

	// Parent linkage is only reported when a parent run ID is recorded.
	if executionInfo.ParentRunID != "" {
		result.WorkflowExecutionInfo.ParentExecution = &types.WorkflowExecution{
			WorkflowID: executionInfo.ParentWorkflowID,
			RunID:      executionInfo.ParentRunID,
		}
		result.WorkflowExecutionInfo.ParentDomainID = common.StringPtr(executionInfo.ParentDomainID)
		result.WorkflowExecutionInfo.ParentInitiatedID = common.Int64Ptr(executionInfo.InitiatedID)
		parentDomain, err := e.shard.GetDomainCache().GetDomainName(executionInfo.ParentDomainID)
		if err != nil {
			return nil, err
		}
		result.WorkflowExecutionInfo.ParentDomain = common.StringPtr(parentDomain)
	}
	if executionInfo.State == persistence.WorkflowStateCompleted {
		// for closed workflow
		result.WorkflowExecutionInfo.CloseStatus = persistence.ToInternalWorkflowExecutionCloseStatus(executionInfo.CloseStatus)
		completionEvent, err := mutableState.GetCompletionEvent(ctx)
		if err != nil {
			return nil, err
		}
		result.WorkflowExecutionInfo.CloseTime = common.Int64Ptr(completionEvent.GetTimestamp())
	}

	if len(mutableState.GetPendingActivityInfos()) > 0 {
		for _, ai := range mutableState.GetPendingActivityInfos() {
			p := &types.PendingActivityInfo{
				ActivityID: ai.ActivityID,
			}
			// A cancel request takes precedence over started/scheduled when
			// reporting the activity state.
			state := types.PendingActivityStateScheduled
			if ai.CancelRequested {
				state = types.PendingActivityStateCancelRequested
			} else if ai.StartedID != common.EmptyEventID {
				state = types.PendingActivityStateStarted
			}
			p.State = &state
			// Heartbeat fields are only reported when the recorded heartbeat time
			// converts to a positive Unix-nanosecond value.
			lastHeartbeatUnixNano := ai.LastHeartBeatUpdatedTime.UnixNano()
			if lastHeartbeatUnixNano > 0 {
				p.LastHeartbeatTimestamp = common.Int64Ptr(lastHeartbeatUnixNano)
				p.HeartbeatDetails = ai.Details
			}
			// TODO: move to mutable state instead of loading it from event
			scheduledEvent, err := mutableState.GetActivityScheduledEvent(ctx, ai.ScheduleID)
			if err != nil {
				return nil, err
			}
			p.ActivityType = scheduledEvent.ActivityTaskScheduledEventAttributes.ActivityType
			if state == types.PendingActivityStateScheduled {
				p.ScheduledTimestamp = common.Int64Ptr(ai.ScheduledTime.UnixNano())
			} else {
				// NOTE(review): a cancel-requested activity takes this branch even
				// when StartedID is still empty; confirm ai.StartedTime is
				// meaningful in that case.
				p.LastStartedTimestamp = common.Int64Ptr(ai.StartedTime.UnixNano())
			}
			if ai.HasRetryPolicy {
				p.Attempt = ai.Attempt
				p.ExpirationTimestamp = common.Int64Ptr(ai.ExpirationTime.UnixNano())
				if ai.MaximumAttempts != 0 {
					p.MaximumAttempts = ai.MaximumAttempts
				}
				if ai.LastFailureReason != "" {
					p.LastFailureReason = common.StringPtr(ai.LastFailureReason)
					p.LastFailureDetails = ai.LastFailureDetails
				}
				if ai.LastWorkerIdentity != "" {
					p.LastWorkerIdentity = ai.LastWorkerIdentity
				}
				if ai.StartedIdentity != "" {
					p.StartedWorkerIdentity = ai.StartedIdentity
				}
			}
			result.PendingActivities = append(result.PendingActivities, p)
		}
	}

	if len(mutableState.GetPendingChildExecutionInfos()) > 0 {
		for _, ch := range mutableState.GetPendingChildExecutionInfos() {
			childDomainName, err := execution.GetChildExecutionDomainName(
				ch,
				e.shard.GetDomainCache(),
				mutableState.GetDomainEntry(),
			)
			if err != nil {
				if !common.IsEntityNotExistsError(err) {
					return nil, err
				}
				// child domain already deleted, instead of failing the request,
				// return domainID instead since this field is only for information purpose
				childDomainName = ch.DomainID
			}
			// Copy the policy into a per-iteration local so the response owns its
			// value instead of holding a pointer reached through the range
			// variable, which would either be shared across iterations or alias
			// the ranged entry itself.
			parentClosePolicy := ch.ParentClosePolicy
			p := &types.PendingChildExecutionInfo{
				Domain:            childDomainName,
				WorkflowID:        ch.StartedWorkflowID,
				RunID:             ch.StartedRunID,
				WorkflowTypeName:  ch.WorkflowTypeName,
				InitiatedID:       ch.InitiatedID,
				ParentClosePolicy: &parentClosePolicy,
			}
			result.PendingChildren = append(result.PendingChildren, p)
		}
	}

	if di, ok := mutableState.GetPendingDecision(); ok {
		// Reported as scheduled by default; upgraded to started (with its start
		// timestamp) once a started event ID is recorded.
		pendingDecision := &types.PendingDecisionInfo{
			State:                      types.PendingDecisionStateScheduled.Ptr(),
			ScheduledTimestamp:         common.Int64Ptr(di.ScheduledTimestamp),
			Attempt:                    di.Attempt,
			OriginalScheduledTimestamp: common.Int64Ptr(di.OriginalScheduledTimestamp),
		}
		if di.StartedID != common.EmptyEventID {
			pendingDecision.State = types.PendingDecisionStateStarted.Ptr()
			pendingDecision.StartedTimestamp = common.Int64Ptr(di.StartedTimestamp)
		}
		result.PendingDecision = pendingDecision
	}

	return result, nil
}