service/history/decision/task_handler.go (910 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
// Package decision contains the history-service logic that applies the
// decisions returned by a workflow worker to a workflow's mutable state.
package decision

import (
	"context"
	"fmt"

	"github.com/pborman/uuid"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/backoff"
	"github.com/uber/cadence/common/cache"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/metrics"
	"github.com/uber/cadence/common/types"
	"github.com/uber/cadence/service/history/config"
	"github.com/uber/cadence/service/history/execution"
	"github.com/uber/cadence/service/history/workflow"
)

const (
	// activityCancellationMsgActivityIDUnknown is recorded as the cause when a
	// cancel request names an activity the mutable state rejects.
	activityCancellationMsgActivityIDUnknown = "ACTIVITY_ID_UNKNOWN"
	// activityCancellationMsgActivityNotStarted is recorded as the details of
	// the canceled event when an activity is canceled before it was started.
	activityCancellationMsgActivityNotStarted = "ACTIVITY_ID_NOT_STARTED"
)

type (
	// attrValidationFn validates the attributes of a single decision.
	attrValidationFn func() error

	// taskHandlerImpl applies a batch of decisions to one mutable state and
	// tracks whether the decision task must be failed along the way.
	taskHandlerImpl struct {
		identity                string
		decisionTaskCompletedID int64
		domainEntry             *cache.DomainCacheEntry

		// internal state
		hasUnhandledEventsBeforeDecisions bool
		failDecision                      bool
		failDecisionCause                 *types.DecisionTaskFailedCause
		failMessage                       *string
		activityNotStartedCancelled       bool
		continueAsNewBuilder              execution.MutableState
		stopProcessing                    bool // should stop processing any more decisions
		mutableState                      execution.MutableState

		// validation
		attrValidator    *attrValidator
		sizeLimitChecker *workflowSizeChecker

		tokenSerializer         common.TaskTokenSerializer
		logger                  log.Logger
		domainCache             cache.DomainCache
		metricsClient           metrics.Client
		config                  *config.Config
		activityCountToDispatch int
	}

	// decisionResult carries data produced by a decision that must be
	// returned to the caller of handleDecisions.
	decisionResult struct {
		activityDispatchInfo *types.ActivityLocalDispatchInfo
	}
)

// newDecisionTaskHandler builds a handler for one decision task completion.
// hasUnhandledEventsBeforeDecisions is seeded from the mutable state's
// buffered events, and activityCountToDispatch from the per-domain
// MaxActivityCountDispatchByDomain setting.
func newDecisionTaskHandler(
	identity string,
	decisionTaskCompletedID int64,
	domainEntry *cache.DomainCacheEntry,
	mutableState execution.MutableState,
	attrValidator *attrValidator,
	sizeLimitChecker *workflowSizeChecker,
	tokenSerializer common.TaskTokenSerializer,
	logger log.Logger,
	domainCache cache.DomainCache,
	metricsClient metrics.Client,
	config *config.Config,
) *taskHandlerImpl {

	return &taskHandlerImpl{
		identity:                identity,
		decisionTaskCompletedID: decisionTaskCompletedID,
		domainEntry:             domainEntry,

		// internal state
		hasUnhandledEventsBeforeDecisions: mutableState.HasBufferedEvents(),
		failDecision:                      false,
		failDecisionCause:                 nil,
		failMessage:                       nil,
		activityNotStartedCancelled:       false,
		continueAsNewBuilder:              nil,
		stopProcessing:                    false,
		mutableState:                      mutableState,

		// validation
		attrValidator:    attrValidator,
		sizeLimitChecker: sizeLimitChecker,

		tokenSerializer:         tokenSerializer,
		logger:                  logger,
		domainCache:             domainCache,
		metricsClient:           metricsClient,
		config:                  config,
		activityCountToDispatch: config.MaxActivityCountDispatchByDomain(domainEntry.GetInfo().Name),
	}
}

// handleDecisions applies decisions in order and returns the non-nil results
// they produced.
//
// It returns (nil, err) as soon as any step reports an error, and (nil, nil)
// when the workflow size check fails the workflow or when a decision sets
// stopProcessing; results gathered so far are discarded in those cases.
// executionContext is stored on the execution info only when every decision
// was processed.
func (handler *taskHandlerImpl) handleDecisions(
	ctx context.Context,
	executionContext []byte,
	decisions []*types.Decision,
) ([]*decisionResult, error) {

	// overall workflow size / count check
	failWorkflow, err := handler.sizeLimitChecker.failWorkflowSizeExceedsLimit()
	if err != nil || failWorkflow {
		return nil, err
	}

	var results []*decisionResult
	for _, decision := range decisions {
		result, err := handler.handleDecisionWithResult(ctx, decision)
		if err != nil || handler.stopProcessing {
			return nil, err
		}
		if result != nil {
			results = append(results, result)
		}
	}
	handler.mutableState.GetExecutionInfo().ExecutionContext = executionContext
	return results, nil
}

// handleDecisionWithResult dispatches a single decision. Schedule-activity
// decisions may yield a result; every other type is forwarded to
// handleDecision and yields none.
//
// A nil decision is rejected with a BadRequestError instead of being
// dereferenced below.
func (handler *taskHandlerImpl) handleDecisionWithResult(
	ctx context.Context,
	decision *types.Decision,
) (*decisionResult, error) {
	if decision == nil {
		return nil, &types.BadRequestError{Message: "Decision is not set."}
	}

	switch decision.GetDecisionType() {
	case types.DecisionTypeScheduleActivityTask:
		return handler.handleDecisionScheduleActivity(ctx, decision.ScheduleActivityTaskDecisionAttributes)
	default:
		return nil, handler.handleDecision(ctx, decision)
	}
}

// handleDecision routes a decision to the handler for its type and returns a
// BadRequestError for types it does not know.
func (handler *taskHandlerImpl) handleDecision(
	ctx context.Context,
	decision *types.Decision,
) error {
	switch decision.GetDecisionType() {
	case types.DecisionTypeCompleteWorkflowExecution:
		return handler.handleDecisionCompleteWorkflow(ctx, decision.CompleteWorkflowExecutionDecisionAttributes)

	case types.DecisionTypeFailWorkflowExecution:
		return handler.handleDecisionFailWorkflow(ctx, decision.FailWorkflowExecutionDecisionAttributes)

	case types.DecisionTypeCancelWorkflowExecution:
		return handler.handleDecisionCancelWorkflow(ctx, decision.CancelWorkflowExecutionDecisionAttributes)

	case types.DecisionTypeStartTimer:
		return handler.handleDecisionStartTimer(ctx, decision.StartTimerDecisionAttributes)

	case types.DecisionTypeRequestCancelActivityTask:
		return handler.handleDecisionRequestCancelActivity(ctx, decision.RequestCancelActivityTaskDecisionAttributes)

	case types.DecisionTypeCancelTimer:
		return handler.handleDecisionCancelTimer(ctx, decision.CancelTimerDecisionAttributes)

	case types.DecisionTypeRecordMarker:
		return handler.handleDecisionRecordMarker(ctx, decision.RecordMarkerDecisionAttributes)

	case types.DecisionTypeRequestCancelExternalWorkflowExecution:
		return handler.handleDecisionRequestCancelExternalWorkflow(ctx, decision.RequestCancelExternalWorkflowExecutionDecisionAttributes)

	case types.DecisionTypeSignalExternalWorkflowExecution:
		return handler.handleDecisionSignalExternalWorkflow(ctx, decision.SignalExternalWorkflowExecutionDecisionAttributes)

	case types.DecisionTypeContinueAsNewWorkflowExecution:
		return handler.handleDecisionContinueAsNewWorkflow(ctx, decision.ContinueAsNewWorkflowExecutionDecisionAttributes)

	case types.DecisionTypeStartChildWorkflowExecution:
		return handler.handleDecisionStartChildWorkflow(ctx, decision.StartChildWorkflowExecutionDecisionAttributes)

	case types.DecisionTypeUpsertWorkflowSearchAttributes:
		return handler.handleDecisionUpsertWorkflowSearchAttributes(ctx, decision.UpsertWorkflowSearchAttributesDecisionAttributes)

	default:
		return &types.BadRequestError{Message: fmt.Sprintf("Unknown decision type: %v", decision.GetDecisionType())}
	}
}

// resolveTargetDomainID returns sourceDomainID when targetDomainName is empty,
// otherwise the ID of the named domain from the domain cache. A failed lookup
// is reported as an InternalServiceError whose message is errMessageFormat
// formatted with the domain name.
func (handler *taskHandlerImpl) resolveTargetDomainID(
	sourceDomainID string,
	targetDomainName string,
	errMessageFormat string,
) (string, error) {
	if targetDomainName == "" {
		return sourceDomainID, nil
	}

	targetDomainEntry, err := handler.domainCache.GetDomain(targetDomainName)
	if err != nil {
		return "", &types.InternalServiceError{
			Message: fmt.Sprintf(errMessageFormat, targetDomainName),
		}
	}
	return targetDomainEntry.GetInfo().ID, nil
}

// recordMultipleCompletionDecisions emits the counter and warning used when a
// close decision arrives while the workflow is no longer running.
func (handler *taskHandlerImpl) recordMultipleCompletionDecisions(decisionType types.DecisionType) {
	handler.metricsClient.IncCounter(
		metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.MultipleCompletionDecisionsCounter,
	)
	handler.logger.Warn(
		"Multiple completion decisions",
		tag.WorkflowDecisionType(int64(decisionType)),
		tag.ErrorTypeMultipleCompletionDecisions,
	)
}

// handleDecisionScheduleActivity validates and records a schedule-activity
// decision.
//
// When the mutable state returns dispatch info for the activity, a started
// event is added as well and a decisionResult carrying the dispatch info and
// a serialized task token is returned. In every other successful case the
// result is nil. A BadRequestError from the mutable state fails the decision
// with the duplicate-ID cause.
func (handler *taskHandlerImpl) handleDecisionScheduleActivity(
	ctx context.Context,
	attr *types.ScheduleActivityTaskDecisionAttributes,
) (*decisionResult, error) {

	handler.metricsClient.IncCounter(
		metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DecisionTypeScheduleActivityCounter,
	)

	executionInfo := handler.mutableState.GetExecutionInfo()
	domainID := executionInfo.DomainID
	targetDomainID, err := handler.resolveTargetDomainID(
		domainID,
		attr.GetDomain(),
		"Unable to schedule activity across domain %v.",
	)
	if err != nil {
		return nil, err
	}

	if err := handler.validateDecisionAttr(
		func() error {
			return handler.attrValidator.validateActivityScheduleAttributes(
				domainID,
				targetDomainID,
				attr,
				executionInfo.WorkflowTimeout,
				metrics.HistoryRespondDecisionTaskCompletedScope,
			)
		},
		types.DecisionTaskFailedCauseBadScheduleActivityAttributes,
	); err != nil || handler.stopProcessing {
		return nil, err
	}

	failWorkflow, err := handler.sizeLimitChecker.failWorkflowIfBlobSizeExceedsLimit(
		metrics.DecisionTypeTag(types.DecisionTypeScheduleActivityTask.String()),
		attr.Input,
		"ScheduleActivityTaskDecisionAttributes.Input exceeds size limit.",
	)
	if err != nil || failWorkflow {
		handler.stopProcessing = true
		return nil, err
	}

	// The last argument tells the mutable state whether this handler still
	// has local-dispatch budget left.
	event, ai, activityDispatchInfo, dispatched, started, err := handler.mutableState.AddActivityTaskScheduledEvent(
		ctx,
		handler.decisionTaskCompletedID,
		attr,
		handler.activityCountToDispatch > 0,
	)
	if dispatched {
		handler.activityCountToDispatch--
	}

	switch err.(type) {
	case nil:
		if activityDispatchInfo == nil && !started {
			return nil, nil
		}

		if _, err1 := handler.mutableState.AddActivityTaskStartedEvent(ai, event.ID, uuid.New(), handler.identity); err1 != nil {
			return nil, err1
		}
		if started {
			return nil, nil
		}

		// activityDispatchInfo is non-nil from here on.
		token := &common.TaskToken{
			DomainID:        executionInfo.DomainID,
			WorkflowID:      executionInfo.WorkflowID,
			WorkflowType:    executionInfo.WorkflowTypeName,
			RunID:           executionInfo.RunID,
			ScheduleID:      ai.ScheduleID,
			ScheduleAttempt: 0,
			ActivityID:      ai.ActivityID,
			ActivityType:    attr.ActivityType.GetName(),
		}
		activityDispatchInfo.TaskToken, err = handler.tokenSerializer.Serialize(token)
		if err != nil {
			return nil, workflow.ErrSerializingToken
		}
		activityDispatchInfo.ScheduledTimestamp = common.Int64Ptr(ai.ScheduledTime.UnixNano())
		activityDispatchInfo.ScheduledTimestampOfThisAttempt = common.Int64Ptr(ai.ScheduledTime.UnixNano())
		activityDispatchInfo.StartedTimestamp = common.Int64Ptr(ai.StartedTime.UnixNano())
		return &decisionResult{activityDispatchInfo: activityDispatchInfo}, nil

	case *types.BadRequestError:
		return nil, handler.handlerFailDecision(
			types.DecisionTaskFailedCauseScheduleActivityDuplicateID, "",
		)

	default:
		return nil, err
	}
}

// handleDecisionRequestCancelActivity records a cancel request for an
// activity. An activity that has not started yet is canceled immediately and
// activityNotStartedCancelled is set. A BadRequestError from the mutable
// state is turned into a request-cancel-failed event.
func (handler *taskHandlerImpl) handleDecisionRequestCancelActivity(
	ctx context.Context,
	attr *types.RequestCancelActivityTaskDecisionAttributes,
) error {

	handler.metricsClient.IncCounter(
		metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DecisionTypeCancelActivityCounter,
	)

	if err := handler.validateDecisionAttr(
		func() error {
			return handler.attrValidator.validateActivityCancelAttributes(
				attr,
				metrics.HistoryRespondDecisionTaskCompletedScope,
				handler.domainEntry.GetInfo().Name,
			)
		},
		types.DecisionTaskFailedCauseBadRequestCancelActivityAttributes,
	); err != nil || handler.stopProcessing {
		return err
	}

	activityID := attr.GetActivityID()
	actCancelReqEvent, ai, err := handler.mutableState.AddActivityTaskCancelRequestedEvent(
		handler.decisionTaskCompletedID,
		activityID,
		handler.identity,
	)
	switch err.(type) {
	case nil:
		if ai.StartedID == common.EmptyEventID {
			// We haven't started the activity yet, we can cancel the activity right away and
			// schedule a decision task to ensure the workflow makes progress.
			_, err = handler.mutableState.AddActivityTaskCanceledEvent(
				ai.ScheduleID,
				ai.StartedID,
				actCancelReqEvent.ID,
				[]byte(activityCancellationMsgActivityNotStarted),
				handler.identity,
			)
			if err != nil {
				return err
			}
			handler.activityNotStartedCancelled = true
		}
		return nil

	case *types.BadRequestError:
		_, err = handler.mutableState.AddRequestCancelActivityTaskFailedEvent(
			handler.decisionTaskCompletedID,
			activityID,
			activityCancellationMsgActivityIDUnknown,
		)
		return err

	default:
		return err
	}
}

// handleDecisionStartTimer validates and records a start-timer decision. A
// BadRequestError from the mutable state fails the decision with the
// duplicate-timer-ID cause.
func (handler *taskHandlerImpl) handleDecisionStartTimer(
	ctx context.Context,
	attr *types.StartTimerDecisionAttributes,
) error {

	handler.metricsClient.IncCounter(
		metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DecisionTypeStartTimerCounter,
	)

	if err := handler.validateDecisionAttr(
		func() error {
			return handler.attrValidator.validateTimerScheduleAttributes(
				attr,
				metrics.HistoryRespondDecisionTaskCompletedScope,
				handler.domainEntry.GetInfo().Name,
			)
		},
		types.DecisionTaskFailedCauseBadStartTimerAttributes,
	); err != nil || handler.stopProcessing {
		return err
	}

	_, _, err := handler.mutableState.AddTimerStartedEvent(handler.decisionTaskCompletedID, attr)
	switch err.(type) {
	case nil:
		return nil
	case *types.BadRequestError:
		return handler.handlerFailDecision(
			types.DecisionTaskFailedCauseStartTimerDuplicateID, "",
		)
	default:
		return err
	}
}

// handleDecisionCompleteWorkflow closes the workflow as completed, or
// continues it as new when a cron backoff applies and no cancellation was
// requested. The decision is failed when there were unhandled events before
// the decisions, and ignored (with a metric and a warning) when the workflow
// is no longer running.
func (handler *taskHandlerImpl) handleDecisionCompleteWorkflow(
	ctx context.Context,
	attr *types.CompleteWorkflowExecutionDecisionAttributes,
) error {

	handler.metricsClient.IncCounter(
		metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DecisionTypeCompleteWorkflowCounter,
	)

	if handler.hasUnhandledEventsBeforeDecisions {
		return handler.handlerFailDecision(
			types.DecisionTaskFailedCauseUnhandledDecision,
			"cannot complete workflow, new pending decisions were scheduled while this decision was processing",
		)
	}

	if err := handler.validateDecisionAttr(
		func() error {
			return handler.attrValidator.validateCompleteWorkflowExecutionAttributes(attr)
		},
		types.DecisionTaskFailedCauseBadCompleteWorkflowExecutionAttributes,
	); err != nil || handler.stopProcessing {
		return err
	}

	failWorkflow, err := handler.sizeLimitChecker.failWorkflowIfBlobSizeExceedsLimit(
		metrics.DecisionTypeTag(types.DecisionTypeCompleteWorkflowExecution.String()),
		attr.Result,
		"CompleteWorkflowExecutionDecisionAttributes.Result exceeds size limit.",
	)
	if err != nil || failWorkflow {
		handler.stopProcessing = true
		return err
	}

	// If the decision has more than one completion event then just pick the first one
	if !handler.mutableState.IsWorkflowExecutionRunning() {
		handler.recordMultipleCompletionDecisions(types.DecisionTypeCompleteWorkflowExecution)
		return nil
	}

	// event ID is not relevant
	isCanceled, _ := handler.mutableState.IsCancelRequested()

	// check if this is a cron workflow
	cronBackoff, err := handler.mutableState.GetCronBackoffDuration(ctx)
	if err != nil {
		handler.stopProcessing = true
		return err
	}

	if isCanceled || cronBackoff == backoff.NoBackoff {
		// canceled or not cron, so complete this workflow execution
		if _, err := handler.mutableState.AddCompletedWorkflowEvent(handler.decisionTaskCompletedID, attr); err != nil {
			return &types.InternalServiceError{Message: "Unable to add complete workflow event."}
		}
		return nil
	}

	// this is a cron workflow
	startEvent, err := handler.mutableState.GetStartEvent(ctx)
	if err != nil {
		return err
	}

	startAttributes := startEvent.WorkflowExecutionStartedEventAttributes
	return handler.retryCronContinueAsNew(
		ctx,
		startAttributes,
		int32(cronBackoff.Seconds()),
		types.ContinueAsNewInitiatorCronSchedule.Ptr(),
		nil,
		nil,
		attr.Result,
	)
}

// handleDecisionFailWorkflow closes the workflow as failed, unless a
// cancellation was requested (then it is closed as canceled) or a retry /
// cron backoff applies (then it is continued as new). The decision is failed
// when there were unhandled events before the decisions, and ignored (with a
// metric and a warning) when the workflow is no longer running.
func (handler *taskHandlerImpl) handleDecisionFailWorkflow(
	ctx context.Context,
	attr *types.FailWorkflowExecutionDecisionAttributes,
) error {

	handler.metricsClient.IncCounter(
		metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DecisionTypeFailWorkflowCounter,
	)

	if handler.hasUnhandledEventsBeforeDecisions {
		return handler.handlerFailDecision(
			types.DecisionTaskFailedCauseUnhandledDecision,
			"cannot complete workflow, new pending decisions were scheduled while this decision was processing",
		)
	}

	if err := handler.validateDecisionAttr(
		func() error {
			return handler.attrValidator.validateFailWorkflowExecutionAttributes(attr)
		},
		types.DecisionTaskFailedCauseBadFailWorkflowExecutionAttributes,
	); err != nil || handler.stopProcessing {
		return err
	}

	failWorkflow, err := handler.sizeLimitChecker.failWorkflowIfBlobSizeExceedsLimit(
		metrics.DecisionTypeTag(types.DecisionTypeFailWorkflowExecution.String()),
		attr.Details,
		"FailWorkflowExecutionDecisionAttributes.Details exceeds size limit.",
	)
	if err != nil || failWorkflow {
		handler.stopProcessing = true
		return err
	}

	// If the decision has more than one completion event then just pick the first one
	if !handler.mutableState.IsWorkflowExecutionRunning() {
		handler.recordMultipleCompletionDecisions(types.DecisionTypeFailWorkflowExecution)
		return nil
	}

	if is, _ := handler.mutableState.IsCancelRequested(); is {
		// cancellation must be sticky, as it's telling things to stop.
		// this is particularly important for child workflows, as if they restart themselves after the parent
		// cancels its context, there is no way for the parent to cancel the new run.
		cancelAttrs := types.CancelWorkflowExecutionDecisionAttributes{
			// TODO: serialize reason somehow, may deserve a new field / wrapped errors
			Details: attr.Details,
		}
		if _, err := handler.mutableState.AddWorkflowExecutionCanceledEvent(handler.decisionTaskCompletedID, &cancelAttrs); err != nil {
			return err
		}
		return nil
	}

	// below will check whether to do continue as new based on backoff & backoff or cron
	backoffInterval := handler.mutableState.GetRetryBackoffDuration(attr.GetReason())
	continueAsNewInitiator := types.ContinueAsNewInitiatorRetryPolicy
	// first check the backoff retry
	if backoffInterval == backoff.NoBackoff {
		// if no backoff retry, set the backoffInterval using cron schedule
		backoffInterval, err = handler.mutableState.GetCronBackoffDuration(ctx)
		if err != nil {
			handler.stopProcessing = true
			return err
		}
		continueAsNewInitiator = types.ContinueAsNewInitiatorCronSchedule
	}
	// second check the backoff / cron schedule
	if backoffInterval == backoff.NoBackoff {
		// no retry or cron
		if _, err := handler.mutableState.AddFailWorkflowEvent(handler.decisionTaskCompletedID, attr); err != nil {
			return err
		}
		return nil
	}

	// this is a cron / backoff workflow
	startEvent, err := handler.mutableState.GetStartEvent(ctx)
	if err != nil {
		return err
	}

	startAttributes := startEvent.WorkflowExecutionStartedEventAttributes
	return handler.retryCronContinueAsNew(
		ctx,
		startAttributes,
		int32(backoffInterval.Seconds()),
		continueAsNewInitiator.Ptr(),
		attr.Reason,
		attr.Details,
		startAttributes.LastCompletionResult,
	)
}

// handleDecisionCancelTimer validates and records a cancel-timer decision. A
// BadRequestError from the mutable state is turned into a cancel-timer-failed
// event.
func (handler *taskHandlerImpl) handleDecisionCancelTimer(
	ctx context.Context,
	attr *types.CancelTimerDecisionAttributes,
) error {

	handler.metricsClient.IncCounter(
		metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DecisionTypeCancelTimerCounter,
	)

	if err := handler.validateDecisionAttr(
		func() error {
			return handler.attrValidator.validateTimerCancelAttributes(
				attr,
				metrics.HistoryRespondDecisionTaskCompletedScope,
				handler.domainEntry.GetInfo().Name,
			)
		},
		types.DecisionTaskFailedCauseBadCancelTimerAttributes,
	); err != nil || handler.stopProcessing {
		return err
	}

	_, err := handler.mutableState.AddTimerCanceledEvent(
		handler.decisionTaskCompletedID,
		attr,
		handler.identity,
	)
	switch err.(type) {
	case nil:
		// timer deletion is a success, we may have deleted a fired timer in
		// which case we should reset hasBufferedEvents
		// TODO deletion of timer fired event refreshing hasUnhandledEventsBeforeDecisions
		//  is not entirely correct, since during these decisions processing, new event may appear
		handler.hasUnhandledEventsBeforeDecisions = handler.mutableState.HasBufferedEvents()
		return nil

	case *types.BadRequestError:
		_, err = handler.mutableState.AddCancelTimerFailedEvent(
			handler.decisionTaskCompletedID,
			attr,
			handler.identity,
		)
		return err

	default:
		return err
	}
}

// handleDecisionCancelWorkflow closes the workflow as canceled. The decision
// is failed when there were unhandled events before the decisions, and
// ignored (with a metric and a warning) when the workflow is no longer
// running.
func (handler *taskHandlerImpl) handleDecisionCancelWorkflow(
	ctx context.Context,
	attr *types.CancelWorkflowExecutionDecisionAttributes,
) error {

	handler.metricsClient.IncCounter(
		metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DecisionTypeCancelWorkflowCounter,
	)

	if handler.hasUnhandledEventsBeforeDecisions {
		return handler.handlerFailDecision(
			types.DecisionTaskFailedCauseUnhandledDecision,
			"cannot process cancellation, new pending decisions were scheduled while this decision was processing",
		)
	}

	if err := handler.validateDecisionAttr(
		func() error {
			return handler.attrValidator.validateCancelWorkflowExecutionAttributes(attr)
		},
		types.DecisionTaskFailedCauseBadCancelWorkflowExecutionAttributes,
	); err != nil || handler.stopProcessing {
		return err
	}

	// If the decision has more than one completion event then just pick the first one
	if !handler.mutableState.IsWorkflowExecutionRunning() {
		handler.recordMultipleCompletionDecisions(types.DecisionTypeCancelWorkflowExecution)
		return nil
	}

	_, err := handler.mutableState.AddWorkflowExecutionCanceledEvent(handler.decisionTaskCompletedID, attr)
	return err
}

// handleDecisionRequestCancelExternalWorkflow validates the decision and
// records a request-cancel-external-workflow initiated event with a freshly
// generated cancel request ID.
func (handler *taskHandlerImpl) handleDecisionRequestCancelExternalWorkflow(
	ctx context.Context,
	attr *types.RequestCancelExternalWorkflowExecutionDecisionAttributes,
) error {

	handler.metricsClient.IncCounter(
		metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DecisionTypeCancelExternalWorkflowCounter,
	)

	executionInfo := handler.mutableState.GetExecutionInfo()
	domainID := executionInfo.DomainID
	targetDomainID, err := handler.resolveTargetDomainID(
		domainID,
		attr.GetDomain(),
		"Unable to cancel workflow across domain: %v.",
	)
	if err != nil {
		return err
	}

	if err := handler.validateDecisionAttr(
		func() error {
			return handler.attrValidator.validateCancelExternalWorkflowExecutionAttributes(
				domainID,
				targetDomainID,
				attr,
				metrics.HistoryRespondDecisionTaskCompletedScope,
			)
		},
		types.DecisionTaskFailedCauseBadRequestCancelExternalWorkflowExecutionAttributes,
	); err != nil || handler.stopProcessing {
		return err
	}

	cancelRequestID := uuid.New()
	_, _, err = handler.mutableState.AddRequestCancelExternalWorkflowExecutionInitiatedEvent(
		handler.decisionTaskCompletedID,
		cancelRequestID,
		attr,
	)
	return err
}

// handleDecisionRecordMarker validates the decision, enforces the blob size
// limit on the marker details and records the marker event.
func (handler *taskHandlerImpl) handleDecisionRecordMarker(
	ctx context.Context,
	attr *types.RecordMarkerDecisionAttributes,
) error {

	handler.metricsClient.IncCounter(
		metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DecisionTypeRecordMarkerCounter,
	)

	if err := handler.validateDecisionAttr(
		func() error {
			return handler.attrValidator.validateRecordMarkerAttributes(
				attr,
				metrics.HistoryRespondDecisionTaskCompletedScope,
				handler.domainEntry.GetInfo().Name,
			)
		},
		types.DecisionTaskFailedCauseBadRecordMarkerAttributes,
	); err != nil || handler.stopProcessing {
		return err
	}

	failWorkflow, err := handler.sizeLimitChecker.failWorkflowIfBlobSizeExceedsLimit(
		metrics.DecisionTypeTag(types.DecisionTypeRecordMarker.String()),
		attr.Details,
		"RecordMarkerDecisionAttributes.Details exceeds size limit.",
	)
	if err != nil || failWorkflow {
		handler.stopProcessing = true
		return err
	}

	_, err = handler.mutableState.AddRecordMarkerEvent(handler.decisionTaskCompletedID, attr)
	return err
}

// handleDecisionContinueAsNewWorkflow continues the workflow as a new run and
// stores the new run's mutable state in continueAsNewBuilder. If a
// cancellation was requested the workflow is closed as canceled instead. The
// decision is failed when there were unhandled events before the decisions,
// and ignored (with a metric and a warning) when the workflow is no longer
// running.
func (handler *taskHandlerImpl) handleDecisionContinueAsNewWorkflow(
	ctx context.Context,
	attr *types.ContinueAsNewWorkflowExecutionDecisionAttributes,
) error {

	handler.metricsClient.IncCounter(
		metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DecisionTypeContinueAsNewCounter,
	)

	if handler.hasUnhandledEventsBeforeDecisions {
		return handler.handlerFailDecision(
			types.DecisionTaskFailedCauseUnhandledDecision,
			"cannot complete workflow, new pending decisions were scheduled while this decision was processing",
		)
	}

	executionInfo := handler.mutableState.GetExecutionInfo()

	if err := handler.validateDecisionAttr(
		func() error {
			return handler.attrValidator.validateContinueAsNewWorkflowExecutionAttributes(
				attr,
				executionInfo,
				metrics.HistoryRespondDecisionTaskCompletedScope,
				handler.domainEntry.GetInfo().Name,
			)
		},
		types.DecisionTaskFailedCauseBadContinueAsNewAttributes,
	); err != nil || handler.stopProcessing {
		return err
	}

	failWorkflow, err := handler.sizeLimitChecker.failWorkflowIfBlobSizeExceedsLimit(
		metrics.DecisionTypeTag(types.DecisionTypeContinueAsNewWorkflowExecution.String()),
		attr.Input,
		"ContinueAsNewWorkflowExecutionDecisionAttributes. Input exceeds size limit.",
	)
	if err != nil || failWorkflow {
		handler.stopProcessing = true
		return err
	}

	// If the decision has more than one completion event then just pick the first one
	if !handler.mutableState.IsWorkflowExecutionRunning() {
		handler.recordMultipleCompletionDecisions(types.DecisionTypeContinueAsNewWorkflowExecution)
		return nil
	}

	if is, _ := handler.mutableState.IsCancelRequested(); is {
		// cancellation must be sticky, as it's telling things to stop.
		// this is particularly important for child workflows, as if they restart themselves after the parent
		// cancels its context, there is no way for the parent to cancel the new run.
		cancelAttrs := types.CancelWorkflowExecutionDecisionAttributes{
			Details: nil, // TODO: serialize continue-as-new data somehow, may deserve a new field
		}
		if _, err := handler.mutableState.AddWorkflowExecutionCanceledEvent(handler.decisionTaskCompletedID, &cancelAttrs); err != nil {
			return err
		}
		return nil
	}

	// Extract parentDomainName so it can be passed down to next run of workflow execution
	var parentDomainName string
	if handler.mutableState.HasParentExecution() {
		parentDomainID := executionInfo.ParentDomainID
		parentDomainName, err = handler.domainCache.GetDomainName(parentDomainID)
		if err != nil {
			return err
		}
	}

	_, newStateBuilder, err := handler.mutableState.AddContinueAsNewEvent(
		ctx,
		handler.decisionTaskCompletedID,
		handler.decisionTaskCompletedID,
		parentDomainName,
		attr,
	)
	if err != nil {
		return err
	}

	handler.continueAsNewBuilder = newStateBuilder
	return nil
}

// handleDecisionStartChildWorkflow validates the decision, enforces the blob
// size limit on the child input, normalizes the parent close policy and
// records a start-child-workflow initiated event with a freshly generated
// request ID.
//
// Note that attr.ParentClosePolicy is overwritten in place.
func (handler *taskHandlerImpl) handleDecisionStartChildWorkflow(
	ctx context.Context,
	attr *types.StartChildWorkflowExecutionDecisionAttributes,
) error {

	handler.metricsClient.IncCounter(
		metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DecisionTypeChildWorkflowCounter,
	)

	executionInfo := handler.mutableState.GetExecutionInfo()
	domainID := executionInfo.DomainID
	targetDomainID, err := handler.resolveTargetDomainID(
		domainID,
		attr.GetDomain(),
		"Unable to schedule child execution across domain %v.",
	)
	if err != nil {
		return err
	}

	if err := handler.validateDecisionAttr(
		func() error {
			return handler.attrValidator.validateStartChildExecutionAttributes(
				domainID,
				targetDomainID,
				attr,
				executionInfo,
				metrics.HistoryRespondDecisionTaskCompletedScope,
			)
		},
		types.DecisionTaskFailedCauseBadStartChildExecutionAttributes,
	); err != nil || handler.stopProcessing {
		return err
	}

	failWorkflow, err := handler.sizeLimitChecker.failWorkflowIfBlobSizeExceedsLimit(
		metrics.DecisionTypeTag(types.DecisionTypeStartChildWorkflowExecution.String()),
		attr.Input,
		"StartChildWorkflowExecutionDecisionAttributes.Input exceeds size limit.",
	)
	if err != nil || failWorkflow {
		handler.stopProcessing = true
		return err
	}

	enabled := handler.config.EnableParentClosePolicy(handler.domainEntry.GetInfo().Name)
	switch {
	case !enabled:
		// for domains that haven't enabled the feature yet, need to use Abandon for backward-compatibility
		attr.ParentClosePolicy = types.ParentClosePolicyAbandon.Ptr()
	case attr.ParentClosePolicy == nil:
		// for old clients, this field is empty. If they enable the feature, make default as terminate
		attr.ParentClosePolicy = types.ParentClosePolicyTerminate.Ptr()
	}

	requestID := uuid.New()
	_, _, err = handler.mutableState.AddStartChildWorkflowExecutionInitiatedEvent(
		handler.decisionTaskCompletedID,
		requestID,
		attr,
	)
	return err
}

// handleDecisionSignalExternalWorkflow validates the decision, enforces the
// blob size limit on the signal input and records a
// signal-external-workflow initiated event with a freshly generated request
// ID.
func (handler *taskHandlerImpl) handleDecisionSignalExternalWorkflow(
	ctx context.Context,
	attr *types.SignalExternalWorkflowExecutionDecisionAttributes,
) error {

	handler.metricsClient.IncCounter(
		metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DecisionTypeSignalExternalWorkflowCounter,
	)

	executionInfo := handler.mutableState.GetExecutionInfo()
	domainID := executionInfo.DomainID
	targetDomainID, err := handler.resolveTargetDomainID(
		domainID,
		attr.GetDomain(),
		"Unable to signal workflow across domain: %v.",
	)
	if err != nil {
		return err
	}

	if err := handler.validateDecisionAttr(
		func() error {
			return handler.attrValidator.validateSignalExternalWorkflowExecutionAttributes(
				domainID,
				targetDomainID,
				attr,
				metrics.HistoryRespondDecisionTaskCompletedScope,
			)
		},
		types.DecisionTaskFailedCauseBadSignalWorkflowExecutionAttributes,
	); err != nil || handler.stopProcessing {
		return err
	}

	failWorkflow, err := handler.sizeLimitChecker.failWorkflowIfBlobSizeExceedsLimit(
		metrics.DecisionTypeTag(types.DecisionTypeSignalExternalWorkflowExecution.String()),
		attr.Input,
		"SignalExternalWorkflowExecutionDecisionAttributes.Input exceeds size limit.",
	)
	if err != nil || failWorkflow {
		handler.stopProcessing = true
		return err
	}

	signalRequestID := uuid.New() // for deduplicate
	_, _, err = handler.mutableState.AddSignalExternalWorkflowExecutionInitiatedEvent(
		handler.decisionTaskCompletedID,
		signalRequestID,
		attr,
	)
	return err
}

// handleDecisionUpsertWorkflowSearchAttributes validates the decision against
// the workflow's domain, enforces the blob size limit on the indexed fields
// and records the upsert-search-attributes event.
func (handler *taskHandlerImpl) handleDecisionUpsertWorkflowSearchAttributes(
	ctx context.Context,
	attr *types.UpsertWorkflowSearchAttributesDecisionAttributes,
) error {

	handler.metricsClient.IncCounter(
		metrics.HistoryRespondDecisionTaskCompletedScope,
		metrics.DecisionTypeUpsertWorkflowSearchAttributesCounter,
	)

	// get domain name
	executionInfo := handler.mutableState.GetExecutionInfo()
	domainID := executionInfo.DomainID
	domainName, err := handler.domainCache.GetDomainName(domainID)
	if err != nil {
		return &types.InternalServiceError{
			Message: fmt.Sprintf("Unable to get domain for domainID: %v.", domainID),
		}
	}

	// valid search attributes for upsert
	if err := handler.validateDecisionAttr(
		func() error {
			return handler.attrValidator.validateUpsertWorkflowSearchAttributes(
				domainName,
				attr,
			)
		},
		types.DecisionTaskFailedCauseBadSearchAttributes,
	); err != nil || handler.stopProcessing {
		return err
	}

	// blob size limit check
	failWorkflow, err := handler.sizeLimitChecker.failWorkflowIfBlobSizeExceedsLimit(
		metrics.DecisionTypeTag(types.DecisionTypeUpsertWorkflowSearchAttributes.String()),
		convertSearchAttributesToByteArray(attr.GetSearchAttributes().GetIndexedFields()),
		"UpsertWorkflowSearchAttributesDecisionAttributes exceeds size limit.",
	)
	if err != nil || failWorkflow {
		handler.stopProcessing = true
		return err
	}

	_, err = handler.mutableState.AddUpsertWorkflowSearchAttributesEvent(
		handler.decisionTaskCompletedID,
		attr,
	)
	return err
}

// convertSearchAttributesToByteArray concatenates every key and value of
// fields into one byte slice. The result is always non-nil; the order of the
// entries follows map iteration and is therefore unspecified.
func convertSearchAttributesToByteArray(fields map[string][]byte) []byte {
	// Size the buffer up front so the appends below never reallocate.
	size := 0
	for k, v := range fields {
		size += len(k) + len(v)
	}

	result := make([]byte, 0, size)
	for k, v := range fields {
		result = append(result, k...)
		result = append(result, v...)
	}
	return result
}

// retryCronContinueAsNew continues the workflow as a new run built from the
// attributes of its start event, and stores the new run's mutable state in
// continueAsNewBuilder.
//
// attr is the start event's attributes; backoffInterval is the delay in
// seconds before the new run starts; continueAsNewIter is the initiator
// recorded on the event; failureReason, failureDetails and
// lastCompletionResult are copied onto the continue-as-new attributes.
func (handler *taskHandlerImpl) retryCronContinueAsNew(
	ctx context.Context,
	attr *types.WorkflowExecutionStartedEventAttributes,
	backoffInterval int32,
	continueAsNewIter *types.ContinueAsNewInitiator,
	failureReason *string,
	failureDetails []byte,
	lastCompletionResult []byte,
) error {

	continueAsNewAttributes := &types.ContinueAsNewWorkflowExecutionDecisionAttributes{
		WorkflowType:                        attr.WorkflowType,
		TaskList:                            attr.TaskList,
		RetryPolicy:                         attr.RetryPolicy,
		Input:                               attr.Input,
		ExecutionStartToCloseTimeoutSeconds: attr.ExecutionStartToCloseTimeoutSeconds,
		TaskStartToCloseTimeoutSeconds:      attr.TaskStartToCloseTimeoutSeconds,
		CronSchedule:                        attr.CronSchedule,
		BackoffStartIntervalInSeconds:       common.Int32Ptr(backoffInterval),
		Initiator:                           continueAsNewIter,
		FailureReason:                       failureReason,
		FailureDetails:                      failureDetails,
		LastCompletionResult:                lastCompletionResult,
		Header:                              attr.Header,
		Memo:                                attr.Memo,
		SearchAttributes:                    attr.SearchAttributes,
		JitterStartSeconds:                  attr.JitterStartSeconds,
	}

	_, newStateBuilder, err := handler.mutableState.AddContinueAsNewEvent(
		ctx,
		handler.decisionTaskCompletedID,
		handler.decisionTaskCompletedID,
		attr.GetParentWorkflowDomain(),
		continueAsNewAttributes,
	)
	if err != nil {
		return err
	}

	handler.continueAsNewBuilder = newStateBuilder
	return nil
}

// validateDecisionAttr runs validationFn. A *types.BadRequestError fails the
// decision with failedCause (and yields a nil error, with stopProcessing
// set); any other error is returned unchanged.
func (handler *taskHandlerImpl) validateDecisionAttr(
	validationFn attrValidationFn,
	failedCause types.DecisionTaskFailedCause,
) error {
	err := validationFn()
	if err == nil {
		return nil
	}
	if _, ok := err.(*types.BadRequestError); ok {
		return handler.handlerFailDecision(failedCause, err.Error())
	}
	return err
}

// handlerFailDecision marks the decision task as failed with the given cause
// and message and stops further decision processing. It always returns nil so
// callers can return its result directly.
func (handler *taskHandlerImpl) handlerFailDecision(
	failedCause types.DecisionTaskFailedCause,
	failMessage string,
) error {
	handler.failDecision = true
	handler.failDecisionCause = failedCause.Ptr()
	handler.failMessage = common.StringPtr(failMessage)
	handler.stopProcessing = true
	return nil
}