func()

in service/frontend/api/handler.go [1965:2215]


// GetWorkflowExecutionHistory returns a page of history events for a workflow execution.
//
// The request must carry a domain and a valid execution. An unset page size defaults to the
// domain's HistoryMaxPageSize config and any page size above common.GetHistoryMaxPageSize is
// clamped to it. Unless SkipArchival is set, the archived history is served when archival
// reads are enabled and the history is reported as archived.
//
// Otherwise the branch token, next event ID and running state are fetched via PollMutableState
// and events are read page by page. The returned NextPageToken carries the paging state; it is
// nil once nothing more will be returned (workflow closed and all events delivered, or the
// request is not a long poll). With WaitForNewEvent set and the workflow still running, a token
// is returned even after the last page so the next call can wait for new events.
//
// With HistoryEventFilterTypeCloseEvent only the last event (or, for raw history, the last
// DataBlob) is returned, and only once the workflow is no longer running.
//
// When SendRawWorkflowHistory is enabled for the domain and the calling client supports raw
// history queries, events are returned in RawHistory instead of History.
//
// Note: the request is mutated in place (MaximumPageSize and Execution.RunID).
func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
	ctx context.Context,
	getRequest *types.GetWorkflowExecutionHistoryRequest,
) (resp *types.GetWorkflowExecutionHistoryResponse, retError error) {
	if wh.isShuttingDown() {
		return nil, validate.ErrShuttingDown
	}

	if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
		return nil, err
	}

	if getRequest == nil {
		return nil, validate.ErrRequestNotSet
	}

	domainName := getRequest.GetDomain()
	wfExecution := getRequest.GetExecution()

	if domainName == "" {
		return nil, validate.ErrDomainNotSet
	}

	if err := validate.CheckExecution(wfExecution); err != nil {
		return nil, err
	}

	domainID, err := wh.GetDomainCache().GetDomainID(domainName)
	if err != nil {
		return nil, err
	}

	if getRequest.GetMaximumPageSize() <= 0 {
		getRequest.MaximumPageSize = int32(wh.config.HistoryMaxPageSize(getRequest.GetDomain()))
	}
	// force limit page size if exceed
	if getRequest.GetMaximumPageSize() > common.GetHistoryMaxPageSize {
		wh.GetThrottledLogger().Warn("GetHistory page size is larger than threshold",
			tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
			tag.WorkflowRunID(getRequest.Execution.GetRunID()),
			tag.WorkflowDomainID(domainID),
			tag.WorkflowSize(int64(getRequest.GetMaximumPageSize())))
		getRequest.MaximumPageSize = common.GetHistoryMaxPageSize
	}

	scope := getMetricsScopeWithDomain(metrics.FrontendGetWorkflowExecutionHistoryScope, getRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)

	// Check the archival config flag first: when archival reads are disabled the result of
	// historyArchived would be ignored, so it is not evaluated at all.
	if !getRequest.GetSkipArchival() &&
		wh.GetArchivalMetadata().GetHistoryConfig().ReadEnabled() &&
		wh.historyArchived(ctx, getRequest, domainID) {
		return wh.getArchivedHistory(ctx, getRequest, domainID)
	}

	// queryHistory polls the mutable state and returns the following 6 things:
	// 1. the current branch token
	// 2. the workflow run ID
	// 3. the last first event ID (the ID of the first event in the last batch of events)
	// 4. the next event ID
	// 5. whether the workflow is still running
	// 6. error if any
	queryHistory := func(
		domainUUID string,
		execution *types.WorkflowExecution,
		expectedNextEventID int64,
		currentBranchToken []byte,
	) ([]byte, string, int64, int64, bool, error) {
		response, err := wh.GetHistoryClient().PollMutableState(ctx, &types.PollMutableStateRequest{
			DomainUUID:          domainUUID,
			Execution:           execution,
			ExpectedNextEventID: expectedNextEventID,
			CurrentBranchToken:  currentBranchToken,
		})
		if err != nil {
			return nil, "", 0, 0, false, err
		}
		isWorkflowRunning := response.GetWorkflowCloseState() == persistence.WorkflowCloseStatusNone

		return response.CurrentBranchToken,
			response.Execution.GetRunID(),
			response.GetLastFirstEventID(),
			response.GetNextEventID(),
			isWorkflowRunning,
			nil
	}

	isLongPoll := getRequest.GetWaitForNewEvent()
	isCloseEventOnly := getRequest.GetHistoryEventFilterType() == types.HistoryEventFilterTypeCloseEvent
	execution := getRequest.Execution
	token := &getHistoryContinuationToken{}

	lastFirstEventID := common.FirstEventID
	var nextEventID int64
	var isWorkflowRunning bool

	// process the token for paging
	queryNextEventID := common.EndEventID
	if getRequest.NextPageToken != nil {
		token, err = deserializeHistoryToken(getRequest.NextPageToken)
		if err != nil {
			return nil, validate.ErrInvalidNextPageToken
		}
		if execution.GetRunID() != "" && execution.GetRunID() != token.RunID {
			return nil, validate.ErrNextPageTokenRunIDMismatch
		}

		execution.RunID = token.RunID

		// we need to update the current next event ID and whether workflow is running
		if len(token.PersistenceToken) == 0 && isLongPoll && token.IsWorkflowRunning {
			logger := wh.GetLogger().WithTags(
				tag.WorkflowDomainName(getRequest.GetDomain()),
				tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
				tag.WorkflowRunID(getRequest.Execution.GetRunID()),
			)
			// TODO: for now we only log the invalid timeout (this is done inside the helper function) in case
			// this change breaks existing customers. Once we are sure no one is calling this API with very short timeout
			// we can return the error.
			_ = common.ValidateLongPollContextTimeout(ctx, "GetWorkflowExecutionHistory", logger)

			if !isCloseEventOnly {
				queryNextEventID = token.NextEventID
			}
			token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err =
				queryHistory(domainID, execution, queryNextEventID, token.BranchToken)
			if err != nil {
				return nil, err
			}
			token.FirstEventID = token.NextEventID
			token.NextEventID = nextEventID
			token.IsWorkflowRunning = isWorkflowRunning
		}
	} else {
		if !isCloseEventOnly {
			queryNextEventID = common.FirstEventID
		}
		var runID string
		token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err =
			queryHistory(domainID, execution, queryNextEventID, nil)
		if err != nil {
			return nil, err
		}

		execution.RunID = runID

		token.RunID = runID
		token.FirstEventID = common.FirstEventID
		token.NextEventID = nextEventID
		token.IsWorkflowRunning = isWorkflowRunning
		token.PersistenceToken = nil
	}

	call := yarpc.CallFromContext(ctx)
	clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
	clientImpl := call.Header(common.ClientImplHeaderName)
	supportsRawHistoryQuery := wh.versionChecker.SupportsRawHistoryQuery(clientImpl, clientFeatureVersion) == nil
	isRawHistoryEnabled := wh.config.SendRawWorkflowHistory(domainName) && supportsRawHistoryQuery

	history := &types.History{Events: []*types.HistoryEvent{}}
	var historyBlob []*types.DataBlob

	// getHistory reads one page of events in [firstEventID, nextEventID) into either
	// historyBlob (raw mode) or history, storing the persistence paging token in the token.
	getHistory := func(firstEventID, nextEventID int64, nextPageToken []byte) error {
		if isRawHistoryEnabled {
			historyBlob, token.PersistenceToken, err = wh.getRawHistory(
				ctx,
				scope,
				domainID,
				domainName,
				*execution,
				firstEventID,
				nextEventID,
				getRequest.GetMaximumPageSize(),
				nextPageToken,
				token.TransientDecision,
				token.BranchToken,
			)
		} else {
			history, token.PersistenceToken, err = wh.getHistory(
				ctx,
				scope,
				domainID,
				domainName,
				*execution,
				firstEventID,
				nextEventID,
				getRequest.GetMaximumPageSize(),
				nextPageToken,
				token.TransientDecision,
				token.BranchToken,
			)
		}
		return err
	}

	if isCloseEventOnly {
		if !isWorkflowRunning {
			if err := getHistory(lastFirstEventID, nextEventID, nil); err != nil {
				return nil, err
			}
			// keep only the last element; getHistory is not expected to return an empty
			// page here, but guard anyway so an empty result cannot cause a slice panic
			if isRawHistoryEnabled {
				if n := len(historyBlob); n > 0 {
					historyBlob = historyBlob[n-1:]
				}
			} else if history != nil && len(history.Events) > 0 {
				history.Events = history.Events[len(history.Events)-1:]
			}
			token = nil
		} else if isLongPoll {
			// set the persistence token to be nil so next time we will query history for updates
			token.PersistenceToken = nil
		} else {
			token = nil
		}
	} else {
		// return all events
		if token.FirstEventID >= token.NextEventID {
			// currently there is no new event; history is still the empty initial value
			if !isWorkflowRunning {
				token = nil
			}
		} else {
			if err := getHistory(token.FirstEventID, token.NextEventID, token.PersistenceToken); err != nil {
				return nil, err
			}
			// for long poll on history events, keep the token after the last persistence page
			// so the next call polls for new events instead of ending the stream
			if len(token.PersistenceToken) == 0 && (!token.IsWorkflowRunning || !isLongPoll) {
				// meaning, there is no more history to be returned
				token = nil
			}
		}
	}

	nextToken, err := serializeHistoryToken(token)
	if err != nil {
		return nil, err
	}
	return &types.GetWorkflowExecutionHistoryResponse{
		History:       history,
		RawHistory:    historyBlob,
		NextPageToken: nextToken,
		Archived:      false,
	}, nil
}