func()

in host/taskpoller.go [117:293]


func (p *TaskPoller) PollAndProcessDecisionTaskWithAttemptAndRetryAndForceNewDecision(
	dumpHistory bool,
	dropTask bool,
	pollStickyTaskList bool,
	respondStickyTaskList bool,
	decisionAttempt int64,
	retryCount int,
	forceCreateNewDecision bool,
	queryResult *types.WorkflowQueryResult,
) (isQueryTask bool, newTask *types.RespondDecisionTaskCompletedResponse, err error) {
Loop:
	for attempt := 0; attempt < retryCount; attempt++ {

		taskList := p.TaskList
		if pollStickyTaskList {
			taskList = p.StickyTaskList
		}
		ctx, cancel := createContext()
		response, err1 := p.Engine.PollForDecisionTask(ctx, &types.PollForDecisionTaskRequest{
			Domain:   p.Domain,
			TaskList: taskList,
			Identity: p.Identity,
		})
		cancel()

		if err1 != nil {
			return false, nil, err1
		}

		if response == nil || len(response.TaskToken) == 0 {
			p.Logger.Info("Empty Decision task: Polling again.")
			continue Loop
		}

		if response.GetNextEventID() == 0 {
			p.Logger.Fatal("NextEventID is not set for decision or query task")
		}

		var events []*types.HistoryEvent
		if response.Query == nil || !pollStickyTaskList {
			// if not query task, should have some history events
			// for non sticky query, there should be events returned
			history := response.History
			if history == nil {
				p.Logger.Fatal("History is nil")
			}

			events = history.Events
			if len(events) == 0 {
				p.Logger.Fatal("History Events are empty")
			}

			nextPageToken := response.NextPageToken
			for nextPageToken != nil {
				ctx, cancel := createContext()
				resp, err2 := p.Engine.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
					Domain:        p.Domain,
					Execution:     response.WorkflowExecution,
					NextPageToken: nextPageToken,
				})
				cancel()

				if err2 != nil {
					return false, nil, err2
				}

				events = append(events, resp.History.Events...)
				nextPageToken = resp.NextPageToken
			}
		} else {
			// for sticky query, there should be NO events returned
			// since worker side already has the state machine and we do not intend to update that.
			history := response.History
			nextPageToken := response.NextPageToken
			if !(history == nil || (len(history.Events) == 0 && nextPageToken == nil)) {
				// if history is not nil, and contains events or next token
				p.Logger.Fatal("History is not empty for sticky query")
			}
		}

		if dropTask {
			p.Logger.Info("Dropping Decision task: ")
			return false, nil, nil
		}

		if dumpHistory {
			common.PrettyPrintHistory(response.History, p.Logger)
		}

		// handle query task response
		if response.Query != nil {
			blob, err := p.QueryHandler(response)

			completeRequest := &types.RespondQueryTaskCompletedRequest{TaskToken: response.TaskToken}
			if err != nil {
				completeType := types.QueryTaskCompletedTypeFailed
				completeRequest.CompletedType = &completeType
				completeRequest.ErrorMessage = err.Error()
			} else {
				completeType := types.QueryTaskCompletedTypeCompleted
				completeRequest.CompletedType = &completeType
				completeRequest.QueryResult = blob
			}

			ctx, cancel := createContext()
			taskErr := p.Engine.RespondQueryTaskCompleted(ctx, completeRequest)
			cancel()
			return true, nil, taskErr
		}

		// handle normal decision task / non query task response
		var lastDecisionScheduleEvent *types.HistoryEvent
		for _, e := range events {
			if e.GetEventType() == types.EventTypeDecisionTaskScheduled {
				lastDecisionScheduleEvent = e
			}
		}
		if lastDecisionScheduleEvent != nil && decisionAttempt > 0 {
			require.Equal(p.T, decisionAttempt, lastDecisionScheduleEvent.DecisionTaskScheduledEventAttributes.GetAttempt())
		}

		executionCtx, decisions, err := p.DecisionHandler(response.WorkflowExecution, response.WorkflowType,
			common.Int64Default(response.PreviousStartedEventID), response.StartedEventID, response.History)
		if err != nil {
			p.Logger.Info("Failing Decision. Decision handler failed with error", tag.Error(err))
			ctx, cancel := createContext()
			taskErr := p.Engine.RespondDecisionTaskFailed(ctx, &types.RespondDecisionTaskFailedRequest{
				TaskToken: response.TaskToken,
				Cause:     types.DecisionTaskFailedCauseWorkflowWorkerUnhandledFailure.Ptr(),
				Details:   []byte(err.Error()),
				Identity:  p.Identity,
			})
			cancel()
			return isQueryTask, nil, taskErr
		}

		p.Logger.Info("Completing Decision.  Decisions", tag.Value(decisions))
		if !respondStickyTaskList {
			// non sticky tasklist
			ctx, cancel := createContext()
			newTask, err := p.Engine.RespondDecisionTaskCompleted(ctx, &types.RespondDecisionTaskCompletedRequest{
				TaskToken:                  response.TaskToken,
				Identity:                   p.Identity,
				ExecutionContext:           executionCtx,
				Decisions:                  decisions,
				ReturnNewDecisionTask:      forceCreateNewDecision,
				ForceCreateNewDecisionTask: forceCreateNewDecision,
				QueryResults:               getQueryResults(response.GetQueries(), queryResult),
			})
			cancel()
			return false, newTask, err
		}
		// sticky tasklist
		ctx, cancel = createContext()
		newTask, err := p.Engine.RespondDecisionTaskCompleted(
			ctx,
			&types.RespondDecisionTaskCompletedRequest{
				TaskToken:        response.TaskToken,
				Identity:         p.Identity,
				ExecutionContext: executionCtx,
				Decisions:        decisions,
				StickyAttributes: &types.StickyExecutionAttributes{
					WorkerTaskList:                p.StickyTaskList,
					ScheduleToStartTimeoutSeconds: p.StickyScheduleToStartTimeoutSeconds,
				},
				ReturnNewDecisionTask:      forceCreateNewDecision,
				ForceCreateNewDecisionTask: forceCreateNewDecision,
				QueryResults:               getQueryResults(response.GetQueries(), queryResult),
			},
		)
		cancel()

		return false, newTask, err
	}

	return false, nil, matching.ErrNoTasks
}