// service/history/decision/handler.go
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package decision
import (
"context"
"fmt"
"time"
"go.uber.org/yarpc"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/client"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/history/config"
"github.com/uber/cadence/service/history/execution"
"github.com/uber/cadence/service/history/query"
"github.com/uber/cadence/service/history/shard"
"github.com/uber/cadence/service/history/workflow"
)
type (
	// Handler contains decision business logic
	Handler interface {
		// HandleDecisionTaskScheduled schedules the first decision task for a workflow execution.
		HandleDecisionTaskScheduled(context.Context, *types.ScheduleDecisionTaskRequest) error
		// HandleDecisionTaskStarted records a decision task as started and returns the info a poller needs.
		HandleDecisionTaskStarted(context.Context,
			*types.RecordDecisionTaskStartedRequest) (*types.RecordDecisionTaskStartedResponse, error)
		// HandleDecisionTaskFailed records a DecisionTaskFailed event for an outstanding decision task.
		HandleDecisionTaskFailed(context.Context,
			*types.HistoryRespondDecisionTaskFailedRequest) error
		// HandleDecisionTaskCompleted applies a worker's completed decision task and its decisions.
		HandleDecisionTaskCompleted(context.Context,
			*types.HistoryRespondDecisionTaskCompletedRequest) (*types.HistoryRespondDecisionTaskCompletedResponse, error)
		// TODO also include the handle of decision timeout here
	}

	// handlerImpl is the shard-scoped implementation of Handler.
	handlerImpl struct {
		config          *config.Config              // dynamic service configuration
		shard           shard.Context               // owning shard; source of persistence, clock, metadata
		timeSource      clock.TimeSource            // injectable clock (from shard)
		domainCache     cache.DomainCache           // domain metadata lookup
		executionCache  *execution.Cache            // cache of workflow execution contexts
		tokenSerializer common.TaskTokenSerializer  // (de)serializes decision task tokens
		metricsClient   metrics.Client              // metrics emission
		logger          log.Logger                  // component-tagged logger
		throttledLogger log.Logger                  // rate-limited logger for hot paths
		attrValidator   *attrValidator              // validates decision attributes
		versionChecker  client.VersionChecker       // checks client capability support (e.g. consistent query)
	}
)
// NewHandler creates a new Handler for handling decision business logic.
// All dependencies except the execution cache and token serializer are
// derived from the shard context.
func NewHandler(
	shard shard.Context,
	executionCache *execution.Cache,
	tokenSerializer common.TaskTokenSerializer,
) Handler {
	config := shard.GetConfig()
	logger := shard.GetLogger().WithTags(tag.ComponentDecisionHandler)
	return &handlerImpl{
		config:          config,
		shard:           shard,
		timeSource:      shard.GetTimeSource(),
		domainCache:     shard.GetDomainCache(),
		executionCache:  executionCache,
		tokenSerializer: tokenSerializer,
		metricsClient:   shard.GetMetricsClient(),
		// reuse the tagged logger built above rather than constructing it a second time
		logger:          logger,
		throttledLogger: shard.GetThrottledLogger().WithTags(tag.ComponentDecisionHandler),
		attrValidator: newAttrValidator(
			shard.GetDomainCache(),
			shard.GetMetricsClient(),
			config,
			logger,
		),
		versionChecker: client.NewVersionChecker(),
	}
}
// HandleDecisionTaskScheduled schedules the first decision task for the
// workflow execution identified by the request. It is a no-op when a
// decision has already been processed or is pending, and returns
// workflow.ErrNotExists when the execution is no longer running.
func (handler *handlerImpl) HandleDecisionTaskScheduled(
	ctx context.Context,
	req *types.ScheduleDecisionTaskRequest,
) error {
	domainEntry, err := handler.getActiveDomainByID(req.DomainUUID)
	if err != nil {
		return err
	}
	domainID := domainEntry.GetInfo().ID

	workflowExecution := types.WorkflowExecution{
		WorkflowID: req.WorkflowExecution.WorkflowID,
		RunID:      req.WorkflowExecution.RunID,
	}

	return workflow.UpdateWithActionFunc(
		ctx,
		handler.executionCache,
		domainID,
		workflowExecution,
		handler.timeSource.Now(),
		// note: parameter renamed from "context" to avoid shadowing the context package
		func(wfContext execution.Context, mutableState execution.MutableState) (*workflow.UpdateAction, error) {
			if !mutableState.IsWorkflowExecutionRunning() {
				return nil, workflow.ErrNotExists
			}

			// Nothing to do if a decision was already processed or is pending.
			if mutableState.HasProcessedOrPendingDecision() {
				return &workflow.UpdateAction{
					Noop: true,
				}, nil
			}

			startEvent, err := mutableState.GetStartEvent(ctx)
			if err != nil {
				return nil, err
			}
			if err := mutableState.AddFirstDecisionTaskScheduled(
				startEvent,
			); err != nil {
				return nil, err
			}

			return &workflow.UpdateAction{}, nil
		},
	)
}
// HandleDecisionTaskStarted records a decision task as started for the
// execution identified by the request, and builds the response a matching
// poller needs to dispatch the task. Duplicate/stale calls are detected:
// the same requestID gets a positive response, a different already-started
// task gets EventAlreadyStartedError, and a completed task gets
// EntityNotExistsError.
func (handler *handlerImpl) HandleDecisionTaskStarted(
	ctx context.Context,
	req *types.RecordDecisionTaskStartedRequest,
) (*types.RecordDecisionTaskStartedResponse, error) {
	domainEntry, err := handler.getActiveDomainByID(req.DomainUUID)
	if err != nil {
		return nil, err
	}
	domainID := domainEntry.GetInfo().ID

	workflowExecution := types.WorkflowExecution{
		WorkflowID: req.WorkflowExecution.WorkflowID,
		RunID:      req.WorkflowExecution.RunID,
	}

	scheduleID := req.GetScheduleID()
	requestID := req.GetRequestID()

	// resp is populated inside the update closure; the closure may run more
	// than once (UpdateWithActionFunc retries on ErrStaleState).
	var resp *types.RecordDecisionTaskStartedResponse
	err = workflow.UpdateWithActionFunc(
		ctx,
		handler.executionCache,
		domainID,
		workflowExecution,
		handler.timeSource.Now(),
		func(context execution.Context, mutableState execution.MutableState) (*workflow.UpdateAction, error) {
			if !mutableState.IsWorkflowExecutionRunning() {
				return nil, workflow.ErrNotExists
			}

			decision, isRunning := mutableState.GetDecisionInfo(scheduleID)

			// First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in
			// some extreme cassandra failure cases.
			if !isRunning && scheduleID >= mutableState.GetNextEventID() {
				handler.metricsClient.IncCounter(metrics.HistoryRecordDecisionTaskStartedScope, metrics.StaleMutableStateCounter)
				handler.logger.Error("Encounter stale mutable state in RecordDecisionTaskStarted",
					tag.WorkflowDomainName(domainEntry.GetInfo().Name),
					tag.WorkflowID(workflowExecution.GetWorkflowID()),
					tag.WorkflowRunID(workflowExecution.GetRunID()),
					tag.WorkflowScheduleID(scheduleID),
					tag.WorkflowNextEventID(mutableState.GetNextEventID()),
				)
				// Reload workflow execution history
				// ErrStaleState will trigger updateWorkflowExecutionWithAction function to reload the mutable state
				return nil, workflow.ErrStaleState
			}

			// Check execution state to make sure task is in the list of outstanding tasks and it is not yet started. If
			// task is not outstanding than it is most probably a duplicate and complete the task.
			if !isRunning {
				// Looks like DecisionTask already completed as a result of another call.
				// It is OK to drop the task at this point.
				return nil, &types.EntityNotExistsError{Message: "Decision task not found."}
			}

			updateAction := &workflow.UpdateAction{}

			if decision.StartedID != common.EmptyEventID {
				// If decision is started as part of the current request scope then return a positive response
				if decision.RequestID == requestID {
					resp, err = handler.createRecordDecisionTaskStartedResponse(domainID, mutableState, decision, req.PollRequest.GetIdentity())
					if err != nil {
						return nil, err
					}
					// Noop: nothing to persist for an idempotent retry.
					updateAction.Noop = true
					return updateAction, nil
				}

				// Looks like DecisionTask already started as a result of another call.
				// It is OK to drop the task at this point.
				return nil, &types.EventAlreadyStartedError{Message: "Decision task already started."}
			}

			_, decision, err = mutableState.AddDecisionTaskStartedEvent(scheduleID, requestID, req.PollRequest)
			if err != nil {
				// Unable to add DecisionTaskStarted event to history
				return nil, &types.InternalServiceError{Message: "Unable to add DecisionTaskStarted event to history."}
			}

			resp, err = handler.createRecordDecisionTaskStartedResponse(domainID, mutableState, decision, req.PollRequest.GetIdentity())
			if err != nil {
				return nil, err
			}
			return updateAction, nil
		},
	)

	if err != nil {
		return nil, err
	}
	return resp, nil
}
// HandleDecisionTaskFailed records a DecisionTaskFailed event for the
// decision task identified by the request's task token. The task must still
// be outstanding, started, and on the same attempt as the token; otherwise
// EntityNotExistsError is returned.
//
// Note: the unused named return value (retError) was removed — nothing in
// the function referenced it.
func (handler *handlerImpl) HandleDecisionTaskFailed(
	ctx context.Context,
	req *types.HistoryRespondDecisionTaskFailedRequest,
) error {
	domainEntry, err := handler.getActiveDomainByID(req.DomainUUID)
	if err != nil {
		return err
	}
	domainID := domainEntry.GetInfo().ID

	request := req.FailedRequest
	token, err := handler.tokenSerializer.Deserialize(request.TaskToken)
	if err != nil {
		return workflow.ErrDeserializingToken
	}

	workflowExecution := types.WorkflowExecution{
		WorkflowID: token.WorkflowID,
		RunID:      token.RunID,
	}

	return workflow.UpdateWithAction(ctx, handler.executionCache, domainID, workflowExecution, true, handler.timeSource.Now(),
		// note: parameter renamed from "context" to avoid shadowing the context package
		func(wfContext execution.Context, mutableState execution.MutableState) error {
			if !mutableState.IsWorkflowExecutionRunning() {
				return workflow.ErrAlreadyCompleted
			}

			scheduleID := token.ScheduleID
			decision, isRunning := mutableState.GetDecisionInfo(scheduleID)
			// The failure must refer to the currently outstanding, started attempt.
			if !isRunning || decision.Attempt != token.ScheduleAttempt || decision.StartedID == common.EmptyEventID {
				return &types.EntityNotExistsError{Message: "Decision task not found."}
			}

			_, err := mutableState.AddDecisionTaskFailedEvent(decision.ScheduleID, decision.StartedID, request.GetCause(), request.Details,
				request.GetIdentity(), "", request.GetBinaryChecksum(), "", "", 0, "")
			return err
		})
}
// HandleDecisionTaskCompleted processes RespondDecisionTaskCompleted from a
// worker: it validates the task token against the workflow's mutable state,
// applies the decisions carried by the request, optionally schedules (and,
// when requested, starts) a new decision task, then persists the update with
// optimistic concurrency — retrying the whole loop on conflict up to
// workflow.ConditionalRetryCount times.
//
// Bug fix: the "activities will be dispatched locally" debug log used a %d
// verb with no argument; the count is now passed explicitly.
func (handler *handlerImpl) HandleDecisionTaskCompleted(
	ctx context.Context,
	req *types.HistoryRespondDecisionTaskCompletedRequest,
) (resp *types.HistoryRespondDecisionTaskCompletedResponse, retError error) {
	domainEntry, err := handler.getActiveDomainByID(req.DomainUUID)
	if err != nil {
		return nil, err
	}
	domainID := domainEntry.GetInfo().ID

	request := req.CompleteRequest
	token, err0 := handler.tokenSerializer.Deserialize(request.TaskToken)
	if err0 != nil {
		return nil, workflow.ErrDeserializingToken
	}

	workflowExecution := types.WorkflowExecution{
		WorkflowID: token.WorkflowID,
		RunID:      token.RunID,
	}

	domainName := domainEntry.GetInfo().Name
	logger := handler.logger.WithTags(
		tag.WorkflowDomainName(domainName),
		tag.WorkflowDomainID(domainEntry.GetInfo().ID),
		tag.WorkflowID(workflowExecution.GetWorkflowID()),
		tag.WorkflowRunID(workflowExecution.GetRunID()),
		tag.WorkflowScheduleID(token.ScheduleID),
	)
	scope := handler.metricsClient.Scope(metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DomainTag(domainName),
		metrics.WorkflowTypeTag(token.WorkflowType))

	// Capture client version headers so they can be persisted on the execution.
	call := yarpc.CallFromContext(ctx)
	clientLibVersion := call.Header(common.LibraryVersionHeaderName)
	clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
	clientImpl := call.Header(common.ClientImplHeaderName)

	wfContext, release, err := handler.executionCache.GetOrCreateWorkflowExecution(ctx, domainID, workflowExecution)
	if err != nil {
		return nil, err
	}
	defer func() { release(retError) }()

Update_History_Loop:
	for attempt := 0; attempt < workflow.ConditionalRetryCount; attempt++ {
		logger.Debug("Update_History_Loop attempt", tag.Attempt(int32(attempt)))
		msBuilder, err := wfContext.LoadWorkflowExecution(ctx)
		if err != nil {
			return nil, err
		}
		if !msBuilder.IsWorkflowExecutionRunning() {
			return nil, workflow.ErrAlreadyCompleted
		}
		executionStats, err := wfContext.LoadExecutionStats(ctx)
		if err != nil {
			return nil, err
		}

		executionInfo := msBuilder.GetExecutionInfo()

		currentDecision, isRunning := msBuilder.GetDecisionInfo(token.ScheduleID)

		// First check to see if cache needs to be refreshed as we could potentially have stale workflow execution in
		// some extreme cassandra failure cases.
		if !isRunning && token.ScheduleID >= msBuilder.GetNextEventID() {
			scope.IncCounter(metrics.StaleMutableStateCounter)
			logger.Error("Encounter stale mutable state in RespondDecisionTaskCompleted", tag.WorkflowNextEventID(msBuilder.GetNextEventID()))
			// Reload workflow execution history
			wfContext.Clear()
			continue Update_History_Loop
		}

		if !msBuilder.IsWorkflowExecutionRunning() || !isRunning || currentDecision.Attempt != token.ScheduleAttempt || currentDecision.StartedID == common.EmptyEventID {
			logger.Debugf("Decision task not found. IsWorkflowExecutionRunning: %v, isRunning: %v, currentDecision.Attempt: %v, token.ScheduleAttempt: %v, currentDecision.StartID: %v",
				msBuilder.IsWorkflowExecutionRunning(), isRunning, getDecisionInfoAttempt(currentDecision), token.ScheduleAttempt, getDecisionInfoStartedID(currentDecision))
			return nil, &types.EntityNotExistsError{Message: "Decision task not found."}
		}

		startedID := currentDecision.StartedID
		maxResetPoints := handler.config.MaxAutoResetPoints(domainEntry.GetInfo().Name)
		if msBuilder.GetExecutionInfo().AutoResetPoints != nil && maxResetPoints == len(msBuilder.GetExecutionInfo().AutoResetPoints.Points) {
			logger.Debugf("Max reset points %d is exceeded", maxResetPoints)
			scope.IncCounter(metrics.AutoResetPointsLimitExceededCounter)
		}

		// A heartbeat is a forced-new-decision request that carries no decisions.
		decisionHeartbeating := request.GetForceCreateNewDecisionTask() && len(request.Decisions) == 0
		var decisionHeartbeatTimeout bool
		var completedEvent *types.HistoryEvent
		if decisionHeartbeating {
			timeout := handler.config.DecisionHeartbeatTimeout(domainName)
			if currentDecision.OriginalScheduledTimestamp > 0 && handler.timeSource.Now().After(time.Unix(0, currentDecision.OriginalScheduledTimestamp).Add(timeout)) {
				// Heartbeating has gone on longer than allowed: time the decision out.
				decisionHeartbeatTimeout = true
				scope.IncCounter(metrics.DecisionHeartbeatTimeoutCounter)
				completedEvent, err = msBuilder.AddDecisionTaskTimedOutEvent(currentDecision.ScheduleID, currentDecision.StartedID)
				if err != nil {
					return nil, &types.InternalServiceError{Message: "Failed to add decision timeout event."}
				}
				msBuilder.ClearStickyness()
			} else {
				logger.Debug("Adding DecisionTaskCompletedEvent to mutable state for heartbeat")
				completedEvent, err = msBuilder.AddDecisionTaskCompletedEvent(token.ScheduleID, startedID, request, maxResetPoints)
				if err != nil {
					return nil, &types.InternalServiceError{Message: "Unable to add DecisionTaskCompleted event to history."}
				}
			}
		} else {
			completedEvent, err = msBuilder.AddDecisionTaskCompletedEvent(token.ScheduleID, startedID, request, maxResetPoints)
			if err != nil {
				return nil, &types.InternalServiceError{Message: "Unable to add DecisionTaskCompleted event to history."}
			}
		}

		var (
			failDecision                bool
			failCause                   types.DecisionTaskFailedCause
			failMessage                 string
			activityNotStartedCancelled bool
			continueAsNewBuilder        execution.MutableState
			hasUnhandledEvents          bool
			decisionResults             []*decisionResult
		)
		hasUnhandledEvents = msBuilder.HasBufferedEvents()

		if request.StickyAttributes == nil || request.StickyAttributes.WorkerTaskList == nil {
			scope.IncCounter(metrics.CompleteDecisionWithStickyDisabledCounter)
			executionInfo.StickyTaskList = ""
			executionInfo.StickyScheduleToStartTimeout = 0
		} else {
			scope.IncCounter(metrics.CompleteDecisionWithStickyEnabledCounter)
			executionInfo.StickyTaskList = request.StickyAttributes.WorkerTaskList.GetName()
			executionInfo.StickyScheduleToStartTimeout = request.StickyAttributes.GetScheduleToStartTimeoutSeconds()
		}
		executionInfo.ClientLibraryVersion = clientLibVersion
		executionInfo.ClientFeatureVersion = clientFeatureVersion
		executionInfo.ClientImpl = clientImpl

		binChecksum := request.GetBinaryChecksum()
		if _, ok := domainEntry.GetConfig().BadBinaries.Binaries[binChecksum]; ok {
			// Worker binary is flagged bad: fail the decision without applying it.
			failDecision = true
			failCause = types.DecisionTaskFailedCauseBadBinary
			failMessage = fmt.Sprintf("binary %v is already marked as bad deployment", binChecksum)
		} else {
			workflowSizeChecker := newWorkflowSizeChecker(
				handler.config.BlobSizeLimitWarn(domainName),
				handler.config.BlobSizeLimitError(domainName),
				handler.config.HistorySizeLimitWarn(domainName),
				handler.config.HistorySizeLimitError(domainName),
				handler.config.HistoryCountLimitWarn(domainName),
				handler.config.HistoryCountLimitError(domainName),
				completedEvent.ID,
				msBuilder,
				executionStats,
				handler.metricsClient.Scope(metrics.HistoryRespondDecisionTaskCompletedScope, metrics.DomainTag(domainName)),
				handler.throttledLogger,
			)

			decisionTaskHandler := newDecisionTaskHandler(
				request.GetIdentity(),
				completedEvent.ID,
				domainEntry,
				msBuilder,
				handler.attrValidator,
				workflowSizeChecker,
				handler.tokenSerializer,
				handler.logger,
				handler.domainCache,
				handler.metricsClient,
				handler.config,
			)

			if decisionResults, err = decisionTaskHandler.handleDecisions(
				ctx,
				request.ExecutionContext,
				request.Decisions,
			); err != nil {
				return nil, err
			}

			// set the vars used by following logic
			// further refactor should also clean up the vars used below
			failDecision = decisionTaskHandler.failDecision
			if failDecision {
				failCause = *decisionTaskHandler.failDecisionCause
				failMessage = *decisionTaskHandler.failMessage
			}

			// failMessage is not used by decisionTaskHandler
			activityNotStartedCancelled = decisionTaskHandler.activityNotStartedCancelled
			// continueAsNewTimerTasks is not used by decisionTaskHandler

			continueAsNewBuilder = decisionTaskHandler.continueAsNewBuilder

			hasUnhandledEvents = decisionTaskHandler.hasUnhandledEventsBeforeDecisions
		}

		if failDecision {
			scope.IncCounter(metrics.FailedDecisionsCounter)
			logger.Info("Failing the decision.", tag.WorkflowDecisionFailCause(int64(failCause)))
			// failDecisionHelper clears and reloads mutable state, so msBuilder is replaced.
			msBuilder, err = handler.failDecisionHelper(
				ctx, wfContext, token.ScheduleID, startedID, failCause, []byte(failMessage), request, domainEntry)
			if err != nil {
				return nil, err
			}
			hasUnhandledEvents = true
			continueAsNewBuilder = nil
		}

		createNewDecisionTask := msBuilder.IsWorkflowExecutionRunning() && (hasUnhandledEvents || request.GetForceCreateNewDecisionTask() || activityNotStartedCancelled)
		logger.Debugf("createNewDecisionTask: %v, msBuilder.IsWorkflowExecutionRunning: %v, hasUnhandledEvents: %v, request.GetForceCreateNewDecisionTask: %v, activityNotStartedCancelled: %v",
			createNewDecisionTask, msBuilder.IsWorkflowExecutionRunning(), hasUnhandledEvents, request.GetForceCreateNewDecisionTask(), activityNotStartedCancelled)
		var newDecisionTaskScheduledID int64
		if createNewDecisionTask {
			var newDecision *execution.DecisionInfo
			var err error
			if decisionHeartbeating && !decisionHeartbeatTimeout {
				// Preserve the original scheduled timestamp so the heartbeat timeout keeps ticking.
				newDecision, err = msBuilder.AddDecisionTaskScheduledEventAsHeartbeat(
					request.GetReturnNewDecisionTask(),
					currentDecision.OriginalScheduledTimestamp,
				)
			} else {
				newDecision, err = msBuilder.AddDecisionTaskScheduledEvent(
					request.GetReturnNewDecisionTask(),
				)
			}
			if err != nil {
				return nil, &types.InternalServiceError{Message: "Failed to add decision scheduled event."}
			}

			newDecisionTaskScheduledID = newDecision.ScheduleID
			// skip transfer task for decision if request asking to return new decision task
			if request.GetReturnNewDecisionTask() {
				logger.Debugf("Adding DecisionTaskStartedEvent to mutable state. new decision's ScheduleID: %d, TaskList: %s", newDecisionTaskScheduledID, newDecision.TaskList)
				// start the new decision task if request asked to do so
				// TODO: replace the poll request
				_, _, err := msBuilder.AddDecisionTaskStartedEvent(newDecision.ScheduleID, "request-from-RespondDecisionTaskCompleted", &types.PollForDecisionTaskRequest{
					TaskList: &types.TaskList{Name: newDecision.TaskList},
					Identity: request.Identity,
				})
				if err != nil {
					return nil, err
				}
			}
		}

		// We apply the update to execution using optimistic concurrency. If it fails due to a conflict then reload
		// the history and try the operation again.
		var updateErr error
		if continueAsNewBuilder != nil {
			continueAsNewExecutionInfo := continueAsNewBuilder.GetExecutionInfo()
			logger.Debugf("Updating execution with continue as new info. new wfid: %s, runid: %s", continueAsNewExecutionInfo.WorkflowID, continueAsNewExecutionInfo.RunID)
			updateErr = wfContext.UpdateWorkflowExecutionWithNewAsActive(
				ctx,
				handler.shard.GetTimeSource().Now(),
				execution.NewContext(
					continueAsNewExecutionInfo.DomainID,
					types.WorkflowExecution{
						WorkflowID: continueAsNewExecutionInfo.WorkflowID,
						RunID:      continueAsNewExecutionInfo.RunID,
					},
					handler.shard,
					handler.shard.GetExecutionManager(),
					handler.logger,
				),
				continueAsNewBuilder,
			)
		} else {
			updateErr = wfContext.UpdateWorkflowExecutionAsActive(ctx, handler.shard.GetTimeSource().Now())
		}

		if updateErr != nil {
			if execution.IsConflictError(updateErr) {
				scope.IncCounter(metrics.ConcurrencyUpdateFailureCounter)
				continue Update_History_Loop
			}

			// if updateErr resulted in TransactionSizeLimitError then fail workflow
			switch updateErr.(type) {
			case *persistence.TransactionSizeLimitError:
				// must reload mutable state because the first call to updateWorkflowExecutionWithContext or continueAsNewWorkflowExecution
				// clears mutable state if error is returned
				msBuilder, err = wfContext.LoadWorkflowExecution(ctx)
				if err != nil {
					return nil, err
				}

				eventBatchFirstEventID := msBuilder.GetNextEventID()
				if err := execution.TerminateWorkflow(
					msBuilder,
					eventBatchFirstEventID,
					common.FailureReasonTransactionSizeExceedsLimit,
					[]byte(updateErr.Error()),
					execution.IdentityHistoryService,
				); err != nil {
					return nil, err
				}
				if err := wfContext.UpdateWorkflowExecutionAsActive(
					ctx,
					handler.shard.GetTimeSource().Now(),
				); err != nil {
					return nil, err
				}
			}

			return nil, updateErr
		}

		handler.handleBufferedQueries(
			msBuilder,
			clientImpl,
			clientFeatureVersion,
			req.GetCompleteRequest().GetQueryResults(),
			createNewDecisionTask,
			domainEntry,
			decisionHeartbeating)

		if decisionHeartbeatTimeout {
			// at this point, update is successful, but we still return an error to client so that the worker will give up this workflow
			return nil, &types.EntityNotExistsError{
				Message: "decision heartbeat timeout",
			}
		}

		resp = &types.HistoryRespondDecisionTaskCompletedResponse{}
		if !msBuilder.IsWorkflowExecutionRunning() {
			// Workflow has been completed/terminated, so there is no need to dispatch more activity/decision tasks.
			return resp, nil
		}
		activitiesToDispatchLocally := make(map[string]*types.ActivityLocalDispatchInfo)
		for _, dr := range decisionResults {
			if dr.activityDispatchInfo != nil {
				activitiesToDispatchLocally[dr.activityDispatchInfo.ActivityID] = dr.activityDispatchInfo
			}
		}
		// FIX: the %d verb previously had no matching argument (vet: printf).
		logger.Debugf("%d activities will be dispatched locally on the client side", len(activitiesToDispatchLocally))
		resp.ActivitiesToDispatchLocally = activitiesToDispatchLocally
		if request.GetReturnNewDecisionTask() && createNewDecisionTask {
			decision, _ := msBuilder.GetDecisionInfo(newDecisionTaskScheduledID)
			resp.StartedResponse, err = handler.createRecordDecisionTaskStartedResponse(domainID, msBuilder, decision, request.GetIdentity())
			if err != nil {
				return nil, err
			}
			// sticky is always enabled when worker request for new decision task from RespondDecisionTaskCompleted
			resp.StartedResponse.StickyExecutionEnabled = true
		}

		return resp, nil
	}

	return nil, workflow.ErrMaxAttemptsExceeded
}
// createRecordDecisionTaskStartedResponse assembles the response returned to
// a poller when a decision task is recorded as started: event IDs, stickiness,
// transient decision events (for retried decisions), the current branch token,
// buffered query inputs, and history size.
func (handler *handlerImpl) createRecordDecisionTaskStartedResponse(
	domainID string,
	msBuilder execution.MutableState,
	decision *execution.DecisionInfo,
	identity string,
) (*types.RecordDecisionTaskStartedResponse, error) {
	response := &types.RecordDecisionTaskStartedResponse{}
	response.WorkflowType = msBuilder.GetWorkflowType()
	executionInfo := msBuilder.GetExecutionInfo()
	if executionInfo.LastProcessedEvent != common.EmptyEventID {
		response.PreviousStartedEventID = common.Int64Ptr(executionInfo.LastProcessedEvent)
	}

	// Starting decision could result in different scheduleID if decision was transient and new events came in
	// before it was started.
	response.ScheduledEventID = decision.ScheduleID
	response.StartedEventID = decision.StartedID
	// if we call IsStickyTaskListEnabled then it's possible that the decision is a
	// sticky decision but due to TTL check, the field becomes false
	// NOTE: it's possible that StickyTaskList is empty even if the decision is scheduled
	// on a sticky tasklist since stickiness can be cleared at anytime by the ResetStickyTaskList API.
	// When this field is false, we will send full workflow history to client
	// (see createPollForDecisionTaskResponse in workflowHandler.go)
	// This is actually desired since if that API is called, it basically means the client side
	// cache has been cleared for the workflow and full history is needed by the client. Even if
	// client side still has the cache, client library is still able to handle the situation.
	response.StickyExecutionEnabled = msBuilder.GetExecutionInfo().StickyTaskList != ""
	response.NextEventID = msBuilder.GetNextEventID()
	response.Attempt = decision.Attempt
	response.WorkflowExecutionTaskList = &types.TaskList{
		Name: executionInfo.TaskList,
		Kind: types.TaskListKindNormal.Ptr(),
	}
	response.ScheduledTimestamp = common.Int64Ptr(decision.ScheduledTimestamp)
	response.StartedTimestamp = common.Int64Ptr(decision.StartedTimestamp)

	if decision.Attempt > 0 {
		// This decision is retried from mutable state
		// Also return schedule and started which are not written to history yet
		scheduledEvent, startedEvent := msBuilder.CreateTransientDecisionEvents(decision, identity)
		response.DecisionInfo = &types.TransientDecisionInfo{}
		response.DecisionInfo.ScheduledEvent = scheduledEvent
		response.DecisionInfo.StartedEvent = startedEvent
	}
	currentBranchToken, err := msBuilder.GetCurrentBranchToken()
	if err != nil {
		return nil, err
	}
	response.BranchToken = currentBranchToken

	// Attach the inputs of all buffered queries; queries whose input cannot be
	// fetched are skipped rather than failing the whole response.
	qr := msBuilder.GetQueryRegistry()
	buffered := qr.GetBufferedIDs()
	queries := make(map[string]*types.WorkflowQuery)
	for _, id := range buffered {
		input, err := qr.GetQueryInput(id)
		if err != nil {
			continue
		}
		queries[id] = input
	}
	response.Queries = queries
	response.HistorySize = msBuilder.GetHistorySize()
	return response, nil
}
// handleBufferedQueries resolves buffered consistent queries after a decision
// task completes. It fails all buffered queries when the worker does not
// support consistent query, skips processing during heartbeat decisions
// (local activities may still be running), terminates queries that have
// results (completed, or failed if the result exceeds the blob size limit),
// and unblocks any remaining buffered queries when no new decision task was
// created.
func (handler *handlerImpl) handleBufferedQueries(
	msBuilder execution.MutableState,
	clientImpl string,
	clientFeatureVersion string,
	queryResults map[string]*types.WorkflowQueryResult,
	createNewDecisionTask bool,
	domainEntry *cache.DomainCacheEntry,
	decisionHeartbeating bool,
) {
	queryRegistry := msBuilder.GetQueryRegistry()
	if !queryRegistry.HasBufferedQuery() {
		return
	}

	domainID := domainEntry.GetInfo().ID
	domain := domainEntry.GetInfo().Name
	workflowID := msBuilder.GetExecutionInfo().WorkflowID
	runID := msBuilder.GetExecutionInfo().RunID

	scope := handler.metricsClient.Scope(
		metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DomainTag(domainEntry.GetInfo().Name),
		metrics.DecisionTypeTag("ConsistentQuery"))

	// Consistent query requires both server and client worker support. If a consistent query was requested (meaning there are
	// buffered queries) but worker does not support consistent query then all buffered queries should be failed.
	if versionErr := handler.versionChecker.SupportsConsistentQuery(clientImpl, clientFeatureVersion); versionErr != nil {
		scope.IncCounter(metrics.WorkerNotSupportsConsistentQueryCount)
		failedTerminationState := &query.TerminationState{
			TerminationType: query.TerminationTypeFailed,
			Failure:         &types.BadRequestError{Message: versionErr.Error()},
		}
		buffered := queryRegistry.GetBufferedIDs()
		handler.logger.Info(
			"failing query because worker does not support consistent query",
			tag.ClientImpl(clientImpl),
			tag.ClientFeatureVersion(clientFeatureVersion),
			tag.WorkflowDomainName(domain),
			tag.WorkflowID(workflowID),
			tag.WorkflowRunID(runID),
			tag.Error(versionErr))
		for _, id := range buffered {
			if err := queryRegistry.SetTerminationState(id, failedTerminationState); err != nil {
				// Termination failures are logged and counted but do not abort the loop.
				handler.logger.Error(
					"failed to set query termination state to failed",
					tag.WorkflowDomainName(domain),
					tag.WorkflowID(workflowID),
					tag.WorkflowRunID(runID),
					tag.QueryID(id),
					tag.Error(err))
				scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
			}
		}
		return
	}

	// if its a heartbeat decision it means local activities may still be running on the worker
	// which were started by an external event which happened before the query
	if decisionHeartbeating {
		return
	}

	sizeLimitError := handler.config.BlobSizeLimitError(domain)
	sizeLimitWarn := handler.config.BlobSizeLimitWarn(domain)

	// Complete or fail all queries we have results for
	for id, result := range queryResults {
		if err := common.CheckEventBlobSizeLimit(
			len(result.GetAnswer()),
			sizeLimitWarn,
			sizeLimitError,
			domainID,
			workflowID,
			runID,
			scope,
			handler.throttledLogger,
			tag.BlobSizeViolationOperation("ConsistentQuery"),
		); err != nil {
			// Oversized answer: fail the query with the size-limit error.
			handler.logger.Info("failing query because query result size is too large",
				tag.WorkflowDomainName(domain),
				tag.WorkflowID(workflowID),
				tag.WorkflowRunID(runID),
				tag.QueryID(id),
				tag.Error(err))
			failedTerminationState := &query.TerminationState{
				TerminationType: query.TerminationTypeFailed,
				Failure:         err,
			}
			if err := queryRegistry.SetTerminationState(id, failedTerminationState); err != nil {
				handler.logger.Error(
					"failed to set query termination state to failed",
					tag.WorkflowDomainName(domain),
					tag.WorkflowID(workflowID),
					tag.WorkflowRunID(runID),
					tag.QueryID(id),
					tag.Error(err))
				scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
			}
		} else {
			completedTerminationState := &query.TerminationState{
				TerminationType: query.TerminationTypeCompleted,
				QueryResult:     result,
			}
			if err := queryRegistry.SetTerminationState(id, completedTerminationState); err != nil {
				handler.logger.Error(
					"failed to set query termination state to completed",
					tag.WorkflowDomainName(domain),
					tag.WorkflowID(workflowID),
					tag.WorkflowRunID(runID),
					tag.QueryID(id),
					tag.Error(err))
				scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
			}
		}
	}

	// If no decision task was created then it means no buffered events came in during this decision task's handling.
	// This means all unanswered buffered queries can be dispatched directly through matching at this point.
	if !createNewDecisionTask {
		buffered := queryRegistry.GetBufferedIDs()
		for _, id := range buffered {
			unblockTerminationState := &query.TerminationState{
				TerminationType: query.TerminationTypeUnblocked,
			}
			if err := queryRegistry.SetTerminationState(id, unblockTerminationState); err != nil {
				handler.logger.Error(
					"failed to set query termination state to unblocked",
					tag.WorkflowDomainName(domain),
					tag.WorkflowID(workflowID),
					tag.WorkflowRunID(runID),
					tag.QueryID(id),
					tag.Error(err))
				scope.IncCounter(metrics.QueryRegistryInvalidStateCount)
			}
		}
	}
}
// failDecisionHelper discards any pending updates, reloads a clean mutable
// state, records a DecisionTaskFailed event on it, and — when the configured
// retry limit is exceeded — terminates the workflow. The reloaded mutable
// state is returned so the caller can continue applying updates.
func (handler *handlerImpl) failDecisionHelper(
	ctx context.Context,
	wfContext execution.Context,
	scheduleID int64,
	startedID int64,
	cause types.DecisionTaskFailedCause,
	details []byte,
	request *types.RespondDecisionTaskCompletedRequest,
	domainEntry *cache.DomainCacheEntry,
) (execution.MutableState, error) {
	// Clear any updates we have accumulated so far
	wfContext.Clear()

	// Reload workflow execution so we can apply the decision task failure event
	msBuilder, loadErr := wfContext.LoadWorkflowExecution(ctx)
	if loadErr != nil {
		return nil, loadErr
	}

	if _, failErr := msBuilder.AddDecisionTaskFailedEvent(
		scheduleID, startedID, cause, details, request.GetIdentity(), "", request.GetBinaryChecksum(), "", "", 0, "",
	); failErr != nil {
		return nil, failErr
	}

	maxAttempts := handler.config.DecisionRetryMaxAttempts(domainEntry.GetInfo().Name)
	if maxAttempts > 0 && msBuilder.GetExecutionInfo().DecisionAttempt > int64(maxAttempts) {
		// Too many consecutive decision failures: terminate the workflow.
		message := fmt.Sprintf(
			"Decision attempt exceeds limit. Last decision failure cause and details: %v - %v",
			cause,
			details)
		info := msBuilder.GetExecutionInfo()
		handler.logger.Error(message,
			tag.WorkflowDomainID(info.DomainID),
			tag.WorkflowID(info.WorkflowID),
			tag.WorkflowRunID(info.RunID))
		handler.metricsClient.IncCounter(metrics.HistoryRespondDecisionTaskCompletedScope, metrics.DecisionRetriesExceededCounter)

		if _, termErr := msBuilder.AddWorkflowExecutionTerminatedEvent(
			msBuilder.GetNextEventID(),
			common.FailureReasonDecisionAttemptsExceedsLimit,
			[]byte(message),
			execution.IdentityHistoryService,
		); termErr != nil {
			return nil, termErr
		}
	}

	// Return new builder back to the caller for further updates
	return msBuilder, nil
}
// getActiveDomainByID resolves the domain cache entry for the given domain ID,
// verifying that the domain is active in this cluster.
func (handler *handlerImpl) getActiveDomainByID(id string) (*cache.DomainCacheEntry, error) {
	domainCache := handler.shard.GetDomainCache()
	currentCluster := handler.shard.GetClusterMetadata().GetCurrentClusterName()
	return cache.GetActiveDomainByID(domainCache, currentCluster, id)
}
// getDecisionInfoAttempt returns the decision's attempt count, or 0 when the
// decision info is nil (nil-safe accessor for logging).
func getDecisionInfoAttempt(di *execution.DecisionInfo) int64 {
	if di != nil {
		return di.Attempt
	}
	return 0
}
// getDecisionInfoStartedID returns the decision's started event ID, or 0 when
// the decision info is nil (nil-safe accessor for logging).
func getDecisionInfoStartedID(di *execution.DecisionInfo) int64 {
	if di != nil {
		return di.StartedID
	}
	return 0
}