in service/history/decision/checker.go [191:355]
// validateActivityScheduleAttributes validates the attributes of a ScheduleActivityTask
// decision and normalizes its timeouts in place.
//
// Validation proceeds in this order: the cross-domain call check, nil attributes, task list,
// activity ID, activity type, retry policy, and ID-length limits for the activity ID,
// activity type name and domain name. Timeouts are then handled as follows:
//
//   - any negative timeout is rejected;
//   - every timeout greater than wfTimeout is capped at wfTimeout;
//   - missing timeouts are deduced: ScheduleToStart and StartToClose default to
//     ScheduleToClose when it is set, otherwise ScheduleToClose is derived from
//     ScheduleToStart + StartToClose (capped at wfTimeout); if neither is possible the
//     decision is rejected;
//   - when a retry policy is present, ScheduleToStart and ScheduleToClose may be extended
//     based on the policy's expiration interval.
//
// attributes is mutated by this call. The returned error is either a *types.BadRequestError
// built here or whatever error one of the helper validators returned; nil means the
// attributes are valid.
func (v *attrValidator) validateActivityScheduleAttributes(
	domainID string,
	targetDomainID string,
	attributes *types.ScheduleActivityTaskDecisionAttributes,
	wfTimeout int32,
	metricsScope int,
) error {
	if err := v.validateCrossDomainCall(
		domainID,
		targetDomainID,
	); err != nil {
		return err
	}

	if attributes == nil {
		return &types.BadRequestError{Message: "ScheduleActivityTaskDecisionAttributes is not set on decision."}
	}

	// Only the validation result matters here; the task list value returned by the helper is discarded.
	defaultTaskListName := ""
	if _, err := v.validatedTaskList(attributes.TaskList, defaultTaskListName, metricsScope, attributes.GetDomain()); err != nil {
		return err
	}

	if attributes.GetActivityID() == "" {
		return &types.BadRequestError{Message: "ActivityId is not set on decision."}
	}

	if attributes.ActivityType == nil || attributes.ActivityType.GetName() == "" {
		return &types.BadRequestError{Message: "ActivityType is not set on decision."}
	}

	if err := common.ValidateRetryPolicy(attributes.RetryPolicy); err != nil {
		return err
	}

	// The same warn limit is used for all three length checks; the hard limit is per ID type.
	idLengthWarnLimit := v.config.MaxIDLengthWarnLimit()
	if !common.IsValidIDLength(
		attributes.GetActivityID(),
		v.metricsClient.Scope(metricsScope),
		idLengthWarnLimit,
		v.config.ActivityIDMaxLength(attributes.GetDomain()),
		metrics.CadenceErrActivityIDExceededWarnLimit,
		attributes.GetDomain(),
		v.logger,
		tag.IDTypeActivityID) {
		return &types.BadRequestError{Message: "ActivityID exceeds length limit."}
	}

	if !common.IsValidIDLength(
		attributes.GetActivityType().GetName(),
		v.metricsClient.Scope(metricsScope),
		idLengthWarnLimit,
		v.config.ActivityTypeMaxLength(attributes.GetDomain()),
		metrics.CadenceErrActivityTypeExceededWarnLimit,
		attributes.GetDomain(),
		v.logger,
		tag.IDTypeActivityType) {
		return &types.BadRequestError{Message: "ActivityType exceeds length limit."}
	}

	if !common.IsValidIDLength(
		attributes.GetDomain(),
		v.metricsClient.Scope(metricsScope),
		idLengthWarnLimit,
		v.config.DomainNameMaxLength(attributes.GetDomain()),
		metrics.CadenceErrDomainNameExceededWarnLimit,
		attributes.GetDomain(),
		v.logger,
		tag.IDTypeDomainName) {
		return &types.BadRequestError{Message: "Domain exceeds length limit."}
	}

	// Attempt to deduce and fill in unspecified timeouts only when all timeouts are non-negative.
	if attributes.GetScheduleToCloseTimeoutSeconds() < 0 || attributes.GetScheduleToStartTimeoutSeconds() < 0 ||
		attributes.GetStartToCloseTimeoutSeconds() < 0 || attributes.GetHeartbeatTimeoutSeconds() < 0 {
		return &types.BadRequestError{Message: "A valid timeout may not be negative."}
	}

	// Ensure no activity timeout is larger than the workflow timeout.
	if attributes.GetScheduleToCloseTimeoutSeconds() > wfTimeout {
		attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(wfTimeout)
	}
	if attributes.GetScheduleToStartTimeoutSeconds() > wfTimeout {
		attributes.ScheduleToStartTimeoutSeconds = common.Int32Ptr(wfTimeout)
	}
	if attributes.GetStartToCloseTimeoutSeconds() > wfTimeout {
		attributes.StartToCloseTimeoutSeconds = common.Int32Ptr(wfTimeout)
	}
	if attributes.GetHeartbeatTimeoutSeconds() > wfTimeout {
		attributes.HeartbeatTimeoutSeconds = common.Int32Ptr(wfTimeout)
	}

	// A timeout counts as "valid" (i.e. specified) only when it is strictly positive.
	validScheduleToClose := attributes.GetScheduleToCloseTimeoutSeconds() > 0
	validScheduleToStart := attributes.GetScheduleToStartTimeoutSeconds() > 0
	validStartToClose := attributes.GetStartToCloseTimeoutSeconds() > 0

	if validScheduleToClose {
		// ScheduleToClose is known: use it as the default for whichever of the other two is missing.
		if !validScheduleToStart {
			attributes.ScheduleToStartTimeoutSeconds = common.Int32Ptr(attributes.GetScheduleToCloseTimeoutSeconds())
		}
		if !validStartToClose {
			attributes.StartToCloseTimeoutSeconds = common.Int32Ptr(attributes.GetScheduleToCloseTimeoutSeconds())
		}
	} else if validScheduleToStart && validStartToClose {
		// Derive ScheduleToClose from the other two. The sum is computed in int64 because both
		// operands are int32 values that were only capped at wfTimeout above; with a large
		// workflow timeout an int32 addition could wrap to a negative number and slip past
		// the cap below.
		scheduleToClose := int64(attributes.GetScheduleToStartTimeoutSeconds()) + int64(attributes.GetStartToCloseTimeoutSeconds())
		if scheduleToClose > int64(wfTimeout) {
			scheduleToClose = int64(wfTimeout)
		}
		// Safe narrowing: the value is positive and no greater than wfTimeout, which is an int32.
		attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(int32(scheduleToClose))
	} else {
		// Deduction failed as there's not enough information to fill in missing timeouts.
		return &types.BadRequestError{Message: "A valid ScheduleToCloseTimeout is not set on decision."}
	}

	// Ensure the activity's SCHEDULE_TO_START and SCHEDULE_TO_CLOSE are as long as the expiration
	// on the retry policy if the SCHEDULE_TO_START timeout is retryable.
	p := attributes.RetryPolicy
	if p != nil {
		// ScheduleToStart timeouts are retryable unless the policy explicitly lists the
		// corresponding reason as non-retriable.
		isScheduleToStartRetryable := true
		scheduleToStartErrorReason := execution.TimerTypeToReason(execution.TimerTypeScheduleToStart)
		for _, reason := range p.GetNonRetriableErrorReasons() {
			if reason == scheduleToStartErrorReason {
				isScheduleToStartRetryable = false
				break
			}
		}

		// An expiration of 0, or one beyond the workflow timeout, is replaced by the workflow timeout.
		expiration := p.GetExpirationIntervalInSeconds()
		if expiration == 0 || expiration > wfTimeout {
			expiration = wfTimeout
		}

		if isScheduleToStartRetryable {
			// If schedule to start timeout is retryable, we don't need to fail the activity and schedule
			// it again on the same tasklist (as it's a no-op). Extending schedule to start timeout
			// achieves the same thing.
			//
			// Theoretically, we can extend schedule to start to be as long as the expiration time,
			// but if user specifies a very long expiration time and activity task got lost after the activity is
			// scheduled, workflow will be stuck for a long time. So here, we cap the schedule to start timeout
			// to a maximum value, so that when the activity task got lost, timeout can happen sooner and schedule
			// the activity again.
			domainName, _ := v.domainCache.GetDomainName(domainID) // if this call returns an error, we will just use the default value for max timeout
			maximumScheduleToStartTimeoutForRetryInSeconds := int32(v.config.ActivityMaxScheduleToStartTimeoutForRetry(domainName).Seconds())
			scheduleToStartExpiration := common.MinInt32(expiration, maximumScheduleToStartTimeoutForRetryInSeconds)
			if attributes.GetScheduleToStartTimeoutSeconds() < scheduleToStartExpiration {
				attributes.ScheduleToStartTimeoutSeconds = common.Int32Ptr(scheduleToStartExpiration)
			}

			// TODO: uncomment the following code when the client side bug for calculating scheduleToClose deadline is fixed and
			// fully rolled out. Before that, we still need to extend scheduleToClose timeout to be as long as the expiration interval
			//
			// scheduleToCloseExpiration := common.MinInt32(expiration, scheduleToStartExpiration+attributes.GetStartToCloseTimeoutSeconds())
			// if attributes.GetScheduleToCloseTimeoutSeconds() < scheduleToCloseExpiration {
			// 	attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(scheduleToCloseExpiration)
			// }
		}

		// Applied whether or not ScheduleToStart is retryable: ScheduleToClose is extended to
		// at least the (capped) expiration interval.
		if attributes.GetScheduleToCloseTimeoutSeconds() < expiration {
			attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(expiration)
		}
	}
	return nil
}