func()

in service/history/decision/handler.go [285:634]


// HandleDecisionTaskCompleted processes a worker's RespondDecisionTaskCompleted call.
//
// It locates the decision identified by the request's task token and records its
// completion. For a decision heartbeat (ForceCreateNewDecisionTask with no
// decisions) that has exceeded the domain's DecisionHeartbeatTimeout, it records a
// decision timeout instead. It then applies the worker's decisions through a
// decisionTaskHandler, unless the binary checksum is marked bad, in which case the
// decision is failed. When needed it schedules a follow-up decision; if
// ReturnNewDecisionTask is set, that decision is started immediately and returned
// in the response. Finally it persists the mutable state.
//
// The whole sequence is retried up to workflow.ConditionalRetryCount times when the
// loaded mutable state looks stale or the persistence update reports a conflict.
// Once all attempts are used, workflow.ErrMaxAttemptsExceeded is returned.
//
// Errors returned include:
//   - errors from getActiveDomainByID, returned unchanged;
//   - workflow.ErrDeserializingToken if the task token cannot be decoded;
//   - workflow.ErrAlreadyCompleted if the workflow execution is no longer running;
//   - *types.EntityNotExistsError if the token does not match the in-flight decision,
//     or if a decision heartbeat timed out (the timeout is persisted before returning);
//   - *types.InternalServiceError if required history events cannot be added.
//
// retError is a named result so the deferred release callback obtained from
// executionCache.GetOrCreateWorkflowExecution sees the final error.
func (handler *handlerImpl) HandleDecisionTaskCompleted(
	ctx context.Context,
	req *types.HistoryRespondDecisionTaskCompletedRequest,
) (resp *types.HistoryRespondDecisionTaskCompletedResponse, retError error) {
	domainEntry, err := handler.getActiveDomainByID(req.DomainUUID)
	if err != nil {
		return nil, err
	}
	domainID := domainEntry.GetInfo().ID
	domainName := domainEntry.GetInfo().Name

	request := req.CompleteRequest
	token, err := handler.tokenSerializer.Deserialize(request.TaskToken)
	if err != nil {
		return nil, workflow.ErrDeserializingToken
	}

	workflowExecution := types.WorkflowExecution{
		WorkflowID: token.WorkflowID,
		RunID:      token.RunID,
	}

	logger := handler.logger.WithTags(
		tag.WorkflowDomainName(domainName),
		tag.WorkflowDomainID(domainID),
		tag.WorkflowID(workflowExecution.GetWorkflowID()),
		tag.WorkflowRunID(workflowExecution.GetRunID()),
		tag.WorkflowScheduleID(token.ScheduleID),
	)
	scope := handler.metricsClient.Scope(metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DomainTag(domainName),
		metrics.WorkflowTypeTag(token.WorkflowType))

	// Client identification headers are copied into the execution info below.
	call := yarpc.CallFromContext(ctx)
	clientLibVersion := call.Header(common.LibraryVersionHeaderName)
	clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
	clientImpl := call.Header(common.ClientImplHeaderName)

	wfContext, release, err := handler.executionCache.GetOrCreateWorkflowExecution(ctx, domainID, workflowExecution)
	if err != nil {
		return nil, err
	}
	defer func() { release(retError) }()

Update_History_Loop:
	for attempt := 0; attempt < workflow.ConditionalRetryCount; attempt++ {
		logger.Debug("Update_History_Loop attempt", tag.Attempt(int32(attempt)))
		msBuilder, err := wfContext.LoadWorkflowExecution(ctx)
		if err != nil {
			return nil, err
		}
		if !msBuilder.IsWorkflowExecutionRunning() {
			return nil, workflow.ErrAlreadyCompleted
		}
		executionStats, err := wfContext.LoadExecutionStats(ctx)
		if err != nil {
			return nil, err
		}

		executionInfo := msBuilder.GetExecutionInfo()
		currentDecision, isRunning := msBuilder.GetDecisionInfo(token.ScheduleID)

		// First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in
		// some extreme cassandra failure cases.
		if !isRunning && token.ScheduleID >= msBuilder.GetNextEventID() {
			scope.IncCounter(metrics.StaleMutableStateCounter)
			logger.Error("Encounter stale mutable state in RespondDecisionTaskCompleted", tag.WorkflowNextEventID(msBuilder.GetNextEventID()))
			// Reload workflow execution history
			wfContext.Clear()
			continue Update_History_Loop
		}

		// The token must refer to the decision attempt that is currently started; otherwise
		// this completion is for an outdated decision.
		if !msBuilder.IsWorkflowExecutionRunning() || !isRunning || currentDecision.Attempt != token.ScheduleAttempt || currentDecision.StartedID == common.EmptyEventID {
			logger.Debugf("Decision task not found. IsWorkflowExecutionRunning: %v, isRunning: %v, currentDecision.Attempt: %v, token.ScheduleAttempt: %v, currentDecision.StartID: %v",
				msBuilder.IsWorkflowExecutionRunning(), isRunning, getDecisionInfoAttempt(currentDecision), token.ScheduleAttempt, getDecisionInfoStartedID(currentDecision))
			return nil, &types.EntityNotExistsError{Message: "Decision task not found."}
		}

		startedID := currentDecision.StartedID
		maxResetPoints := handler.config.MaxAutoResetPoints(domainName)
		if executionInfo.AutoResetPoints != nil && maxResetPoints == len(executionInfo.AutoResetPoints.Points) {
			logger.Debugf("Max reset points %d is exceeded", maxResetPoints)
			scope.IncCounter(metrics.AutoResetPointsLimitExceededCounter)
		}

		// A request with ForceCreateNewDecisionTask and no decisions is treated as a decision heartbeat.
		decisionHeartbeating := request.GetForceCreateNewDecisionTask() && len(request.Decisions) == 0
		var decisionHeartbeatTimeout bool
		var completedEvent *types.HistoryEvent
		if decisionHeartbeating {
			timeout := handler.config.DecisionHeartbeatTimeout(domainName)
			if currentDecision.OriginalScheduledTimestamp > 0 && handler.timeSource.Now().After(time.Unix(0, currentDecision.OriginalScheduledTimestamp).Add(timeout)) {
				// Heartbeating has gone on longer than allowed: time the decision out instead of completing it.
				decisionHeartbeatTimeout = true
				scope.IncCounter(metrics.DecisionHeartbeatTimeoutCounter)
				completedEvent, err = msBuilder.AddDecisionTaskTimedOutEvent(currentDecision.ScheduleID, currentDecision.StartedID)
				if err != nil {
					return nil, &types.InternalServiceError{Message: "Failed to add decision timeout event."}
				}
				msBuilder.ClearStickyness()
			} else {
				logger.Debug("Adding DecisionTaskCompletedEvent to mutable state for heartbeat")
				completedEvent, err = msBuilder.AddDecisionTaskCompletedEvent(token.ScheduleID, startedID, request, maxResetPoints)
				if err != nil {
					return nil, &types.InternalServiceError{Message: "Unable to add DecisionTaskCompleted event to history."}
				}
			}
		} else {
			completedEvent, err = msBuilder.AddDecisionTaskCompletedEvent(token.ScheduleID, startedID, request, maxResetPoints)
			if err != nil {
				return nil, &types.InternalServiceError{Message: "Unable to add DecisionTaskCompleted event to history."}
			}
		}

		var (
			failDecision                bool
			failCause                   types.DecisionTaskFailedCause
			failMessage                 string
			activityNotStartedCancelled bool
			continueAsNewBuilder        execution.MutableState
			hasUnhandledEvents          bool
			decisionResults             []*decisionResult
		)
		hasUnhandledEvents = msBuilder.HasBufferedEvents()

		if request.StickyAttributes == nil || request.StickyAttributes.WorkerTaskList == nil {
			scope.IncCounter(metrics.CompleteDecisionWithStickyDisabledCounter)
			executionInfo.StickyTaskList = ""
			executionInfo.StickyScheduleToStartTimeout = 0
		} else {
			scope.IncCounter(metrics.CompleteDecisionWithStickyEnabledCounter)
			executionInfo.StickyTaskList = request.StickyAttributes.WorkerTaskList.GetName()
			executionInfo.StickyScheduleToStartTimeout = request.StickyAttributes.GetScheduleToStartTimeoutSeconds()
		}
		executionInfo.ClientLibraryVersion = clientLibVersion
		executionInfo.ClientFeatureVersion = clientFeatureVersion
		executionInfo.ClientImpl = clientImpl

		binChecksum := request.GetBinaryChecksum()
		if _, ok := domainEntry.GetConfig().BadBinaries.Binaries[binChecksum]; ok {
			// Decisions produced by a binary marked bad for this domain are rejected wholesale.
			failDecision = true
			failCause = types.DecisionTaskFailedCauseBadBinary
			failMessage = fmt.Sprintf("binary %v is already marked as bad deployment", binChecksum)
		} else {
			workflowSizeChecker := newWorkflowSizeChecker(
				handler.config.BlobSizeLimitWarn(domainName),
				handler.config.BlobSizeLimitError(domainName),
				handler.config.HistorySizeLimitWarn(domainName),
				handler.config.HistorySizeLimitError(domainName),
				handler.config.HistoryCountLimitWarn(domainName),
				handler.config.HistoryCountLimitError(domainName),
				completedEvent.ID,
				msBuilder,
				executionStats,
				handler.metricsClient.Scope(metrics.HistoryRespondDecisionTaskCompletedScope, metrics.DomainTag(domainName)),
				handler.throttledLogger,
			)

			decisionTaskHandler := newDecisionTaskHandler(
				request.GetIdentity(),
				completedEvent.ID,
				domainEntry,
				msBuilder,
				handler.attrValidator,
				workflowSizeChecker,
				handler.tokenSerializer,
				handler.logger,
				handler.domainCache,
				handler.metricsClient,
				handler.config,
			)

			if decisionResults, err = decisionTaskHandler.handleDecisions(
				ctx,
				request.ExecutionContext,
				request.Decisions,
			); err != nil {
				return nil, err
			}

			// set the vars used by following logic
			// further refactor should also clean up the vars used below
			failDecision = decisionTaskHandler.failDecision
			if failDecision {
				failCause = *decisionTaskHandler.failDecisionCause
				failMessage = *decisionTaskHandler.failMessage
			}

			// failMessage is not used by decisionTaskHandler
			activityNotStartedCancelled = decisionTaskHandler.activityNotStartedCancelled
			// continueAsNewTimerTasks is not used by decisionTaskHandler
			continueAsNewBuilder = decisionTaskHandler.continueAsNewBuilder
			hasUnhandledEvents = decisionTaskHandler.hasUnhandledEventsBeforeDecisions
		}

		if failDecision {
			scope.IncCounter(metrics.FailedDecisionsCounter)
			logger.Info("Failing the decision.", tag.WorkflowDecisionFailCause(int64(failCause)))
			// failDecisionHelper returns the mutable state to continue with; any
			// continue-as-new produced by the rejected decisions is discarded.
			msBuilder, err = handler.failDecisionHelper(
				ctx, wfContext, token.ScheduleID, startedID, failCause, []byte(failMessage), request, domainEntry)
			if err != nil {
				return nil, err
			}
			hasUnhandledEvents = true
			continueAsNewBuilder = nil
		}

		createNewDecisionTask := msBuilder.IsWorkflowExecutionRunning() && (hasUnhandledEvents || request.GetForceCreateNewDecisionTask() || activityNotStartedCancelled)
		logger.Debugf("createNewDecisionTask: %v, msBuilder.IsWorkflowExecutionRunning: %v, hasUnhandledEvents: %v, request.GetForceCreateNewDecisionTask: %v, activityNotStartedCancelled: %v",
			createNewDecisionTask, msBuilder.IsWorkflowExecutionRunning(), hasUnhandledEvents, request.GetForceCreateNewDecisionTask(), activityNotStartedCancelled)
		var newDecisionTaskScheduledID int64
		if createNewDecisionTask {
			var newDecision *execution.DecisionInfo
			if decisionHeartbeating && !decisionHeartbeatTimeout {
				// Pass the original schedule timestamp along, presumably so the heartbeat
				// timeout check above keeps measuring from the first schedule.
				newDecision, err = msBuilder.AddDecisionTaskScheduledEventAsHeartbeat(
					request.GetReturnNewDecisionTask(),
					currentDecision.OriginalScheduledTimestamp,
				)
			} else {
				newDecision, err = msBuilder.AddDecisionTaskScheduledEvent(
					request.GetReturnNewDecisionTask(),
				)
			}
			if err != nil {
				return nil, &types.InternalServiceError{Message: "Failed to add decision scheduled event."}
			}

			newDecisionTaskScheduledID = newDecision.ScheduleID
			// skip transfer task for decision if request asking to return new decision task
			if request.GetReturnNewDecisionTask() {
				logger.Debugf("Adding DecisionTaskStartedEvent to mutable state. new decision's ScheduleID: %d, TaskList: %s", newDecisionTaskScheduledID, newDecision.TaskList)
				// start the new decision task if request asked to do so
				// TODO: replace the poll request
				_, _, err = msBuilder.AddDecisionTaskStartedEvent(newDecision.ScheduleID, "request-from-RespondDecisionTaskCompleted", &types.PollForDecisionTaskRequest{
					TaskList: &types.TaskList{Name: newDecision.TaskList},
					Identity: request.Identity,
				})
				if err != nil {
					return nil, err
				}
			}
		}

		// We apply the update to execution using optimistic concurrency.  If it fails due to a conflict then reload
		// the history and try the operation again.
		var updateErr error
		if continueAsNewBuilder != nil {
			continueAsNewExecutionInfo := continueAsNewBuilder.GetExecutionInfo()
			logger.Debugf("Updating execution with continue as new info. new wfid: %s, runid: %s", continueAsNewExecutionInfo.WorkflowID, continueAsNewExecutionInfo.RunID)
			updateErr = wfContext.UpdateWorkflowExecutionWithNewAsActive(
				ctx,
				handler.shard.GetTimeSource().Now(),
				execution.NewContext(
					continueAsNewExecutionInfo.DomainID,
					types.WorkflowExecution{
						WorkflowID: continueAsNewExecutionInfo.WorkflowID,
						RunID:      continueAsNewExecutionInfo.RunID,
					},
					handler.shard,
					handler.shard.GetExecutionManager(),
					handler.logger,
				),
				continueAsNewBuilder,
			)
		} else {
			updateErr = wfContext.UpdateWorkflowExecutionAsActive(ctx, handler.shard.GetTimeSource().Now())
		}

		if updateErr != nil {
			if execution.IsConflictError(updateErr) {
				scope.IncCounter(metrics.ConcurrencyUpdateFailureCounter)
				continue Update_History_Loop
			}

			// if updateErr resulted in TransactionSizeLimitError then fail workflow
			switch updateErr.(type) {
			case *persistence.TransactionSizeLimitError:
				// must reload mutable state because the first call to updateWorkflowExecutionWithContext or continueAsNewWorkflowExecution
				// clears mutable state if error is returned
				msBuilder, err = wfContext.LoadWorkflowExecution(ctx)
				if err != nil {
					return nil, err
				}

				eventBatchFirstEventID := msBuilder.GetNextEventID()
				if err := execution.TerminateWorkflow(
					msBuilder,
					eventBatchFirstEventID,
					common.FailureReasonTransactionSizeExceedsLimit,
					[]byte(updateErr.Error()),
					execution.IdentityHistoryService,
				); err != nil {
					return nil, err
				}
				if err := wfContext.UpdateWorkflowExecutionAsActive(
					ctx,
					handler.shard.GetTimeSource().Now(),
				); err != nil {
					return nil, err
				}
			}

			// The original update error is still reported even after a successful termination.
			return nil, updateErr
		}

		handler.handleBufferedQueries(
			msBuilder,
			clientImpl,
			clientFeatureVersion,
			req.GetCompleteRequest().GetQueryResults(),
			createNewDecisionTask,
			domainEntry,
			decisionHeartbeating)

		if decisionHeartbeatTimeout {
			// at this point, update is successful, but we still return an error to client so that the worker will give up this workflow
			return nil, &types.EntityNotExistsError{
				Message: "decision heartbeat timeout",
			}
		}

		resp = &types.HistoryRespondDecisionTaskCompletedResponse{}
		if !msBuilder.IsWorkflowExecutionRunning() {
			// Workflow has been completed/terminated, so there is no need to dispatch more activity/decision tasks.
			return resp, nil
		}

		// Collect activities whose dispatch info was produced by the decisions, keyed by activity ID.
		activitiesToDispatchLocally := make(map[string]*types.ActivityLocalDispatchInfo)
		for _, dr := range decisionResults {
			if dr.activityDispatchInfo != nil {
				activitiesToDispatchLocally[dr.activityDispatchInfo.ActivityID] = dr.activityDispatchInfo
			}
		}
		logger.Debugf("%d activities will be dispatched locally on the client side", len(activitiesToDispatchLocally))
		resp.ActivitiesToDispatchLocally = activitiesToDispatchLocally

		if request.GetReturnNewDecisionTask() && createNewDecisionTask {
			decision, ok := msBuilder.GetDecisionInfo(newDecisionTaskScheduledID)
			if !ok {
				// Guard against handing a nil decision to createRecordDecisionTaskStartedResponse.
				return nil, &types.InternalServiceError{Message: "Failed to find the newly scheduled decision."}
			}
			resp.StartedResponse, err = handler.createRecordDecisionTaskStartedResponse(domainID, msBuilder, decision, request.GetIdentity())
			if err != nil {
				return nil, err
			}
			// sticky is always enabled when worker request for new decision task from RespondDecisionTaskCompleted
			resp.StartedResponse.StickyExecutionEnabled = true
		}

		return resp, nil
	}

	return nil, workflow.ErrMaxAttemptsExceeded
}