in host/taskpoller.go [117:293]
// PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision polls for a
// decision (or query) task and answers it using the poller's handlers. An empty
// poll response is retried; at most retryCount polls are issued in total.
//
// Parameters:
//   - dumpHistory: pretty-print the history carried by the poll response
//     (only that response's history, not pages fetched afterwards).
//   - dropTask: return right after the task has been polled and its history
//     validated, without responding to it.
//   - pollStickyTaskList: poll p.StickyTaskList instead of p.TaskList.
//   - respondStickyTaskList: attach sticky attributes (p.StickyTaskList and
//     p.StickyScheduleToStartTimeoutSeconds) to the completion request.
//   - decisionAttempt: when > 0, assert via require.Equal that the last
//     DecisionTaskScheduled event found in history carries this attempt.
//   - retryCount: maximum number of polls.
//   - forceCreateNewDecision: value used for both ReturnNewDecisionTask and
//     ForceCreateNewDecisionTask on the completion request.
//   - queryResult: forwarded to getQueryResults together with the queries
//     attached to the polled task.
//
// Returns:
//   - (false, nil, err) when polling or fetching a history page fails.
//   - (false, nil, nil) when dropTask is set.
//   - (true, nil, err) for a query task, err being the result of
//     RespondQueryTaskCompleted.
//   - (false, nil, err) when p.DecisionHandler fails, err being the result of
//     RespondDecisionTaskFailed.
//   - (false, resp, err) from RespondDecisionTaskCompleted otherwise.
//   - (false, nil, matching.ErrNoTasks) when every poll came back empty.
func (p *TaskPoller) PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision(
	dumpHistory bool,
	dropTask bool,
	pollStickyTaskList bool,
	respondStickyTaskList bool,
	decisionAttempt int64,
	retryCount int,
	forceCreateNewDecision bool,
	queryResult *types.WorkflowQueryResult,
) (isQueryTask bool, newTask *types.RespondDecisionTaskCompletedResponse, err error) {
	// The task list does not change between attempts, so select it once.
	taskList := p.TaskList
	if pollStickyTaskList {
		taskList = p.StickyTaskList
	}

	for attempt := 0; attempt < retryCount; attempt++ {
		ctx, cancel := createContext()
		response, pollErr := p.Engine.PollForDecisionTask(ctx, &types.PollForDecisionTaskRequest{
			Domain:   p.Domain,
			TaskList: taskList,
			Identity: p.Identity,
		})
		cancel()
		if pollErr != nil {
			return false, nil, pollErr
		}
		if response == nil || len(response.TaskToken) == 0 {
			p.Logger.Info("Empty Decision task: Polling again.")
			continue
		}
		if response.GetNextEventID() == 0 {
			p.Logger.Fatal("NextEventID is not set for decision or query task")
		}

		var events []*types.HistoryEvent
		if response.Query != nil && pollStickyTaskList {
			// for sticky query, there should be NO events returned
			// since worker side already has the state machine and we do not intend to update that.
			history := response.History
			if history != nil && (len(history.Events) != 0 || response.NextPageToken != nil) {
				// history is not nil, and contains events or a next page token
				p.Logger.Fatal("History is not empty for sticky query")
			}
		} else {
			// if not query task, should have some history events
			// for non sticky query, there should be events returned
			history := response.History
			if history == nil {
				p.Logger.Fatal("History is nil")
			}
			events = history.Events
			if len(events) == 0 {
				p.Logger.Fatal("History Events are empty")
			}
			// Follow the page tokens so that events holds every fetched page.
			for nextPageToken := response.NextPageToken; nextPageToken != nil; {
				pageCtx, pageCancel := createContext()
				page, pageErr := p.Engine.GetWorkflowExecutionHistory(pageCtx, &types.GetWorkflowExecutionHistoryRequest{
					Domain:        p.Domain,
					Execution:     response.WorkflowExecution,
					NextPageToken: nextPageToken,
				})
				pageCancel()
				if pageErr != nil {
					return false, nil, pageErr
				}
				// Report a missing page the same way a missing first page is
				// reported, instead of dereferencing a nil pointer below.
				if page == nil || page.History == nil {
					p.Logger.Fatal("History page is nil")
				}
				events = append(events, page.History.Events...)
				nextPageToken = page.NextPageToken
			}
		}

		if dropTask {
			p.Logger.Info("Dropping Decision task: ")
			return false, nil, nil
		}
		if dumpHistory {
			common.PrettyPrintHistory(response.History, p.Logger)
		}

		// handle query task response
		if response.Query != nil {
			blob, queryErr := p.QueryHandler(response)
			queryRequest := &types.RespondQueryTaskCompletedRequest{TaskToken: response.TaskToken}
			if queryErr != nil {
				completeType := types.QueryTaskCompletedTypeFailed
				queryRequest.CompletedType = &completeType
				queryRequest.ErrorMessage = queryErr.Error()
			} else {
				completeType := types.QueryTaskCompletedTypeCompleted
				queryRequest.CompletedType = &completeType
				queryRequest.QueryResult = blob
			}
			ctx, cancel = createContext()
			taskErr := p.Engine.RespondQueryTaskCompleted(ctx, queryRequest)
			cancel()
			return true, nil, taskErr
		}

		// handle normal decision task / non query task response
		var lastDecisionScheduleEvent *types.HistoryEvent
		for _, e := range events {
			if e.GetEventType() == types.EventTypeDecisionTaskScheduled {
				lastDecisionScheduleEvent = e
			}
		}
		if lastDecisionScheduleEvent != nil && decisionAttempt > 0 {
			require.Equal(p.T, decisionAttempt, lastDecisionScheduleEvent.DecisionTaskScheduledEventAttributes.GetAttempt())
		}

		// The handler receives the history from the poll response itself.
		executionCtx, decisions, handlerErr := p.DecisionHandler(response.WorkflowExecution, response.WorkflowType,
			common.Int64Default(response.PreviousStartedEventID), response.StartedEventID, response.History)
		if handlerErr != nil {
			p.Logger.Info("Failing Decision. Decision handler failed with error", tag.Error(handlerErr))
			ctx, cancel = createContext()
			taskErr := p.Engine.RespondDecisionTaskFailed(ctx, &types.RespondDecisionTaskFailedRequest{
				TaskToken: response.TaskToken,
				Cause:     types.DecisionTaskFailedCauseWorkflowWorkerUnhandledFailure.Ptr(),
				Details:   []byte(handlerErr.Error()),
				Identity:  p.Identity,
			})
			cancel()
			return false, nil, taskErr
		}

		p.Logger.Info("Completing Decision. Decisions", tag.Value(decisions))
		// Sticky and non-sticky completions share one request; only the
		// sticky attributes differ.
		completeRequest := &types.RespondDecisionTaskCompletedRequest{
			TaskToken:                  response.TaskToken,
			Identity:                   p.Identity,
			ExecutionContext:           executionCtx,
			Decisions:                  decisions,
			ReturnNewDecisionTask:      forceCreateNewDecision,
			ForceCreateNewDecisionTask: forceCreateNewDecision,
			QueryResults:               getQueryResults(response.GetQueries(), queryResult),
		}
		if respondStickyTaskList {
			completeRequest.StickyAttributes = &types.StickyExecutionAttributes{
				WorkerTaskList:                p.StickyTaskList,
				ScheduleToStartTimeoutSeconds: p.StickyScheduleToStartTimeoutSeconds,
			}
		}
		ctx, cancel = createContext()
		completed, completeErr := p.Engine.RespondDecisionTaskCompleted(ctx, completeRequest)
		cancel()
		return false, completed, completeErr
	}

	return false, nil, matching.ErrNoTasks
}