service/history/execution/history_builder.go (693 lines of code) (raw):
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package execution
import (
"time"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
)
type (
// HistoryBuilder builds and stores workflow history events
HistoryBuilder struct {
transientHistory []*types.HistoryEvent
history []*types.HistoryEvent
msBuilder MutableState
}
)
// NewHistoryBuilder creates a new history builder
func NewHistoryBuilder(msBuilder MutableState) *HistoryBuilder {
return &HistoryBuilder{
transientHistory: []*types.HistoryEvent{},
history: []*types.HistoryEvent{},
msBuilder: msBuilder,
}
}
// NewHistoryBuilderFromEvents creates a new history builder based on the given workflow history events
func NewHistoryBuilderFromEvents(history []*types.HistoryEvent) *HistoryBuilder {
return &HistoryBuilder{
history: history,
}
}
// AddWorkflowExecutionStartedEvent adds WorkflowExecutionStarted event to history
// originalRunID is the runID when the WorkflowExecutionStarted event is written
// firstRunID is the very first runID along the chain of ContinueAsNew and Reset
func (b *HistoryBuilder) AddWorkflowExecutionStartedEvent(startRequest *types.HistoryStartWorkflowExecutionRequest,
previousExecution *persistence.WorkflowExecutionInfo, firstRunID string, originalRunID string, firstScheduledTime time.Time) *types.HistoryEvent {
var prevRunID string
var resetPoints *types.ResetPoints
if previousExecution != nil {
prevRunID = previousExecution.RunID
resetPoints = previousExecution.AutoResetPoints
}
request := startRequest.StartRequest
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeWorkflowExecutionStarted)
var scheduledTime *time.Time
if request.CronSchedule != "" {
// first scheduled time is only necessary for cron workflows.
scheduledTime = &firstScheduledTime
}
attributes := &types.WorkflowExecutionStartedEventAttributes{
WorkflowType: request.WorkflowType,
TaskList: request.TaskList,
Header: request.Header,
Input: request.Input,
ExecutionStartToCloseTimeoutSeconds: request.ExecutionStartToCloseTimeoutSeconds,
TaskStartToCloseTimeoutSeconds: request.TaskStartToCloseTimeoutSeconds,
ContinuedExecutionRunID: prevRunID,
PrevAutoResetPoints: resetPoints,
Identity: request.Identity,
RetryPolicy: request.RetryPolicy,
Attempt: startRequest.GetAttempt(),
ExpirationTimestamp: startRequest.ExpirationTimestamp,
CronSchedule: request.CronSchedule,
LastCompletionResult: startRequest.LastCompletionResult,
ContinuedFailureReason: startRequest.ContinuedFailureReason,
ContinuedFailureDetails: startRequest.ContinuedFailureDetails,
Initiator: startRequest.ContinueAsNewInitiator,
FirstDecisionTaskBackoffSeconds: startRequest.FirstDecisionTaskBackoffSeconds,
FirstExecutionRunID: firstRunID,
FirstScheduleTime: scheduledTime,
OriginalExecutionRunID: originalRunID,
Memo: request.Memo,
SearchAttributes: request.SearchAttributes,
JitterStartSeconds: request.JitterStartSeconds,
PartitionConfig: startRequest.PartitionConfig,
RequestID: request.RequestID,
}
if parentInfo := startRequest.ParentExecutionInfo; parentInfo != nil {
attributes.ParentWorkflowDomainID = &parentInfo.DomainUUID
attributes.ParentWorkflowDomain = &parentInfo.Domain
attributes.ParentWorkflowExecution = parentInfo.Execution
attributes.ParentInitiatedEventID = &parentInfo.InitiatedID
}
event.WorkflowExecutionStartedEventAttributes = attributes
return b.addEventToHistory(event)
}
// AddDecisionTaskScheduledEvent adds DecisionTaskScheduled event to history
func (b *HistoryBuilder) AddDecisionTaskScheduledEvent(taskList string,
startToCloseTimeoutSeconds int32, attempt int64) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeDecisionTaskScheduled)
event.DecisionTaskScheduledEventAttributes = getDecisionTaskScheduledEventAttributes(taskList, startToCloseTimeoutSeconds, attempt)
return b.addEventToHistory(event)
}
// AddTransientDecisionTaskScheduledEvent adds transient DecisionTaskScheduled event
func (b *HistoryBuilder) AddTransientDecisionTaskScheduledEvent(taskList string,
startToCloseTimeoutSeconds int32, attempt int64, timestamp int64) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEventWithTimestamp(types.EventTypeDecisionTaskScheduled, timestamp)
event.DecisionTaskScheduledEventAttributes = getDecisionTaskScheduledEventAttributes(taskList, startToCloseTimeoutSeconds, attempt)
return b.addTransientEvent(event)
}
// AddDecisionTaskStartedEvent adds DecisionTaskStarted event to history
func (b *HistoryBuilder) AddDecisionTaskStartedEvent(scheduleEventID int64, requestID string,
identity string) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeDecisionTaskStarted)
event.DecisionTaskStartedEventAttributes = getDecisionTaskStartedEventAttributes(scheduleEventID, requestID, identity)
return b.addEventToHistory(event)
}
// AddTransientDecisionTaskStartedEvent adds transient DecisionTaskStarted event
func (b *HistoryBuilder) AddTransientDecisionTaskStartedEvent(scheduleEventID int64, requestID string,
identity string, timestamp int64) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEventWithTimestamp(types.EventTypeDecisionTaskStarted, timestamp)
event.DecisionTaskStartedEventAttributes = getDecisionTaskStartedEventAttributes(scheduleEventID, requestID, identity)
return b.addTransientEvent(event)
}
// AddDecisionTaskCompletedEvent adds DecisionTaskCompleted event to history
func (b *HistoryBuilder) AddDecisionTaskCompletedEvent(scheduleEventID, StartedEventID int64,
request *types.RespondDecisionTaskCompletedRequest) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeDecisionTaskCompleted)
event.DecisionTaskCompletedEventAttributes = &types.DecisionTaskCompletedEventAttributes{
ExecutionContext: request.ExecutionContext,
ScheduledEventID: scheduleEventID,
StartedEventID: StartedEventID,
Identity: request.Identity,
BinaryChecksum: request.BinaryChecksum,
}
return b.addEventToHistory(event)
}
// AddDecisionTaskTimedOutEvent adds DecisionTaskTimedOut event to history
func (b *HistoryBuilder) AddDecisionTaskTimedOutEvent(
scheduleEventID int64,
startedEventID int64,
timeoutType types.TimeoutType,
baseRunID string,
newRunID string,
forkEventVersion int64,
reason string,
cause types.DecisionTaskTimedOutCause,
resetRequestID string,
) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeDecisionTaskTimedOut)
event.DecisionTaskTimedOutEventAttributes = &types.DecisionTaskTimedOutEventAttributes{
ScheduledEventID: scheduleEventID,
StartedEventID: startedEventID,
TimeoutType: timeoutType.Ptr(),
BaseRunID: baseRunID,
NewRunID: newRunID,
ForkEventVersion: forkEventVersion,
Reason: reason,
Cause: cause.Ptr(),
RequestID: resetRequestID,
}
return b.addEventToHistory(event)
}
// AddDecisionTaskFailedEvent adds DecisionTaskFailed event to history
func (b *HistoryBuilder) AddDecisionTaskFailedEvent(attr types.DecisionTaskFailedEventAttributes) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeDecisionTaskFailed)
event.DecisionTaskFailedEventAttributes = &attr
return b.addEventToHistory(event)
}
// AddActivityTaskScheduledEvent adds ActivityTaskScheduled event to history
func (b *HistoryBuilder) AddActivityTaskScheduledEvent(decisionCompletedEventID int64,
attributes *types.ScheduleActivityTaskDecisionAttributes) *types.HistoryEvent {
var domain *string
if attributes.Domain != "" {
// for backward compatibility
// old releases will encounter issues if Domain field is a pointer to an empty string.
domain = common.StringPtr(attributes.Domain)
}
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeActivityTaskScheduled)
event.ActivityTaskScheduledEventAttributes = &types.ActivityTaskScheduledEventAttributes{
ActivityID: attributes.ActivityID,
ActivityType: attributes.ActivityType,
TaskList: attributes.TaskList,
Header: attributes.Header,
Input: attributes.Input,
ScheduleToCloseTimeoutSeconds: common.Int32Ptr(common.Int32Default(attributes.ScheduleToCloseTimeoutSeconds)),
ScheduleToStartTimeoutSeconds: common.Int32Ptr(common.Int32Default(attributes.ScheduleToStartTimeoutSeconds)),
StartToCloseTimeoutSeconds: common.Int32Ptr(common.Int32Default(attributes.StartToCloseTimeoutSeconds)),
HeartbeatTimeoutSeconds: common.Int32Ptr(common.Int32Default(attributes.HeartbeatTimeoutSeconds)),
DecisionTaskCompletedEventID: decisionCompletedEventID,
RetryPolicy: attributes.RetryPolicy,
Domain: domain,
}
return b.addEventToHistory(event)
}
// AddActivityTaskStartedEvent adds ActivityTaskStarted event to history
func (b *HistoryBuilder) AddActivityTaskStartedEvent(
scheduleEventID int64,
attempt int32,
requestID string,
identity string,
lastFailureReason string,
lastFailureDetails []byte,
) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeActivityTaskStarted)
event.ActivityTaskStartedEventAttributes = &types.ActivityTaskStartedEventAttributes{
ScheduledEventID: scheduleEventID,
Attempt: attempt,
Identity: identity,
RequestID: requestID,
LastFailureReason: common.StringPtr(lastFailureReason),
LastFailureDetails: lastFailureDetails,
}
return b.addEventToHistory(event)
}
// AddActivityTaskCompletedEvent adds ActivityTaskCompleted event to history
func (b *HistoryBuilder) AddActivityTaskCompletedEvent(scheduleEventID, startedEventID int64,
request *types.RespondActivityTaskCompletedRequest) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeActivityTaskCompleted)
event.ActivityTaskCompletedEventAttributes = &types.ActivityTaskCompletedEventAttributes{
Result: request.Result,
ScheduledEventID: scheduleEventID,
StartedEventID: startedEventID,
Identity: request.Identity,
}
return b.addEventToHistory(event)
}
// AddActivityTaskFailedEvent adds ActivityTaskFailed event to history
func (b *HistoryBuilder) AddActivityTaskFailedEvent(scheduleEventID, StartedEventID int64,
request *types.RespondActivityTaskFailedRequest) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeActivityTaskFailed)
event.ActivityTaskFailedEventAttributes = &types.ActivityTaskFailedEventAttributes{
Reason: common.StringPtr(common.StringDefault(request.Reason)),
Details: request.Details,
ScheduledEventID: scheduleEventID,
StartedEventID: StartedEventID,
Identity: request.Identity,
}
return b.addEventToHistory(event)
}
// AddActivityTaskTimedOutEvent adds ActivityTaskTimedOut event to history
func (b *HistoryBuilder) AddActivityTaskTimedOutEvent(
scheduleEventID,
startedEventID int64,
timeoutType types.TimeoutType,
lastHeartBeatDetails []byte,
lastFailureReason string,
lastFailureDetail []byte,
) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeActivityTaskTimedOut)
event.ActivityTaskTimedOutEventAttributes = &types.ActivityTaskTimedOutEventAttributes{
ScheduledEventID: scheduleEventID,
StartedEventID: startedEventID,
TimeoutType: &timeoutType,
Details: lastHeartBeatDetails,
LastFailureReason: common.StringPtr(lastFailureReason),
LastFailureDetails: lastFailureDetail,
}
return b.addEventToHistory(event)
}
// AddCompletedWorkflowEvent adds WorkflowExecutionCompleted event to history
func (b *HistoryBuilder) AddCompletedWorkflowEvent(decisionCompletedEventID int64,
attributes *types.CompleteWorkflowExecutionDecisionAttributes) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeWorkflowExecutionCompleted)
event.WorkflowExecutionCompletedEventAttributes = &types.WorkflowExecutionCompletedEventAttributes{
Result: attributes.Result,
DecisionTaskCompletedEventID: decisionCompletedEventID,
}
return b.addEventToHistory(event)
}
// AddFailWorkflowEvent adds WorkflowExecutionFailed event to history
func (b *HistoryBuilder) AddFailWorkflowEvent(decisionCompletedEventID int64,
attributes *types.FailWorkflowExecutionDecisionAttributes) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeWorkflowExecutionFailed)
event.WorkflowExecutionFailedEventAttributes = &types.WorkflowExecutionFailedEventAttributes{
Reason: common.StringPtr(common.StringDefault(attributes.Reason)),
Details: attributes.Details,
DecisionTaskCompletedEventID: decisionCompletedEventID,
}
return b.addEventToHistory(event)
}
// AddTimeoutWorkflowEvent adds WorkflowExecutionTimedout event to history
func (b *HistoryBuilder) AddTimeoutWorkflowEvent() *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeWorkflowExecutionTimedOut)
event.WorkflowExecutionTimedOutEventAttributes = &types.WorkflowExecutionTimedOutEventAttributes{
TimeoutType: types.TimeoutTypeStartToClose.Ptr(),
}
return b.addEventToHistory(event)
}
// AddWorkflowExecutionTerminatedEvent add WorkflowExecutionTerminated event to history
func (b *HistoryBuilder) AddWorkflowExecutionTerminatedEvent(
reason string,
details []byte,
identity string,
) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeWorkflowExecutionTerminated)
event.WorkflowExecutionTerminatedEventAttributes = &types.WorkflowExecutionTerminatedEventAttributes{
Reason: reason,
Details: details,
Identity: identity,
}
return b.addEventToHistory(event)
}
// AddContinuedAsNewEvent adds WorkflowExecutionContinuedAsNew event to history
func (b *HistoryBuilder) AddContinuedAsNewEvent(decisionCompletedEventID int64, newRunID string,
attributes *types.ContinueAsNewWorkflowExecutionDecisionAttributes) *types.HistoryEvent {
initiator := attributes.Initiator
if initiator == nil {
initiator = types.ContinueAsNewInitiatorDecider.Ptr()
}
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeWorkflowExecutionContinuedAsNew)
event.WorkflowExecutionContinuedAsNewEventAttributes = &types.WorkflowExecutionContinuedAsNewEventAttributes{
NewExecutionRunID: newRunID,
WorkflowType: attributes.WorkflowType,
TaskList: attributes.TaskList,
Header: attributes.Header,
Input: attributes.Input,
ExecutionStartToCloseTimeoutSeconds: attributes.ExecutionStartToCloseTimeoutSeconds,
TaskStartToCloseTimeoutSeconds: attributes.TaskStartToCloseTimeoutSeconds,
DecisionTaskCompletedEventID: decisionCompletedEventID,
BackoffStartIntervalInSeconds: common.Int32Ptr(attributes.GetBackoffStartIntervalInSeconds()),
Initiator: initiator,
FailureReason: attributes.FailureReason,
FailureDetails: attributes.FailureDetails,
LastCompletionResult: attributes.LastCompletionResult,
Memo: attributes.Memo,
SearchAttributes: attributes.SearchAttributes,
JitterStartSeconds: attributes.JitterStartSeconds,
}
return b.addEventToHistory(event)
}
// AddTimerStartedEvent adds TimerStart event to history
func (b *HistoryBuilder) AddTimerStartedEvent(decisionCompletedEventID int64,
request *types.StartTimerDecisionAttributes) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeTimerStarted)
event.TimerStartedEventAttributes = &types.TimerStartedEventAttributes{
TimerID: request.TimerID,
StartToFireTimeoutSeconds: request.StartToFireTimeoutSeconds,
DecisionTaskCompletedEventID: decisionCompletedEventID,
}
return b.addEventToHistory(event)
}
// AddTimerFiredEvent adds TimerFired event to history
func (b *HistoryBuilder) AddTimerFiredEvent(
StartedEventID int64,
TimerID string,
) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeTimerFired)
event.TimerFiredEventAttributes = &types.TimerFiredEventAttributes{
TimerID: TimerID,
StartedEventID: StartedEventID,
}
return b.addEventToHistory(event)
}
// AddActivityTaskCancelRequestedEvent add ActivityTaskCancelRequested event to history
func (b *HistoryBuilder) AddActivityTaskCancelRequestedEvent(decisionCompletedEventID int64,
activityID string) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeActivityTaskCancelRequested)
event.ActivityTaskCancelRequestedEventAttributes = &types.ActivityTaskCancelRequestedEventAttributes{
ActivityID: activityID,
DecisionTaskCompletedEventID: decisionCompletedEventID,
}
return b.addEventToHistory(event)
}
// AddRequestCancelActivityTaskFailedEvent add RequestCancelActivityTaskFailed event to history
func (b *HistoryBuilder) AddRequestCancelActivityTaskFailedEvent(decisionCompletedEventID int64,
activityID string, cause string) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeRequestCancelActivityTaskFailed)
event.RequestCancelActivityTaskFailedEventAttributes = &types.RequestCancelActivityTaskFailedEventAttributes{
ActivityID: activityID,
DecisionTaskCompletedEventID: decisionCompletedEventID,
Cause: cause,
}
return b.addEventToHistory(event)
}
// AddActivityTaskCanceledEvent adds ActivityTaskCanceled event to history
func (b *HistoryBuilder) AddActivityTaskCanceledEvent(scheduleEventID, startedEventID int64,
latestCancelRequestedEventID int64, details []byte, identity string) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeActivityTaskCanceled)
event.ActivityTaskCanceledEventAttributes = &types.ActivityTaskCanceledEventAttributes{
ScheduledEventID: scheduleEventID,
StartedEventID: startedEventID,
LatestCancelRequestedEventID: latestCancelRequestedEventID,
Details: details,
Identity: identity,
}
return b.addEventToHistory(event)
}
// AddTimerCanceledEvent adds TimerCanceled event to history
func (b *HistoryBuilder) AddTimerCanceledEvent(startedEventID int64,
decisionTaskCompletedEventID int64, timerID string, identity string) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeTimerCanceled)
event.TimerCanceledEventAttributes = &types.TimerCanceledEventAttributes{
StartedEventID: startedEventID,
DecisionTaskCompletedEventID: decisionTaskCompletedEventID,
TimerID: timerID,
Identity: identity,
}
return b.addEventToHistory(event)
}
// AddCancelTimerFailedEvent adds CancelTimerFailed event to history
func (b *HistoryBuilder) AddCancelTimerFailedEvent(timerID string, decisionTaskCompletedEventID int64,
cause string, identity string) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeCancelTimerFailed)
event.CancelTimerFailedEventAttributes = &types.CancelTimerFailedEventAttributes{
TimerID: timerID,
DecisionTaskCompletedEventID: decisionTaskCompletedEventID,
Cause: cause,
Identity: identity,
}
return b.addEventToHistory(event)
}
// AddWorkflowExecutionCancelRequestedEvent adds WorkflowExecutionCancelRequested event to history
func (b *HistoryBuilder) AddWorkflowExecutionCancelRequestedEvent(
cause string,
request *types.HistoryRequestCancelWorkflowExecutionRequest,
) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeWorkflowExecutionCancelRequested)
event.WorkflowExecutionCancelRequestedEventAttributes = &types.WorkflowExecutionCancelRequestedEventAttributes{
Cause: cause,
Identity: request.CancelRequest.Identity,
ExternalInitiatedEventID: request.ExternalInitiatedEventID,
ExternalWorkflowExecution: request.ExternalWorkflowExecution,
RequestID: request.CancelRequest.RequestID,
}
return b.addEventToHistory(event)
}
// AddWorkflowExecutionCanceledEvent adds WorkflowExecutionCanceled event to history
func (b *HistoryBuilder) AddWorkflowExecutionCanceledEvent(decisionTaskCompletedEventID int64,
attributes *types.CancelWorkflowExecutionDecisionAttributes) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeWorkflowExecutionCanceled)
event.WorkflowExecutionCanceledEventAttributes = &types.WorkflowExecutionCanceledEventAttributes{
DecisionTaskCompletedEventID: decisionTaskCompletedEventID,
Details: attributes.Details,
}
return b.addEventToHistory(event)
}
// AddRequestCancelExternalWorkflowExecutionInitiatedEvent adds RequestCancelExternalWorkflowExecutionInitiated event to history
func (b *HistoryBuilder) AddRequestCancelExternalWorkflowExecutionInitiatedEvent(decisionTaskCompletedEventID int64,
request *types.RequestCancelExternalWorkflowExecutionDecisionAttributes) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeRequestCancelExternalWorkflowExecutionInitiated)
event.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes = &types.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes{
DecisionTaskCompletedEventID: decisionTaskCompletedEventID,
Domain: request.Domain,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: request.WorkflowID,
RunID: request.RunID,
},
Control: request.Control,
ChildWorkflowOnly: request.ChildWorkflowOnly,
}
return b.addEventToHistory(event)
}
// AddRequestCancelExternalWorkflowExecutionFailedEvent adds RequestCancelExternalWorkflowExecutionFailed event to history
func (b *HistoryBuilder) AddRequestCancelExternalWorkflowExecutionFailedEvent(decisionTaskCompletedEventID, initiatedEventID int64,
domain, workflowID, runID string, cause types.CancelExternalWorkflowExecutionFailedCause) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeRequestCancelExternalWorkflowExecutionFailed)
event.RequestCancelExternalWorkflowExecutionFailedEventAttributes = &types.RequestCancelExternalWorkflowExecutionFailedEventAttributes{
DecisionTaskCompletedEventID: decisionTaskCompletedEventID,
InitiatedEventID: initiatedEventID,
Domain: domain,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: workflowID,
RunID: runID,
},
Cause: cause.Ptr(),
}
return b.addEventToHistory(event)
}
// AddExternalWorkflowExecutionCancelRequested adds ExternalWorkflowExecutionCancelRequested event to history
func (b *HistoryBuilder) AddExternalWorkflowExecutionCancelRequested(initiatedEventID int64,
domain, workflowID, runID string) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeExternalWorkflowExecutionCancelRequested)
event.ExternalWorkflowExecutionCancelRequestedEventAttributes = &types.ExternalWorkflowExecutionCancelRequestedEventAttributes{
InitiatedEventID: initiatedEventID,
Domain: domain,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: workflowID,
RunID: runID,
},
}
return b.addEventToHistory(event)
}
// AddSignalExternalWorkflowExecutionInitiatedEvent adds SignalExternalWorkflowExecutionInitiated event to history
func (b *HistoryBuilder) AddSignalExternalWorkflowExecutionInitiatedEvent(decisionTaskCompletedEventID int64,
attributes *types.SignalExternalWorkflowExecutionDecisionAttributes) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeSignalExternalWorkflowExecutionInitiated)
event.SignalExternalWorkflowExecutionInitiatedEventAttributes = &types.SignalExternalWorkflowExecutionInitiatedEventAttributes{
DecisionTaskCompletedEventID: decisionTaskCompletedEventID,
Domain: attributes.Domain,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: attributes.Execution.WorkflowID,
RunID: attributes.Execution.RunID,
},
SignalName: attributes.GetSignalName(),
Input: attributes.Input,
Control: attributes.Control,
ChildWorkflowOnly: attributes.ChildWorkflowOnly,
}
return b.addEventToHistory(event)
}
// AddUpsertWorkflowSearchAttributesEvent adds UpsertWorkflowSearchAttributes event to history
func (b *HistoryBuilder) AddUpsertWorkflowSearchAttributesEvent(
decisionTaskCompletedEventID int64,
attributes *types.UpsertWorkflowSearchAttributesDecisionAttributes) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeUpsertWorkflowSearchAttributes)
event.UpsertWorkflowSearchAttributesEventAttributes = &types.UpsertWorkflowSearchAttributesEventAttributes{
DecisionTaskCompletedEventID: decisionTaskCompletedEventID,
SearchAttributes: attributes.GetSearchAttributes(),
}
return b.addEventToHistory(event)
}
// AddSignalExternalWorkflowExecutionFailedEvent adds SignalExternalWorkflowExecutionFailed event to history
func (b *HistoryBuilder) AddSignalExternalWorkflowExecutionFailedEvent(decisionTaskCompletedEventID, initiatedEventID int64,
domain, workflowID, runID string, control []byte, cause types.SignalExternalWorkflowExecutionFailedCause) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeSignalExternalWorkflowExecutionFailed)
event.SignalExternalWorkflowExecutionFailedEventAttributes = &types.SignalExternalWorkflowExecutionFailedEventAttributes{
DecisionTaskCompletedEventID: decisionTaskCompletedEventID,
InitiatedEventID: initiatedEventID,
Domain: domain,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: workflowID,
RunID: runID,
},
Cause: cause.Ptr(),
Control: control,
}
return b.addEventToHistory(event)
}
// AddExternalWorkflowExecutionSignaled adds ExternalWorkflowExecutionSignaled event to history
func (b *HistoryBuilder) AddExternalWorkflowExecutionSignaled(initiatedEventID int64,
domain, workflowID, runID string, control []byte) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeExternalWorkflowExecutionSignaled)
event.ExternalWorkflowExecutionSignaledEventAttributes = &types.ExternalWorkflowExecutionSignaledEventAttributes{
InitiatedEventID: initiatedEventID,
Domain: domain,
WorkflowExecution: &types.WorkflowExecution{
WorkflowID: workflowID,
RunID: runID,
},
Control: control,
}
return b.addEventToHistory(event)
}
// AddMarkerRecordedEvent adds MarkerRecorded event to history
func (b *HistoryBuilder) AddMarkerRecordedEvent(decisionCompletedEventID int64,
attributes *types.RecordMarkerDecisionAttributes) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeMarkerRecorded)
event.MarkerRecordedEventAttributes = &types.MarkerRecordedEventAttributes{
MarkerName: attributes.MarkerName,
Details: attributes.Details,
DecisionTaskCompletedEventID: decisionCompletedEventID,
Header: attributes.Header,
}
return b.addEventToHistory(event)
}
// AddWorkflowExecutionSignaledEvent adds WorkflowExecutionSignaled event to history
func (b *HistoryBuilder) AddWorkflowExecutionSignaledEvent(
signalName string,
input []byte,
identity string,
requestID string,
) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeWorkflowExecutionSignaled)
event.WorkflowExecutionSignaledEventAttributes = &types.WorkflowExecutionSignaledEventAttributes{
SignalName: signalName,
Input: input,
Identity: identity,
RequestID: requestID,
}
return b.addEventToHistory(event)
}
// AddStartChildWorkflowExecutionInitiatedEvent adds ChildWorkflowExecutionInitiated event to history
func (b *HistoryBuilder) AddStartChildWorkflowExecutionInitiatedEvent(
decisionCompletedEventID int64,
attributes *types.StartChildWorkflowExecutionDecisionAttributes,
targetDomainName string,
) *types.HistoryEvent {
domain := attributes.Domain
if domain == "" {
domain = targetDomainName
}
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeStartChildWorkflowExecutionInitiated)
event.StartChildWorkflowExecutionInitiatedEventAttributes = &types.StartChildWorkflowExecutionInitiatedEventAttributes{
Domain: domain,
WorkflowID: attributes.WorkflowID,
WorkflowType: attributes.WorkflowType,
TaskList: attributes.TaskList,
Header: attributes.Header,
Input: attributes.Input,
ExecutionStartToCloseTimeoutSeconds: attributes.ExecutionStartToCloseTimeoutSeconds,
TaskStartToCloseTimeoutSeconds: attributes.TaskStartToCloseTimeoutSeconds,
Control: attributes.Control,
DecisionTaskCompletedEventID: decisionCompletedEventID,
WorkflowIDReusePolicy: attributes.WorkflowIDReusePolicy,
RetryPolicy: attributes.RetryPolicy,
CronSchedule: attributes.CronSchedule,
Memo: attributes.Memo,
SearchAttributes: attributes.SearchAttributes,
ParentClosePolicy: attributes.GetParentClosePolicy().Ptr(),
}
return b.addEventToHistory(event)
}
// AddChildWorkflowExecutionStartedEvent adds ChildWorkflowExecutionStarted event to history
func (b *HistoryBuilder) AddChildWorkflowExecutionStartedEvent(
domain string,
execution *types.WorkflowExecution,
workflowType *types.WorkflowType,
initiatedID int64,
header *types.Header,
) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeChildWorkflowExecutionStarted)
event.ChildWorkflowExecutionStartedEventAttributes = &types.ChildWorkflowExecutionStartedEventAttributes{
Domain: domain,
WorkflowExecution: execution,
WorkflowType: workflowType,
InitiatedEventID: initiatedID,
Header: header,
}
return b.addEventToHistory(event)
}
// AddStartChildWorkflowExecutionFailedEvent adds ChildWorkflowExecutionFailed event to history
func (b *HistoryBuilder) AddStartChildWorkflowExecutionFailedEvent(initiatedID int64,
cause types.ChildWorkflowExecutionFailedCause,
initiatedEventAttributes *types.StartChildWorkflowExecutionInitiatedEventAttributes) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeStartChildWorkflowExecutionFailed)
event.StartChildWorkflowExecutionFailedEventAttributes = &types.StartChildWorkflowExecutionFailedEventAttributes{
Domain: initiatedEventAttributes.Domain,
WorkflowID: initiatedEventAttributes.WorkflowID,
WorkflowType: initiatedEventAttributes.WorkflowType,
InitiatedEventID: initiatedID,
DecisionTaskCompletedEventID: initiatedEventAttributes.DecisionTaskCompletedEventID,
Control: initiatedEventAttributes.Control,
Cause: cause.Ptr(),
}
return b.addEventToHistory(event)
}
// AddChildWorkflowExecutionCompletedEvent adds ChildWorkflowExecutionCompleted event to history
func (b *HistoryBuilder) AddChildWorkflowExecutionCompletedEvent(domain string, execution *types.WorkflowExecution,
workflowType *types.WorkflowType, initiatedID, startedID int64,
completedAttributes *types.WorkflowExecutionCompletedEventAttributes) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeChildWorkflowExecutionCompleted)
event.ChildWorkflowExecutionCompletedEventAttributes = &types.ChildWorkflowExecutionCompletedEventAttributes{
Domain: domain,
WorkflowExecution: execution,
WorkflowType: workflowType,
InitiatedEventID: initiatedID,
StartedEventID: startedID,
Result: completedAttributes.Result,
}
return b.addEventToHistory(event)
}
// AddChildWorkflowExecutionFailedEvent adds ChildWorkflowExecutionFailed event to history
func (b *HistoryBuilder) AddChildWorkflowExecutionFailedEvent(domain string, execution *types.WorkflowExecution,
workflowType *types.WorkflowType, initiatedID, startedID int64,
failedAttributes *types.WorkflowExecutionFailedEventAttributes) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeChildWorkflowExecutionFailed)
event.ChildWorkflowExecutionFailedEventAttributes = &types.ChildWorkflowExecutionFailedEventAttributes{
Domain: domain,
WorkflowExecution: execution,
WorkflowType: workflowType,
InitiatedEventID: initiatedID,
StartedEventID: startedID,
Reason: common.StringPtr(common.StringDefault(failedAttributes.Reason)),
Details: failedAttributes.Details,
}
return b.addEventToHistory(event)
}
// AddChildWorkflowExecutionCanceledEvent adds ChildWorkflowExecutionCanceled event to history
func (b *HistoryBuilder) AddChildWorkflowExecutionCanceledEvent(domain string, execution *types.WorkflowExecution,
workflowType *types.WorkflowType, initiatedID, startedID int64,
canceledAttributes *types.WorkflowExecutionCanceledEventAttributes) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeChildWorkflowExecutionCanceled)
event.ChildWorkflowExecutionCanceledEventAttributes = &types.ChildWorkflowExecutionCanceledEventAttributes{
Domain: domain,
WorkflowExecution: execution,
WorkflowType: workflowType,
InitiatedEventID: initiatedID,
StartedEventID: startedID,
Details: canceledAttributes.Details,
}
return b.addEventToHistory(event)
}
// AddChildWorkflowExecutionTerminatedEvent adds ChildWorkflowExecutionTerminated event to history
func (b *HistoryBuilder) AddChildWorkflowExecutionTerminatedEvent(domain string, execution *types.WorkflowExecution,
workflowType *types.WorkflowType, initiatedID, startedID int64,
terminatedAttributes *types.WorkflowExecutionTerminatedEventAttributes) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeChildWorkflowExecutionTerminated)
event.ChildWorkflowExecutionTerminatedEventAttributes = &types.ChildWorkflowExecutionTerminatedEventAttributes{
Domain: domain,
WorkflowExecution: execution,
WorkflowType: workflowType,
InitiatedEventID: initiatedID,
StartedEventID: startedID,
}
return b.addEventToHistory(event)
}
// AddChildWorkflowExecutionTimedOutEvent adds ChildWorkflowExecutionTimedOut event to history
func (b *HistoryBuilder) AddChildWorkflowExecutionTimedOutEvent(domain string, execution *types.WorkflowExecution,
workflowType *types.WorkflowType, initiatedID, startedID int64,
timedOutAttributes *types.WorkflowExecutionTimedOutEventAttributes) *types.HistoryEvent {
event := b.msBuilder.CreateNewHistoryEvent(types.EventTypeChildWorkflowExecutionTimedOut)
event.ChildWorkflowExecutionTimedOutEventAttributes = &types.ChildWorkflowExecutionTimedOutEventAttributes{
Domain: domain,
TimeoutType: timedOutAttributes.TimeoutType,
WorkflowExecution: execution,
WorkflowType: workflowType,
InitiatedEventID: initiatedID,
StartedEventID: startedID,
}
return b.addEventToHistory(event)
}
func (b *HistoryBuilder) addEventToHistory(event *types.HistoryEvent) *types.HistoryEvent {
b.history = append(b.history, event)
return event
}
func (b *HistoryBuilder) addTransientEvent(event *types.HistoryEvent) *types.HistoryEvent {
b.transientHistory = append(b.transientHistory, event)
return event
}
func newDecisionTaskScheduledEventWithInfo(eventID, timestamp int64, taskList string, startToCloseTimeoutSeconds int32, attempt int64) *types.HistoryEvent {
event := createNewHistoryEvent(eventID, types.EventTypeDecisionTaskScheduled, timestamp)
event.DecisionTaskScheduledEventAttributes = getDecisionTaskScheduledEventAttributes(taskList, startToCloseTimeoutSeconds, attempt)
return event
}
func newDecisionTaskStartedEventWithInfo(eventID, timestamp int64, scheduledEventID int64, requestID string, identity string) *types.HistoryEvent {
event := createNewHistoryEvent(eventID, types.EventTypeDecisionTaskStarted, timestamp)
event.DecisionTaskStartedEventAttributes = getDecisionTaskStartedEventAttributes(scheduledEventID, requestID, identity)
return event
}
func createNewHistoryEvent(eventID int64, eventType types.EventType, timestamp int64) *types.HistoryEvent {
return &types.HistoryEvent{
ID: eventID,
Timestamp: common.Int64Ptr(timestamp),
EventType: eventType.Ptr(),
}
}
func getDecisionTaskScheduledEventAttributes(taskList string, startToCloseTimeoutSeconds int32, attempt int64) *types.DecisionTaskScheduledEventAttributes {
return &types.DecisionTaskScheduledEventAttributes{
TaskList: &types.TaskList{
Name: taskList,
},
StartToCloseTimeoutSeconds: common.Int32Ptr(startToCloseTimeoutSeconds),
Attempt: attempt,
}
}
func getDecisionTaskStartedEventAttributes(scheduledEventID int64, requestID string, identity string) *types.DecisionTaskStartedEventAttributes {
return &types.DecisionTaskStartedEventAttributes{
ScheduledEventID: scheduledEventID,
Identity: identity,
RequestID: requestID,
}
}
// GetHistory gets workflow history stored inside history builder
func (b *HistoryBuilder) GetHistory() *types.History {
history := types.History{Events: b.history}
return &history
}