in service/history/decision/handler.go [285:634]
// HandleDecisionTaskCompleted applies a RespondDecisionTaskCompleted request to
// the workflow execution named by the request's task token.
//
// The domain is resolved through getActiveDomainByID and the task token is
// deserialized to obtain the workflow ID, run ID, schedule ID and attempt. The
// workflow context is then taken from the execution cache and released (with
// the function's final error, via the named result retError) when the call
// returns.
//
// The update itself runs inside Update_History_Loop, which makes at most
// workflow.ConditionalRetryCount attempts. Each attempt:
//
//  1. loads mutable state and execution stats, and verifies that the decision
//     referenced by the token is still the started, current-attempt decision;
//  2. records either a DecisionTaskCompleted event or, for an overdue decision
//     heartbeat, a DecisionTaskTimedOut event;
//  3. updates sticky/client-version info and, unless the binary checksum is
//     marked bad for the domain, runs the decisions through a decisionTaskHandler;
//  4. fails the decision if requested by step 3, and schedules (and optionally
//     starts) a new decision task when one is needed;
//  5. persists the change; a conflict error clears the way for another loop
//     iteration, any other error is returned.
//
// Errors returned directly by this function include workflow.ErrDeserializingToken,
// workflow.ErrAlreadyCompleted, *types.EntityNotExistsError (decision not found,
// or decision heartbeat timeout), *types.InternalServiceError (event could not
// be added to mutable state) and workflow.ErrMaxAttemptsExceeded when every
// attempt ended in a retry. Errors from collaborators are passed through as-is.
func (handler *handlerImpl) HandleDecisionTaskCompleted(
	ctx context.Context,
	req *types.HistoryRespondDecisionTaskCompletedRequest,
) (resp *types.HistoryRespondDecisionTaskCompletedResponse, retError error) {
	domainEntry, err := handler.getActiveDomainByID(req.DomainUUID)
	if err != nil {
		return nil, err
	}
	domainID := domainEntry.GetInfo().ID

	request := req.CompleteRequest
	token, err0 := handler.tokenSerializer.Deserialize(request.TaskToken)
	if err0 != nil {
		// The underlying deserialization error is intentionally replaced by the
		// package-level sentinel.
		return nil, workflow.ErrDeserializingToken
	}

	workflowExecution := types.WorkflowExecution{
		WorkflowID: token.WorkflowID,
		RunID:      token.RunID,
	}
	domainName := domainEntry.GetInfo().Name
	logger := handler.logger.WithTags(
		tag.WorkflowDomainName(domainName),
		tag.WorkflowDomainID(domainEntry.GetInfo().ID),
		tag.WorkflowID(workflowExecution.GetWorkflowID()),
		tag.WorkflowRunID(workflowExecution.GetRunID()),
		tag.WorkflowScheduleID(token.ScheduleID),
	)
	scope := handler.metricsClient.Scope(metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DomainTag(domainName),
		metrics.WorkflowTypeTag(token.WorkflowType))

	// Client identification headers are read once and stored on the execution
	// info (and passed to handleBufferedQueries) on every attempt.
	call := yarpc.CallFromContext(ctx)
	clientLibVersion := call.Header(common.LibraryVersionHeaderName)
	clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
	clientImpl := call.Header(common.ClientImplHeaderName)

	wfContext, release, err := handler.executionCache.GetOrCreateWorkflowExecution(ctx, domainID, workflowExecution)
	if err != nil {
		return nil, err
	}
	// release must observe the final error, hence the closure over the named result.
	defer func() { release(retError) }()

Update_History_Loop:
	for attempt := 0; attempt < workflow.ConditionalRetryCount; attempt++ {
		logger.Debug("Update_History_Loop attempt", tag.Attempt(int32(attempt)))
		msBuilder, err := wfContext.LoadWorkflowExecution(ctx)
		if err != nil {
			return nil, err
		}
		if !msBuilder.IsWorkflowExecutionRunning() {
			return nil, workflow.ErrAlreadyCompleted
		}
		executionStats, err := wfContext.LoadExecutionStats(ctx)
		if err != nil {
			return nil, err
		}

		executionInfo := msBuilder.GetExecutionInfo()
		currentDecision, isRunning := msBuilder.GetDecisionInfo(token.ScheduleID)

		// First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in
		// some extreme cassandra failure cases.
		if !isRunning && token.ScheduleID >= msBuilder.GetNextEventID() {
			scope.IncCounter(metrics.StaleMutableStateCounter)
			logger.Error("Encounter stale mutable state in RespondDecisionTaskCompleted", tag.WorkflowNextEventID(msBuilder.GetNextEventID()))
			// Reload workflow execution history
			wfContext.Clear()
			continue Update_History_Loop
		}

		// currentDecision is only dereferenced after !isRunning has been ruled out
		// by short-circuit evaluation; the log line below goes through helper
		// accessors instead of touching its fields directly.
		if !msBuilder.IsWorkflowExecutionRunning() || !isRunning || currentDecision.Attempt != token.ScheduleAttempt ||
			currentDecision.StartedID == common.EmptyEventID {
			logger.Debugf("Decision task not found. IsWorkflowExecutionRunning: %v, isRunning: %v, currentDecision.Attempt: %v, token.ScheduleAttempt: %v, currentDecision.StartID: %v",
				msBuilder.IsWorkflowExecutionRunning(), isRunning, getDecisionInfoAttempt(currentDecision), token.ScheduleAttempt, getDecisionInfoStartedID(currentDecision))
			return nil, &types.EntityNotExistsError{Message: "Decision task not found."}
		}

		startedID := currentDecision.StartedID
		maxResetPoints := handler.config.MaxAutoResetPoints(domainEntry.GetInfo().Name)
		// Observability only: reaching the limit is logged and counted, the
		// request continues to be processed.
		if msBuilder.GetExecutionInfo().AutoResetPoints != nil && maxResetPoints == len(msBuilder.GetExecutionInfo().AutoResetPoints.Points) {
			logger.Debugf("Max reset points %d is exceeded", maxResetPoints)
			scope.IncCounter(metrics.AutoResetPointsLimitExceededCounter)
		}

		// A "heartbeat" is a completion that carries no decisions but asks for a
		// new decision task to be created.
		decisionHeartbeating := request.GetForceCreateNewDecisionTask() && len(request.Decisions) == 0
		var decisionHeartbeatTimeout bool
		var completedEvent *types.HistoryEvent
		if decisionHeartbeating {
			timeout := handler.config.DecisionHeartbeatTimeout(domainName)
			if currentDecision.OriginalScheduledTimestamp > 0 && handler.timeSource.Now().After(time.Unix(0, currentDecision.OriginalScheduledTimestamp).Add(timeout)) {
				// The heartbeat arrived after OriginalScheduledTimestamp + timeout:
				// record a timeout instead of a completion.
				decisionHeartbeatTimeout = true
				scope.IncCounter(metrics.DecisionHeartbeatTimeoutCounter)
				completedEvent, err = msBuilder.AddDecisionTaskTimedOutEvent(currentDecision.ScheduleID, currentDecision.StartedID)
				if err != nil {
					return nil, &types.InternalServiceError{Message: "Failed to add decision timeout event."}
				}
				msBuilder.ClearStickyness()
			} else {
				logger.Debug("Adding DecisionTaskCompletedEvent to mutable state for heartbeat")
				completedEvent, err = msBuilder.AddDecisionTaskCompletedEvent(token.ScheduleID, startedID, request, maxResetPoints)
				if err != nil {
					return nil, &types.InternalServiceError{Message: "Unable to add DecisionTaskCompleted event to history."}
				}
			}
		} else {
			completedEvent, err = msBuilder.AddDecisionTaskCompletedEvent(token.ScheduleID, startedID, request, maxResetPoints)
			if err != nil {
				return nil, &types.InternalServiceError{Message: "Unable to add DecisionTaskCompleted event to history."}
			}
		}

		var (
			failDecision                bool
			failCause                   types.DecisionTaskFailedCause
			failMessage                 string
			activityNotStartedCancelled bool
			continueAsNewBuilder        execution.MutableState

			hasUnhandledEvents bool
			decisionResults    []*decisionResult
		)
		hasUnhandledEvents = msBuilder.HasBufferedEvents()

		// Sticky task list info is overwritten on every completion: cleared when
		// the request carries no worker task list, otherwise copied from it.
		if request.StickyAttributes == nil || request.StickyAttributes.WorkerTaskList == nil {
			scope.IncCounter(metrics.CompleteDecisionWithStickyDisabledCounter)
			executionInfo.StickyTaskList = ""
			executionInfo.StickyScheduleToStartTimeout = 0
		} else {
			scope.IncCounter(metrics.CompleteDecisionWithStickyEnabledCounter)
			executionInfo.StickyTaskList = request.StickyAttributes.WorkerTaskList.GetName()
			executionInfo.StickyScheduleToStartTimeout = request.StickyAttributes.GetScheduleToStartTimeoutSeconds()
		}
		executionInfo.ClientLibraryVersion = clientLibVersion
		executionInfo.ClientFeatureVersion = clientFeatureVersion
		executionInfo.ClientImpl = clientImpl

		binChecksum := request.GetBinaryChecksum()
		if _, ok := domainEntry.GetConfig().BadBinaries.Binaries[binChecksum]; ok {
			// The worker binary is listed as bad for this domain: skip decision
			// handling entirely and fail the decision task below.
			failDecision = true
			failCause = types.DecisionTaskFailedCauseBadBinary
			failMessage = fmt.Sprintf("binary %v is already marked as bad deployment", binChecksum)
		} else {
			workflowSizeChecker := newWorkflowSizeChecker(
				handler.config.BlobSizeLimitWarn(domainName),
				handler.config.BlobSizeLimitError(domainName),
				handler.config.HistorySizeLimitWarn(domainName),
				handler.config.HistorySizeLimitError(domainName),
				handler.config.HistoryCountLimitWarn(domainName),
				handler.config.HistoryCountLimitError(domainName),
				completedEvent.ID,
				msBuilder,
				executionStats,
				handler.metricsClient.Scope(metrics.HistoryRespondDecisionTaskCompletedScope, metrics.DomainTag(domainName)),
				handler.throttledLogger,
			)

			decisionTaskHandler := newDecisionTaskHandler(
				request.GetIdentity(),
				completedEvent.ID,
				domainEntry,
				msBuilder,
				handler.attrValidator,
				workflowSizeChecker,
				handler.tokenSerializer,
				handler.logger,
				handler.domainCache,
				handler.metricsClient,
				handler.config,
			)

			if decisionResults, err = decisionTaskHandler.handleDecisions(
				ctx,
				request.ExecutionContext,
				request.Decisions,
			); err != nil {
				return nil, err
			}

			// set the vars used by following logic
			// further refactor should also clean up the vars used below
			failDecision = decisionTaskHandler.failDecision
			if failDecision {
				failCause = *decisionTaskHandler.failDecisionCause
				failMessage = *decisionTaskHandler.failMessage
			}

			// failMessage is not used by decisionTaskHandler
			activityNotStartedCancelled = decisionTaskHandler.activityNotStartedCancelled
			// continueAsNewTimerTasks is not used by decisionTaskHandler

			continueAsNewBuilder = decisionTaskHandler.continueAsNewBuilder

			hasUnhandledEvents = decisionTaskHandler.hasUnhandledEventsBeforeDecisions
		}

		if failDecision {
			scope.IncCounter(metrics.FailedDecisionsCounter)
			logger.Info("Failing the decision.", tag.WorkflowDecisionFailCause(int64(failCause)))
			// failDecisionHelper returns the mutable state to continue with; it
			// replaces the one the events above were added to.
			msBuilder, err = handler.failDecisionHelper(
				ctx, wfContext, token.ScheduleID, startedID, failCause, []byte(failMessage), request, domainEntry)
			if err != nil {
				return nil, err
			}
			// Setting hasUnhandledEvents makes createNewDecisionTask true for a
			// running workflow; dropping continueAsNewBuilder selects the plain
			// update path below.
			hasUnhandledEvents = true
			continueAsNewBuilder = nil
		}

		createNewDecisionTask := msBuilder.IsWorkflowExecutionRunning() && (hasUnhandledEvents || request.GetForceCreateNewDecisionTask() || activityNotStartedCancelled)
		logger.Debugf("createNewDecisionTask: %v, msBuilder.IsWorkflowExecutionRunning: %v, hasUnhandledEvents: %v, request.GetForceCreateNewDecisionTask: %v, activityNotStartedCancelled: %v",
			createNewDecisionTask, msBuilder.IsWorkflowExecutionRunning(), hasUnhandledEvents, request.GetForceCreateNewDecisionTask(), activityNotStartedCancelled)
		var newDecisionTaskScheduledID int64
		if createNewDecisionTask {
			var newDecision *execution.DecisionInfo
			var err error
			if decisionHeartbeating && !decisionHeartbeatTimeout {
				// Heartbeat path: the new decision carries over the original
				// scheduled timestamp used by the heartbeat timeout check above.
				newDecision, err = msBuilder.AddDecisionTaskScheduledEventAsHeartbeat(
					request.GetReturnNewDecisionTask(),
					currentDecision.OriginalScheduledTimestamp,
				)
			} else {
				newDecision, err = msBuilder.AddDecisionTaskScheduledEvent(
					request.GetReturnNewDecisionTask(),
				)
			}
			if err != nil {
				return nil, &types.InternalServiceError{Message: "Failed to add decision scheduled event."}
			}

			newDecisionTaskScheduledID = newDecision.ScheduleID
			// skip transfer task for decision if request asking to return new decision task
			if request.GetReturnNewDecisionTask() {
				logger.Debugf("Adding DecisionTaskStartedEvent to mutable state. new decision's ScheduleID: %d, TaskList: %s", newDecisionTaskScheduledID, newDecision.TaskList)
				// start the new decision task if request asked to do so
				// TODO: replace the poll request
				_, _, err := msBuilder.AddDecisionTaskStartedEvent(newDecision.ScheduleID, "request-from-RespondDecisionTaskCompleted", &types.PollForDecisionTaskRequest{
					TaskList: &types.TaskList{Name: newDecision.TaskList},
					Identity: request.Identity,
				})
				if err != nil {
					return nil, err
				}
			}
		}

		// We apply the update to execution using optimistic concurrency.  If it fails due to a conflict then reload
		// the history and try the operation again.
		var updateErr error
		if continueAsNewBuilder != nil {
			continueAsNewExecutionInfo := continueAsNewBuilder.GetExecutionInfo()
			logger.Debugf("Updating execution with continue as new info. new wfid: %s, runid: %s", continueAsNewExecutionInfo.WorkflowID, continueAsNewExecutionInfo.RunID)
			updateErr = wfContext.UpdateWorkflowExecutionWithNewAsActive(
				ctx,
				handler.shard.GetTimeSource().Now(),
				execution.NewContext(
					continueAsNewExecutionInfo.DomainID,
					types.WorkflowExecution{
						WorkflowID: continueAsNewExecutionInfo.WorkflowID,
						RunID:      continueAsNewExecutionInfo.RunID,
					},
					handler.shard,
					handler.shard.GetExecutionManager(),
					handler.logger,
				),
				continueAsNewBuilder,
			)
		} else {
			updateErr = wfContext.UpdateWorkflowExecutionAsActive(ctx, handler.shard.GetTimeSource().Now())
		}

		if updateErr != nil {
			if execution.IsConflictError(updateErr) {
				scope.IncCounter(metrics.ConcurrencyUpdateFailureCounter)
				continue Update_History_Loop
			}

			// if updateErr resulted in TransactionSizeLimitError then fail workflow
			switch updateErr.(type) {
			case *persistence.TransactionSizeLimitError:
				// must reload mutable state because the first call to updateWorkflowExecutionWithContext or continueAsNewWorkflowExecution
				// clears mutable state if error is returned
				msBuilder, err = wfContext.LoadWorkflowExecution(ctx)
				if err != nil {
					return nil, err
				}

				eventBatchFirstEventID := msBuilder.GetNextEventID()
				if err := execution.TerminateWorkflow(
					msBuilder,
					eventBatchFirstEventID,
					common.FailureReasonTransactionSizeExceedsLimit,
					[]byte(updateErr.Error()),
					execution.IdentityHistoryService,
				); err != nil {
					return nil, err
				}
				if err := wfContext.UpdateWorkflowExecutionAsActive(
					ctx,
					handler.shard.GetTimeSource().Now(),
				); err != nil {
					return nil, err
				}
			}

			// Even when the termination above was persisted, the caller still
			// receives the original update error.
			return nil, updateErr
		}

		handler.handleBufferedQueries(
			msBuilder,
			clientImpl,
			clientFeatureVersion,
			req.GetCompleteRequest().GetQueryResults(),
			createNewDecisionTask,
			domainEntry,
			decisionHeartbeating)

		if decisionHeartbeatTimeout {
			// at this point, update is successful, but we still return an error to client so that the worker will give up this workflow
			return nil, &types.EntityNotExistsError{
				Message: "decision heartbeat timeout",
			}
		}

		resp = &types.HistoryRespondDecisionTaskCompletedResponse{}
		if !msBuilder.IsWorkflowExecutionRunning() {
			// Workflow has been completed/terminated, so there is no need to dispatch more activity/decision tasks.
			return resp, nil
		}

		// Collect the dispatch info of every decision result that has one,
		// keyed by activity ID.
		activitiesToDispatchLocally := make(map[string]*types.ActivityLocalDispatchInfo)
		for _, dr := range decisionResults {
			if dr.activityDispatchInfo != nil {
				activitiesToDispatchLocally[dr.activityDispatchInfo.ActivityID] = dr.activityDispatchInfo
			}
		}
		logger.Debugf("%d activities will be dispatched locally on the client side", len(activitiesToDispatchLocally))
		resp.ActivitiesToDispatchLocally = activitiesToDispatchLocally

		if request.GetReturnNewDecisionTask() && createNewDecisionTask {
			decision, _ := msBuilder.GetDecisionInfo(newDecisionTaskScheduledID)
			resp.StartedResponse, err = handler.createRecordDecisionTaskStartedResponse(domainID, msBuilder, decision, request.GetIdentity())
			if err != nil {
				return nil, err
			}
			// sticky is always enabled when worker request for new decision task from RespondDecisionTaskCompleted
			resp.StartedResponse.StickyExecutionEnabled = true
		}

		return resp, nil
	}

	// Every attempt ended in a retry (stale state or update conflict).
	return nil, workflow.ErrMaxAttemptsExceeded
}