in service/frontend/api/handler.go [1965:2215]
func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
ctx context.Context,
getRequest *types.GetWorkflowExecutionHistoryRequest,
) (resp *types.GetWorkflowExecutionHistoryResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if getRequest == nil {
return nil, validate.ErrRequestNotSet
}
domainName := getRequest.GetDomain()
wfExecution := getRequest.GetExecution()
if domainName == "" {
return nil, validate.ErrDomainNotSet
}
if err := validate.CheckExecution(wfExecution); err != nil {
return nil, err
}
domainID, err := wh.GetDomainCache().GetDomainID(domainName)
if err != nil {
return nil, err
}
if getRequest.GetMaximumPageSize() <= 0 {
getRequest.MaximumPageSize = int32(wh.config.HistoryMaxPageSize(getRequest.GetDomain()))
}
// force limit page size if exceed
if getRequest.GetMaximumPageSize() > common.GetHistoryMaxPageSize {
wh.GetThrottledLogger().Warn("GetHistory page size is larger than threshold",
tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
tag.WorkflowRunID(getRequest.Execution.GetRunID()),
tag.WorkflowDomainID(domainID),
tag.WorkflowSize(int64(getRequest.GetMaximumPageSize())))
getRequest.MaximumPageSize = common.GetHistoryMaxPageSize
}
scope := getMetricsScopeWithDomain(metrics.FrontendGetWorkflowExecutionHistoryScope, getRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
if !getRequest.GetSkipArchival() {
enableArchivalRead := wh.GetArchivalMetadata().GetHistoryConfig().ReadEnabled()
historyArchived := wh.historyArchived(ctx, getRequest, domainID)
if enableArchivalRead && historyArchived {
return wh.getArchivedHistory(ctx, getRequest, domainID)
}
}
// this function return the following 6 things,
// 1. branch token
// 2. the workflow run ID
// 3. the last first event ID (the event ID of the last batch of events in the history)
// 4. the next event ID
// 5. whether the workflow is closed
// 6. error if any
queryHistory := func(
domainUUID string,
execution *types.WorkflowExecution,
expectedNextEventID int64,
currentBranchToken []byte,
) ([]byte, string, int64, int64, bool, error) {
response, err := wh.GetHistoryClient().PollMutableState(ctx, &types.PollMutableStateRequest{
DomainUUID: domainUUID,
Execution: execution,
ExpectedNextEventID: expectedNextEventID,
CurrentBranchToken: currentBranchToken,
})
if err != nil {
return nil, "", 0, 0, false, err
}
isWorkflowRunning := response.GetWorkflowCloseState() == persistence.WorkflowCloseStatusNone
return response.CurrentBranchToken,
response.Execution.GetRunID(),
response.GetLastFirstEventID(),
response.GetNextEventID(),
isWorkflowRunning,
nil
}
isLongPoll := getRequest.GetWaitForNewEvent()
isCloseEventOnly := getRequest.GetHistoryEventFilterType() == types.HistoryEventFilterTypeCloseEvent
execution := getRequest.Execution
token := &getHistoryContinuationToken{}
var runID string
lastFirstEventID := common.FirstEventID
var nextEventID int64
var isWorkflowRunning bool
// process the token for paging
queryNextEventID := common.EndEventID
if getRequest.NextPageToken != nil {
token, err = deserializeHistoryToken(getRequest.NextPageToken)
if err != nil {
return nil, validate.ErrInvalidNextPageToken
}
if execution.RunID != "" && execution.GetRunID() != token.RunID {
return nil, validate.ErrNextPageTokenRunIDMismatch
}
execution.RunID = token.RunID
// we need to update the current next event ID and whether workflow is running
if len(token.PersistenceToken) == 0 && isLongPoll && token.IsWorkflowRunning {
logger := wh.GetLogger().WithTags(
tag.WorkflowDomainName(getRequest.GetDomain()),
tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
tag.WorkflowRunID(getRequest.Execution.GetRunID()),
)
// TODO: for now we only log the invalid timeout (this is done inside the helper function) in case
// this change breaks existing customers. Once we are sure no one is calling this API with very short timeout
// we can return the error.
_ = common.ValidateLongPollContextTimeout(ctx, "GetWorkflowExecutionHistory", logger)
if !isCloseEventOnly {
queryNextEventID = token.NextEventID
}
token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err =
queryHistory(domainID, execution, queryNextEventID, token.BranchToken)
if err != nil {
return nil, err
}
token.FirstEventID = token.NextEventID
token.NextEventID = nextEventID
token.IsWorkflowRunning = isWorkflowRunning
}
} else {
if !isCloseEventOnly {
queryNextEventID = common.FirstEventID
}
token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err =
queryHistory(domainID, execution, queryNextEventID, nil)
if err != nil {
return nil, err
}
execution.RunID = runID
token.RunID = runID
token.FirstEventID = common.FirstEventID
token.NextEventID = nextEventID
token.IsWorkflowRunning = isWorkflowRunning
token.PersistenceToken = nil
}
call := yarpc.CallFromContext(ctx)
clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
clientImpl := call.Header(common.ClientImplHeaderName)
supportsRawHistoryQuery := wh.versionChecker.SupportsRawHistoryQuery(clientImpl, clientFeatureVersion) == nil
isRawHistoryEnabled := wh.config.SendRawWorkflowHistory(domainName) && supportsRawHistoryQuery
history := &types.History{}
history.Events = []*types.HistoryEvent{}
var historyBlob []*types.DataBlob
// helper function to just getHistory
getHistory := func(firstEventID, nextEventID int64, nextPageToken []byte) error {
if isRawHistoryEnabled {
historyBlob, token.PersistenceToken, err = wh.getRawHistory(
ctx,
scope,
domainID,
domainName,
*execution,
firstEventID,
nextEventID,
getRequest.GetMaximumPageSize(),
nextPageToken,
token.TransientDecision,
token.BranchToken,
)
} else {
history, token.PersistenceToken, err = wh.getHistory(
ctx,
scope,
domainID,
domainName,
*execution,
firstEventID,
nextEventID,
getRequest.GetMaximumPageSize(),
nextPageToken,
token.TransientDecision,
token.BranchToken,
)
}
if err != nil {
return err
}
return nil
}
if isCloseEventOnly {
if !isWorkflowRunning {
if err := getHistory(lastFirstEventID, nextEventID, nil); err != nil {
return nil, err
}
if isRawHistoryEnabled {
// since getHistory func will not return empty history, so the below is safe
historyBlob = historyBlob[len(historyBlob)-1:]
} else {
// since getHistory func will not return empty history, so the below is safe
history.Events = history.Events[len(history.Events)-1:]
}
token = nil
} else if isLongPoll {
// set the persistence token to be nil so next time we will query history for updates
token.PersistenceToken = nil
} else {
token = nil
}
} else {
// return all events
if token.FirstEventID >= token.NextEventID {
// currently there is no new event
history.Events = []*types.HistoryEvent{}
if !isWorkflowRunning {
token = nil
}
} else {
if err := getHistory(token.FirstEventID, token.NextEventID, token.PersistenceToken); err != nil {
return nil, err
}
// here, for long pull on history events, we need to intercept the paging token from cassandra
// and do something clever
if len(token.PersistenceToken) == 0 && (!token.IsWorkflowRunning || !isLongPoll) {
// meaning, there is no more history to be returned
token = nil
}
}
}
nextToken, err := serializeHistoryToken(token)
if err != nil {
return nil, err
}
return &types.GetWorkflowExecutionHistoryResponse{
History: history,
RawHistory: historyBlob,
NextPageToken: nextToken,
Archived: false,
}, nil
}