service/frontend/api/handler.go (3,482 lines of code) (raw):
// Copyright (c) 2017-2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package api
import (
"context"
"encoding/json"
"fmt"
"sync/atomic"
"time"
"github.com/google/uuid"
"go.uber.org/yarpc"
"golang.org/x/sync/errgroup"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/.gen/go/sqlblobs"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/client"
"github.com/uber/cadence/common/domain"
"github.com/uber/cadence/common/elasticsearch/validator"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/partition"
"github.com/uber/cadence/common/persistence"
persistenceutils "github.com/uber/cadence/common/persistence/persistence-utils"
"github.com/uber/cadence/common/resource"
"github.com/uber/cadence/common/service"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/frontend/config"
"github.com/uber/cadence/service/frontend/validate"
)
const (
// HealthStatusOK is used when this node is healthy and rpc requests are allowed
HealthStatusOK HealthStatus = iota + 1
// HealthStatusWarmingUp is used when the rpc handler is warming up
HealthStatusWarmingUp
// HealthStatusShuttingDown is used when the rpc handler is shutting down
HealthStatusShuttingDown
)
var _ Handler = (*WorkflowHandler)(nil)
type (
// WorkflowHandler - Thrift handler interface for workflow service
WorkflowHandler struct {
resource.Resource
shuttingDown int32
healthStatus int32
tokenSerializer common.TaskTokenSerializer
config *config.Config
versionChecker client.VersionChecker
domainHandler domain.Handler
visibilityQueryValidator *validator.VisibilityQueryValidator
searchAttributesValidator *validator.SearchAttributesValidator
throttleRetry *backoff.ThrottleRetry
producerManager ProducerManager
}
getHistoryContinuationToken struct {
RunID string
FirstEventID int64
NextEventID int64
IsWorkflowRunning bool
PersistenceToken []byte
TransientDecision *types.TransientDecisionInfo
BranchToken []byte
}
domainGetter interface {
GetDomain() string
}
// HealthStatus is an enum that refers to the rpc handler health status
HealthStatus int32
)
var (
frontendServiceRetryPolicy = common.CreateFrontendServiceRetryPolicy()
)
// NewWorkflowHandler creates a thrift handler for the cadence service
func NewWorkflowHandler(
resource resource.Resource,
config *config.Config,
versionChecker client.VersionChecker,
domainHandler domain.Handler,
) *WorkflowHandler {
return &WorkflowHandler{
Resource: resource,
config: config,
healthStatus: int32(HealthStatusWarmingUp),
tokenSerializer: common.NewJSONTaskTokenSerializer(),
versionChecker: versionChecker,
domainHandler: domainHandler,
visibilityQueryValidator: validator.NewQueryValidator(
config.ValidSearchAttributes,
config.EnableQueryAttributeValidation,
),
searchAttributesValidator: validator.NewSearchAttributesValidator(
resource.GetLogger(),
config.EnableQueryAttributeValidation,
config.ValidSearchAttributes,
config.SearchAttributesNumberOfKeysLimit,
config.SearchAttributesSizeOfValueLimit,
config.SearchAttributesTotalSizeLimit,
),
throttleRetry: backoff.NewThrottleRetry(
backoff.WithRetryPolicy(frontendServiceRetryPolicy),
backoff.WithRetryableError(common.IsServiceTransientError),
),
producerManager: NewProducerManager(
resource.GetDomainCache(),
resource.GetAsyncWorkflowQueueProvider(),
resource.GetLogger(),
resource.GetMetricsClient(),
),
}
}
// Start starts the handler
func (wh *WorkflowHandler) Start() {
// TODO: Get warmup duration from config. Even better, run proactive checks such as probing downstream connections.
const warmUpDuration = 30 * time.Second
warmupTimer := time.NewTimer(warmUpDuration)
go func() {
<-warmupTimer.C
wh.GetLogger().Warn("Service warmup duration has elapsed.")
if atomic.CompareAndSwapInt32(&wh.healthStatus, int32(HealthStatusWarmingUp), int32(HealthStatusOK)) {
wh.GetLogger().Warn("Warmup time has elapsed. Service is healthy.")
} else {
status := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
wh.GetLogger().Warn(fmt.Sprintf("Warmup time has elapsed. Service status is: %v", status.String()))
}
}()
}
// Stop stops the handler
func (wh *WorkflowHandler) Stop() {
atomic.StoreInt32(&wh.shuttingDown, 1)
}
// UpdateHealthStatus sets the health status for this rpc handler.
// This health status will be used within the rpc health check handler
func (wh *WorkflowHandler) UpdateHealthStatus(status HealthStatus) {
atomic.StoreInt32(&wh.healthStatus, int32(status))
}
func (wh *WorkflowHandler) isShuttingDown() bool {
return atomic.LoadInt32(&wh.shuttingDown) != 0
}
// Health is for health check
func (wh *WorkflowHandler) Health(ctx context.Context) (*types.HealthStatus, error) {
status := HealthStatus(atomic.LoadInt32(&wh.healthStatus))
msg := status.String()
if status != HealthStatusOK {
wh.GetLogger().Warn(fmt.Sprintf("Service status is: %v", msg))
}
return &types.HealthStatus{
Ok: status == HealthStatusOK,
Msg: msg,
}, nil
}
// RegisterDomain creates a new domain which can be used as a container for all resources. Domain is a top level
// entity within Cadence, used as a container for all resources like workflow executions, tasklists, etc. Domain
// acts as a sandbox and provides isolation for all resources within the domain. All resources belongs to exactly one
// domain.
func (wh *WorkflowHandler) RegisterDomain(ctx context.Context, registerRequest *types.RegisterDomainRequest) (retError error) {
if wh.isShuttingDown() {
return validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return err
}
if registerRequest == nil {
return validate.ErrRequestNotSet
}
if registerRequest.GetWorkflowExecutionRetentionPeriodInDays() > int32(wh.config.DomainConfig.MaxRetentionDays()) {
return validate.ErrInvalidRetention
}
if err := validate.CheckPermission(wh.config, registerRequest.SecurityToken); err != nil {
return err
}
if err := checkRequiredDomainDataKVs(wh.config.DomainConfig.RequiredDomainDataKeys(), registerRequest.GetData()); err != nil {
return err
}
if registerRequest.GetName() == "" {
return validate.ErrDomainNotSet
}
return wh.domainHandler.RegisterDomain(ctx, registerRequest)
}
// ListDomains returns the information and configuration for a registered domain.
func (wh *WorkflowHandler) ListDomains(
ctx context.Context,
listRequest *types.ListDomainsRequest,
) (response *types.ListDomainsResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if listRequest == nil {
return nil, validate.ErrRequestNotSet
}
return wh.domainHandler.ListDomains(ctx, listRequest)
}
// DescribeDomain returns the information and configuration for a registered domain.
func (wh *WorkflowHandler) DescribeDomain(
ctx context.Context,
describeRequest *types.DescribeDomainRequest,
) (response *types.DescribeDomainResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if describeRequest == nil {
return nil, validate.ErrRequestNotSet
}
if describeRequest.GetName() == "" && describeRequest.GetUUID() == "" {
return nil, validate.ErrDomainNotSet
}
resp, err := wh.domainHandler.DescribeDomain(ctx, describeRequest)
if err != nil {
return nil, err
}
if resp.GetFailoverInfo() != nil && resp.GetFailoverInfo().GetFailoverExpireTimestamp() > 0 {
// fetch ongoing failover info from history service
failoverResp, err := wh.GetHistoryClient().GetFailoverInfo(ctx, &types.GetFailoverInfoRequest{
DomainID: resp.GetDomainInfo().UUID,
})
if err != nil {
// despite the error from history, return describe domain response
wh.GetLogger().Error(
fmt.Sprintf("Failed to get failover info for domain %s", resp.DomainInfo.GetName()),
tag.Error(err),
)
return resp, nil
}
resp.FailoverInfo.CompletedShardCount = failoverResp.GetCompletedShardCount()
resp.FailoverInfo.PendingShards = failoverResp.GetPendingShards()
}
return resp, nil
}
// UpdateDomain is used to update the information and configuration for a registered domain.
func (wh *WorkflowHandler) UpdateDomain(
ctx context.Context,
updateRequest *types.UpdateDomainRequest,
) (resp *types.UpdateDomainResponse, retError error) {
domainName := ""
if updateRequest != nil {
domainName = updateRequest.GetName()
}
logger := wh.GetLogger().WithTags(
tag.WorkflowDomainName(domainName),
tag.OperationName("DomainUpdate"))
if updateRequest == nil {
logger.Error("Nil domain update request.",
tag.Error(validate.ErrRequestNotSet))
return nil, validate.ErrRequestNotSet
}
isFailover := isFailoverRequest(updateRequest)
isGraceFailover := isGraceFailoverRequest(updateRequest)
logger.Info(fmt.Sprintf(
"Domain Update requested. isFailover: %v, isGraceFailover: %v, Request: %#v.",
isFailover,
isGraceFailover,
updateRequest))
if wh.isShuttingDown() {
logger.Error("Won't apply the domain update since workflowHandler is shutting down.",
tag.Error(validate.ErrShuttingDown))
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
logger.Error("Won't apply the domain update since client version is not supported.",
tag.Error(err))
return nil, err
}
// don't require permission for failover request
if isFailover {
// reject the failover if the cluster is in lockdown
if err := checkFailOverPermission(wh.config, updateRequest.GetName()); err != nil {
logger.Error("Domain failover request rejected since domain is in lockdown.",
tag.Error(err))
return nil, err
}
} else {
if err := validate.CheckPermission(wh.config, updateRequest.SecurityToken); err != nil {
logger.Error("Domain update request rejected due to failing permissions.",
tag.Error(err))
return nil, err
}
}
if isGraceFailover {
if err := wh.checkOngoingFailover(
ctx,
&updateRequest.Name,
); err != nil {
logger.Error("Graceful domain failover request failed. Not able to check ongoing failovers.",
tag.Error(err))
return nil, err
}
}
if updateRequest.GetName() == "" {
logger.Error("Domain not set on request.",
tag.Error(validate.ErrDomainNotSet))
return nil, validate.ErrDomainNotSet
}
// TODO: call remote clusters to verify domain data
resp, err := wh.domainHandler.UpdateDomain(ctx, updateRequest)
if err != nil {
logger.Error("Domain update operation failed.",
tag.Error(err))
return nil, err
}
logger.Info("Domain update operation succeeded.")
return resp, nil
}
// DeprecateDomain us used to update status of a registered domain to DEPRECATED. Once the domain is deprecated
// it cannot be used to start new workflow executions. Existing workflow executions will continue to run on
// deprecated domains.
func (wh *WorkflowHandler) DeprecateDomain(ctx context.Context, deprecateRequest *types.DeprecateDomainRequest) (retError error) {
if wh.isShuttingDown() {
return validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return err
}
if deprecateRequest == nil {
return validate.ErrRequestNotSet
}
if err := validate.CheckPermission(wh.config, deprecateRequest.SecurityToken); err != nil {
return err
}
if deprecateRequest.GetName() == "" {
return validate.ErrDomainNotSet
}
return wh.domainHandler.DeprecateDomain(ctx, deprecateRequest)
}
// PollForActivityTask - Poll for an activity task.
func (wh *WorkflowHandler) PollForActivityTask(
ctx context.Context,
pollRequest *types.PollForActivityTaskRequest,
) (resp *types.PollForActivityTaskResponse, retError error) {
callTime := time.Now()
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if pollRequest == nil {
return nil, validate.ErrRequestNotSet
}
domainName := pollRequest.GetDomain()
if domainName == "" {
return nil, validate.ErrDomainNotSet
}
scope := getMetricsScopeWithDomain(metrics.FrontendPollForActivityTaskScope, pollRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
wh.GetLogger().Debug("Received PollForActivityTask")
if err := common.ValidateLongPollContextTimeout(
ctx,
"PollForActivityTask",
wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())),
); err != nil {
return nil, err
}
idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
if !common.IsValidIDLength(
domainName,
scope,
idLengthWarnLimit,
wh.config.DomainNameMaxLength(domainName),
metrics.CadenceErrDomainNameExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeDomainName) {
return nil, validate.ErrDomainTooLong
}
if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
return nil, err
}
if !common.IsValidIDLength(
pollRequest.GetIdentity(),
scope,
idLengthWarnLimit,
wh.config.IdentityMaxLength(domainName),
metrics.CadenceErrIdentityExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeIdentity) {
return nil, validate.ErrIdentityTooLong
}
domainID, err := wh.GetDomainCache().GetDomainID(domainName)
if err != nil {
return nil, err
}
isolationGroup := wh.getIsolationGroup(ctx, domainName)
if !wh.waitUntilIsolationGroupHealthy(ctx, domainName, isolationGroup) {
return &types.PollForActivityTaskResponse{}, nil
}
// it is possible that we wait for a very long time and the remaining time is not long enough for a long poll
// in this case, return an empty response
if err := common.ValidateLongPollContextTimeout(
ctx,
"PollForActivityTask",
wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())),
); err != nil {
return &types.PollForActivityTaskResponse{}, nil
}
pollerID := uuid.New().String()
op := func() error {
resp, err = wh.GetMatchingClient().PollForActivityTask(ctx, &types.MatchingPollForActivityTaskRequest{
DomainUUID: domainID,
PollerID: pollerID,
PollRequest: pollRequest,
IsolationGroup: isolationGroup,
})
return err
}
err = wh.throttleRetry.Do(ctx, op)
if err != nil {
err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeActivity, pollRequest.TaskList, pollerID)
if err != nil {
// For all other errors log an error and return it back to client.
ctxTimeout := "not-set"
ctxDeadline, ok := ctx.Deadline()
if ok {
ctxTimeout = ctxDeadline.Sub(callTime).String()
}
wh.GetLogger().Error("PollForActivityTask failed.",
tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
tag.Value(ctxTimeout),
tag.Error(err))
return nil, err
}
}
return resp, nil
}
// PollForDecisionTask - Poll for a decision task.
func (wh *WorkflowHandler) PollForDecisionTask(
ctx context.Context,
pollRequest *types.PollForDecisionTaskRequest,
) (resp *types.PollForDecisionTaskResponse, retError error) {
callTime := time.Now()
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if pollRequest == nil {
return nil, validate.ErrRequestNotSet
}
domainName := pollRequest.GetDomain()
tags := getDomainWfIDRunIDTags(domainName, nil)
if domainName == "" {
return nil, validate.ErrDomainNotSet
}
scope := getMetricsScopeWithDomain(metrics.FrontendPollForDecisionTaskScope, pollRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
wh.GetLogger().Debug("Received PollForDecisionTask")
if err := common.ValidateLongPollContextTimeout(
ctx,
"PollForDecisionTask",
wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())),
); err != nil {
return nil, err
}
idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
if !common.IsValidIDLength(
domainName,
scope,
idLengthWarnLimit,
wh.config.DomainNameMaxLength(domainName),
metrics.CadenceErrDomainNameExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeDomainName) {
return nil, validate.ErrDomainTooLong
}
if !common.IsValidIDLength(
pollRequest.GetIdentity(),
scope,
idLengthWarnLimit,
wh.config.IdentityMaxLength(domainName),
metrics.CadenceErrIdentityExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeIdentity) {
return nil, validate.ErrIdentityTooLong
}
if err := wh.validateTaskList(pollRequest.TaskList, scope, domainName); err != nil {
return nil, err
}
domainEntry, err := wh.GetDomainCache().GetDomain(domainName)
if err != nil {
return nil, err
}
domainID := domainEntry.GetInfo().ID
wh.GetLogger().Debug("Poll for decision.", tag.WorkflowDomainName(domainName), tag.WorkflowDomainID(domainID))
if err := wh.checkBadBinary(domainEntry, pollRequest.GetBinaryChecksum()); err != nil {
return nil, err
}
isolationGroup := wh.getIsolationGroup(ctx, domainName)
if !wh.waitUntilIsolationGroupHealthy(ctx, domainName, isolationGroup) {
return &types.PollForDecisionTaskResponse{}, nil
}
// it is possible that we wait for a very long time and the remaining time is not long enough for a long poll
// in this case, return an empty response
if err := common.ValidateLongPollContextTimeout(
ctx,
"PollForDecisionTask",
wh.GetThrottledLogger().WithTags(tag.WorkflowDomainName(domainName), tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName())),
); err != nil {
return &types.PollForDecisionTaskResponse{}, nil
}
pollerID := uuid.New().String()
var matchingResp *types.MatchingPollForDecisionTaskResponse
op := func() error {
matchingResp, err = wh.GetMatchingClient().PollForDecisionTask(ctx, &types.MatchingPollForDecisionTaskRequest{
DomainUUID: domainID,
PollerID: pollerID,
PollRequest: pollRequest,
IsolationGroup: isolationGroup,
})
return err
}
err = wh.throttleRetry.Do(ctx, op)
if err != nil {
err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeDecision, pollRequest.TaskList, pollerID)
if err != nil {
// For all other errors log an error and return it back to client.
ctxTimeout := "not-set"
ctxDeadline, ok := ctx.Deadline()
if ok {
ctxTimeout = ctxDeadline.Sub(callTime).String()
}
wh.GetLogger().Error("PollForDecisionTask failed.",
tag.WorkflowTaskListName(pollRequest.GetTaskList().GetName()),
tag.Value(ctxTimeout),
tag.Error(err))
return nil, err
}
// Must be cancellation error. Does'nt matter what we return here. Client already went away.
return nil, nil
}
tags = append(tags, []tag.Tag{tag.WorkflowID(
matchingResp.GetWorkflowExecution().GetWorkflowID()),
tag.WorkflowRunID(matchingResp.GetWorkflowExecution().GetRunID())}...)
resp, err = wh.createPollForDecisionTaskResponse(ctx, scope, domainID, matchingResp, matchingResp.GetBranchToken())
if err != nil {
return nil, err
}
return resp, nil
}
func (wh *WorkflowHandler) getIsolationGroup(ctx context.Context, domainName string) string {
return partition.IsolationGroupFromContext(ctx)
}
func (wh *WorkflowHandler) getPartitionConfig(ctx context.Context, domainName string) map[string]string {
return partition.ConfigFromContext(ctx)
}
func (wh *WorkflowHandler) isIsolationGroupHealthy(ctx context.Context, domainName, isolationGroup string) bool {
if wh.GetIsolationGroupState() != nil && wh.config.EnableTasklistIsolation(domainName) {
isDrained, err := wh.GetIsolationGroupState().IsDrained(ctx, domainName, isolationGroup)
if err != nil {
wh.GetLogger().Error("Failed to check if an isolation group is drained, assume it's healthy", tag.Error(err))
return true
}
return !isDrained
}
return true
}
func (wh *WorkflowHandler) waitUntilIsolationGroupHealthy(ctx context.Context, domainName, isolationGroup string) bool {
if wh.GetIsolationGroupState() != nil && wh.config.EnableTasklistIsolation(domainName) {
ticker := time.NewTicker(time.Second * 30)
defer ticker.Stop()
childCtx, cancel := common.CreateChildContext(ctx, 0.05)
defer cancel()
for {
isDrained, err := wh.GetIsolationGroupState().IsDrained(childCtx, domainName, isolationGroup)
if err != nil {
wh.GetLogger().Error("Failed to check if an isolation group is drained, assume it's healthy", tag.Error(err))
return true
}
if !isDrained {
break
}
select {
case <-childCtx.Done():
return false
case <-ticker.C:
}
}
}
return true
}
func (wh *WorkflowHandler) checkBadBinary(domainEntry *cache.DomainCacheEntry, binaryChecksum string) error {
if domainEntry.GetConfig().BadBinaries.Binaries != nil {
badBinaries := domainEntry.GetConfig().BadBinaries.Binaries
_, ok := badBinaries[binaryChecksum]
if ok {
wh.GetMetricsClient().IncCounter(metrics.FrontendPollForDecisionTaskScope, metrics.CadenceErrBadBinaryCounter)
return &types.BadRequestError{
Message: fmt.Sprintf("binary %v already marked as bad deployment", binaryChecksum),
}
}
}
return nil
}
func (wh *WorkflowHandler) cancelOutstandingPoll(ctx context.Context, err error, domainID string, taskListType int32,
taskList *types.TaskList, pollerID string) error {
// First check if this err is due to context cancellation. This means client connection to frontend is closed.
if ctx.Err() == context.Canceled {
// Our rpc stack does not propagates context cancellation to the other service. Lets make an explicit
// call to matching to notify this poller is gone to prevent any tasks being dispatched to zombie pollers.
err = wh.GetMatchingClient().CancelOutstandingPoll(context.Background(), &types.CancelOutstandingPollRequest{
DomainUUID: domainID,
TaskListType: common.Int32Ptr(taskListType),
TaskList: taskList,
PollerID: pollerID,
})
// We can not do much if this call fails. Just log the error and move on
if err != nil {
wh.GetLogger().Warn("Failed to cancel outstanding poller.",
tag.WorkflowTaskListName(taskList.GetName()), tag.Error(err))
}
// Clear error as we don't want to report context cancellation error to count against our SLA
return nil
}
return err
}
// RecordActivityTaskHeartbeat - Record Activity Task Heart beat.
func (wh *WorkflowHandler) RecordActivityTaskHeartbeat(
ctx context.Context,
heartbeatRequest *types.RecordActivityTaskHeartbeatRequest,
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if heartbeatRequest == nil {
return nil, validate.ErrRequestNotSet
}
wh.GetLogger().Debug("Received RecordActivityTaskHeartbeat")
if heartbeatRequest.TaskToken == nil {
return nil, validate.ErrTaskTokenNotSet
}
taskToken, err := wh.tokenSerializer.Deserialize(heartbeatRequest.TaskToken)
if err != nil {
return nil, err
}
if taskToken.DomainID == "" {
return nil, validate.ErrDomainNotSet
}
domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
if err != nil {
return nil, err
}
dw := domainWrapper{
domain: domainName,
}
scope := getMetricsScopeWithDomain(metrics.FrontendRecordActivityTaskHeartbeatScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
if err := common.CheckEventBlobSizeLimit(
len(heartbeatRequest.Details),
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
taskToken.WorkflowID,
taskToken.RunID,
scope,
wh.GetThrottledLogger(),
tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeat"),
); err != nil {
// heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
failRequest := &types.RespondActivityTaskFailedRequest{
TaskToken: heartbeatRequest.TaskToken,
Reason: common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
Details: heartbeatRequest.Details[0:sizeLimitError],
Identity: heartbeatRequest.Identity,
}
err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
DomainUUID: taskToken.DomainID,
FailedRequest: failRequest,
})
if err != nil {
return nil, wh.normalizeVersionedErrors(ctx, err)
}
resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
} else {
resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
DomainUUID: taskToken.DomainID,
HeartbeatRequest: heartbeatRequest,
})
if err != nil {
return nil, wh.normalizeVersionedErrors(ctx, err)
}
}
return resp, nil
}
// RecordActivityTaskHeartbeatByID - Record Activity Task Heart beat.
func (wh *WorkflowHandler) RecordActivityTaskHeartbeatByID(
ctx context.Context,
heartbeatRequest *types.RecordActivityTaskHeartbeatByIDRequest,
) (resp *types.RecordActivityTaskHeartbeatResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if heartbeatRequest == nil {
return nil, validate.ErrRequestNotSet
}
domainName := heartbeatRequest.GetDomain()
if domainName == "" {
return nil, validate.ErrDomainNotSet
}
wh.GetLogger().Debug("Received RecordActivityTaskHeartbeatByID")
domainID, err := wh.GetDomainCache().GetDomainID(domainName)
if err != nil {
return nil, err
}
workflowID := heartbeatRequest.GetWorkflowID()
runID := heartbeatRequest.GetRunID() // runID is optional so can be empty
activityID := heartbeatRequest.GetActivityID()
if domainID == "" {
return nil, validate.ErrDomainNotSet
}
if workflowID == "" {
return nil, validate.ErrWorkflowIDNotSet
}
if activityID == "" {
return nil, validate.ErrActivityIDNotSet
}
taskToken := &common.TaskToken{
DomainID: domainID,
RunID: runID,
WorkflowID: workflowID,
ScheduleID: common.EmptyEventID,
ActivityID: activityID,
}
token, err := wh.tokenSerializer.Serialize(taskToken)
if err != nil {
return nil, err
}
scope := getMetricsScopeWithDomain(metrics.FrontendRecordActivityTaskHeartbeatByIDScope, heartbeatRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
if err := common.CheckEventBlobSizeLimit(
len(heartbeatRequest.Details),
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
taskToken.WorkflowID,
taskToken.RunID,
scope,
wh.GetThrottledLogger(),
tag.BlobSizeViolationOperation("RecordActivityTaskHeartbeatByID"),
); err != nil {
// heartbeat details exceed size limit, we would fail the activity immediately with explicit error reason
failRequest := &types.RespondActivityTaskFailedRequest{
TaskToken: token,
Reason: common.StringPtr(common.FailureReasonHeartbeatExceedsLimit),
Details: heartbeatRequest.Details[0:sizeLimitError],
Identity: heartbeatRequest.Identity,
}
err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
DomainUUID: taskToken.DomainID,
FailedRequest: failRequest,
})
if err != nil {
return nil, wh.normalizeVersionedErrors(ctx, err)
}
resp = &types.RecordActivityTaskHeartbeatResponse{CancelRequested: true}
} else {
req := &types.RecordActivityTaskHeartbeatRequest{
TaskToken: token,
Details: heartbeatRequest.Details,
Identity: heartbeatRequest.Identity,
}
resp, err = wh.GetHistoryClient().RecordActivityTaskHeartbeat(ctx, &types.HistoryRecordActivityTaskHeartbeatRequest{
DomainUUID: taskToken.DomainID,
HeartbeatRequest: req,
})
if err != nil {
return nil, wh.normalizeVersionedErrors(ctx, err)
}
}
return resp, nil
}
// RespondActivityTaskCompleted - response to an activity task
func (wh *WorkflowHandler) RespondActivityTaskCompleted(
ctx context.Context,
completeRequest *types.RespondActivityTaskCompletedRequest,
) (retError error) {
if wh.isShuttingDown() {
return validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return err
}
if completeRequest == nil {
return validate.ErrRequestNotSet
}
if completeRequest.TaskToken == nil {
return validate.ErrTaskTokenNotSet
}
taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
if err != nil {
return err
}
if taskToken.DomainID == "" {
return validate.ErrDomainNotSet
}
domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
if err != nil {
return err
}
dw := domainWrapper{
domain: domainName,
}
scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
if !common.IsValidIDLength(
completeRequest.GetIdentity(),
scope,
wh.config.MaxIDLengthWarnLimit(),
wh.config.IdentityMaxLength(domainName),
metrics.CadenceErrIdentityExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeIdentity) {
return validate.ErrIdentityTooLong
}
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
if err := common.CheckEventBlobSizeLimit(
len(completeRequest.Result),
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
taskToken.WorkflowID,
taskToken.RunID,
scope,
wh.GetThrottledLogger(),
tag.BlobSizeViolationOperation("RespondActivityTaskCompleted"),
); err != nil {
// result exceeds blob size limit, we would record it as failure
failRequest := &types.RespondActivityTaskFailedRequest{
TaskToken: completeRequest.TaskToken,
Reason: common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
Details: completeRequest.Result[0:sizeLimitError],
Identity: completeRequest.Identity,
}
err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
DomainUUID: taskToken.DomainID,
FailedRequest: failRequest,
})
if err != nil {
return wh.normalizeVersionedErrors(ctx, err)
}
} else {
err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
DomainUUID: taskToken.DomainID,
CompleteRequest: completeRequest,
})
if err != nil {
return wh.normalizeVersionedErrors(ctx, err)
}
}
return nil
}
// RespondActivityTaskCompletedByID - response to an activity task
func (wh *WorkflowHandler) RespondActivityTaskCompletedByID(
ctx context.Context,
completeRequest *types.RespondActivityTaskCompletedByIDRequest,
) (retError error) {
if wh.isShuttingDown() {
return validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return err
}
if completeRequest == nil {
return validate.ErrRequestNotSet
}
domainName := completeRequest.GetDomain()
if domainName == "" {
return validate.ErrDomainNotSet
}
domainID, err := wh.GetDomainCache().GetDomainID(domainName)
if err != nil {
return err
}
workflowID := completeRequest.GetWorkflowID()
runID := completeRequest.GetRunID() // runID is optional so can be empty
activityID := completeRequest.GetActivityID()
if domainID == "" {
return validate.ErrDomainNotSet
}
if workflowID == "" {
return validate.ErrWorkflowIDNotSet
}
if activityID == "" {
return validate.ErrActivityIDNotSet
}
scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCompletedByIDScope, completeRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
if !common.IsValidIDLength(
completeRequest.GetIdentity(),
scope,
wh.config.MaxIDLengthWarnLimit(),
wh.config.IdentityMaxLength(domainName),
metrics.CadenceErrIdentityExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeIdentity) {
return validate.ErrIdentityTooLong
}
taskToken := &common.TaskToken{
DomainID: domainID,
RunID: runID,
WorkflowID: workflowID,
ScheduleID: common.EmptyEventID,
ActivityID: activityID,
}
token, err := wh.tokenSerializer.Serialize(taskToken)
if err != nil {
return err
}
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
if err := common.CheckEventBlobSizeLimit(
len(completeRequest.Result),
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
taskToken.WorkflowID,
taskToken.RunID,
scope,
wh.GetThrottledLogger(),
tag.BlobSizeViolationOperation("RespondActivityTaskCompletedByID"),
); err != nil {
// result exceeds blob size limit, we would record it as failure
failRequest := &types.RespondActivityTaskFailedRequest{
TaskToken: token,
Reason: common.StringPtr(common.FailureReasonCompleteResultExceedsLimit),
Details: completeRequest.Result[0:sizeLimitError],
Identity: completeRequest.Identity,
}
err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
DomainUUID: taskToken.DomainID,
FailedRequest: failRequest,
})
if err != nil {
return wh.normalizeVersionedErrors(ctx, err)
}
} else {
req := &types.RespondActivityTaskCompletedRequest{
TaskToken: token,
Result: completeRequest.Result,
Identity: completeRequest.Identity,
}
err = wh.GetHistoryClient().RespondActivityTaskCompleted(ctx, &types.HistoryRespondActivityTaskCompletedRequest{
DomainUUID: taskToken.DomainID,
CompleteRequest: req,
})
if err != nil {
return wh.normalizeVersionedErrors(ctx, err)
}
}
return nil
}
// RespondActivityTaskFailed - response to an activity task failure
func (wh *WorkflowHandler) RespondActivityTaskFailed(
ctx context.Context,
failedRequest *types.RespondActivityTaskFailedRequest,
) (retError error) {
if wh.isShuttingDown() {
return validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return err
}
if failedRequest == nil {
return validate.ErrRequestNotSet
}
if failedRequest.TaskToken == nil {
return validate.ErrTaskTokenNotSet
}
taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
if err != nil {
return err
}
if taskToken.DomainID == "" {
return validate.ErrDomainNotSet
}
domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
if err != nil {
return err
}
dw := domainWrapper{
domain: domainName,
}
scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskFailedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
if !common.IsValidIDLength(
failedRequest.GetIdentity(),
scope,
wh.config.MaxIDLengthWarnLimit(),
wh.config.IdentityMaxLength(domainName),
metrics.CadenceErrIdentityExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeIdentity) {
return validate.ErrIdentityTooLong
}
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
if err := common.CheckEventBlobSizeLimit(
len(failedRequest.Details),
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
taskToken.WorkflowID,
taskToken.RunID,
scope,
wh.GetThrottledLogger(),
tag.BlobSizeViolationOperation("RespondActivityTaskFailed"),
); err != nil {
// details exceeds blob size limit, we would truncate the details and put a specific error reason
failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
failedRequest.Details = failedRequest.Details[0:sizeLimitError]
}
err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
DomainUUID: taskToken.DomainID,
FailedRequest: failedRequest,
})
if err != nil {
return wh.normalizeVersionedErrors(ctx, err)
}
return nil
}
// RespondActivityTaskFailedByID - response to an activity task failure
func (wh *WorkflowHandler) RespondActivityTaskFailedByID(
ctx context.Context,
failedRequest *types.RespondActivityTaskFailedByIDRequest,
) (retError error) {
if wh.isShuttingDown() {
return validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return err
}
if failedRequest == nil {
return validate.ErrRequestNotSet
}
domainName := failedRequest.GetDomain()
if domainName == "" {
return validate.ErrDomainNotSet
}
domainID, err := wh.GetDomainCache().GetDomainID(domainName)
if err != nil {
return err
}
workflowID := failedRequest.GetWorkflowID()
runID := failedRequest.GetRunID() // runID is optional so can be empty
activityID := failedRequest.GetActivityID()
if domainID == "" {
return validate.ErrDomainNotSet
}
if workflowID == "" {
return validate.ErrWorkflowIDNotSet
}
if activityID == "" {
return validate.ErrActivityIDNotSet
}
scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskFailedByIDScope, failedRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
if !common.IsValidIDLength(
failedRequest.GetIdentity(),
scope,
wh.config.MaxIDLengthWarnLimit(),
wh.config.IdentityMaxLength(failedRequest.GetDomain()),
metrics.CadenceErrIdentityExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeIdentity) {
return validate.ErrIdentityTooLong
}
taskToken := &common.TaskToken{
DomainID: domainID,
RunID: runID,
WorkflowID: workflowID,
ScheduleID: common.EmptyEventID,
ActivityID: activityID,
}
token, err := wh.tokenSerializer.Serialize(taskToken)
if err != nil {
return err
}
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
if err := common.CheckEventBlobSizeLimit(
len(failedRequest.Details),
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
taskToken.WorkflowID,
taskToken.RunID,
scope,
wh.GetThrottledLogger(),
tag.BlobSizeViolationOperation("RespondActivityTaskFailedByID"),
); err != nil {
// details exceeds blob size limit, we would truncate the details and put a specific error reason
failedRequest.Reason = common.StringPtr(common.FailureReasonFailureDetailsExceedsLimit)
failedRequest.Details = failedRequest.Details[0:sizeLimitError]
}
req := &types.RespondActivityTaskFailedRequest{
TaskToken: token,
Reason: failedRequest.Reason,
Details: failedRequest.Details,
Identity: failedRequest.Identity,
}
err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
DomainUUID: taskToken.DomainID,
FailedRequest: req,
})
if err != nil {
return wh.normalizeVersionedErrors(ctx, err)
}
return nil
}
// RespondActivityTaskCanceled - called to cancel an activity task
func (wh *WorkflowHandler) RespondActivityTaskCanceled(
ctx context.Context,
cancelRequest *types.RespondActivityTaskCanceledRequest,
) (retError error) {
if wh.isShuttingDown() {
return validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return err
}
if cancelRequest == nil {
return validate.ErrRequestNotSet
}
if cancelRequest.TaskToken == nil {
return validate.ErrTaskTokenNotSet
}
taskToken, err := wh.tokenSerializer.Deserialize(cancelRequest.TaskToken)
if err != nil {
return err
}
if taskToken.DomainID == "" {
return validate.ErrDomainNotSet
}
domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
if err != nil {
return err
}
dw := domainWrapper{
domain: domainName,
}
scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCanceledScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
if !common.IsValidIDLength(
cancelRequest.GetIdentity(),
scope,
wh.config.MaxIDLengthWarnLimit(),
wh.config.IdentityMaxLength(domainName),
metrics.CadenceErrIdentityExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeIdentity) {
return validate.ErrIdentityTooLong
}
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
if err := common.CheckEventBlobSizeLimit(
len(cancelRequest.Details),
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
taskToken.WorkflowID,
taskToken.RunID,
scope,
wh.GetThrottledLogger(),
tag.BlobSizeViolationOperation("RespondActivityTaskCanceled"),
); err != nil {
// details exceeds blob size limit, we would record it as failure
failRequest := &types.RespondActivityTaskFailedRequest{
TaskToken: cancelRequest.TaskToken,
Reason: common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
Details: cancelRequest.Details[0:sizeLimitError],
Identity: cancelRequest.Identity,
}
err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
DomainUUID: taskToken.DomainID,
FailedRequest: failRequest,
})
if err != nil {
return wh.normalizeVersionedErrors(ctx, err)
}
} else {
err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
DomainUUID: taskToken.DomainID,
CancelRequest: cancelRequest,
})
if err != nil {
return wh.normalizeVersionedErrors(ctx, err)
}
}
return nil
}
// RespondActivityTaskCanceledByID - called to cancel an activity task
func (wh *WorkflowHandler) RespondActivityTaskCanceledByID(
ctx context.Context,
cancelRequest *types.RespondActivityTaskCanceledByIDRequest,
) (retError error) {
if wh.isShuttingDown() {
return validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return err
}
if cancelRequest == nil {
return validate.ErrRequestNotSet
}
domainName := cancelRequest.GetDomain()
if domainName == "" {
return validate.ErrDomainNotSet
}
domainID, err := wh.GetDomainCache().GetDomainID(domainName)
if err != nil {
return err
}
workflowID := cancelRequest.GetWorkflowID()
runID := cancelRequest.GetRunID() // runID is optional so can be empty
activityID := cancelRequest.GetActivityID()
if domainID == "" {
return validate.ErrDomainNotSet
}
if workflowID == "" {
return validate.ErrWorkflowIDNotSet
}
if activityID == "" {
return validate.ErrActivityIDNotSet
}
scope := getMetricsScopeWithDomain(metrics.FrontendRespondActivityTaskCanceledByIDScope, cancelRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
if !common.IsValidIDLength(
cancelRequest.GetIdentity(),
scope,
wh.config.MaxIDLengthWarnLimit(),
wh.config.IdentityMaxLength(cancelRequest.GetDomain()),
metrics.CadenceErrIdentityExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeIdentity) {
return validate.ErrIdentityTooLong
}
taskToken := &common.TaskToken{
DomainID: domainID,
RunID: runID,
WorkflowID: workflowID,
ScheduleID: common.EmptyEventID,
ActivityID: activityID,
}
token, err := wh.tokenSerializer.Serialize(taskToken)
if err != nil {
return err
}
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
if err := common.CheckEventBlobSizeLimit(
len(cancelRequest.Details),
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
taskToken.WorkflowID,
taskToken.RunID,
scope,
wh.GetThrottledLogger(),
tag.BlobSizeViolationOperation("RespondActivityTaskCanceledByID"),
); err != nil {
// details exceeds blob size limit, we would record it as failure
failRequest := &types.RespondActivityTaskFailedRequest{
TaskToken: token,
Reason: common.StringPtr(common.FailureReasonCancelDetailsExceedsLimit),
Details: cancelRequest.Details[0:sizeLimitError],
Identity: cancelRequest.Identity,
}
err = wh.GetHistoryClient().RespondActivityTaskFailed(ctx, &types.HistoryRespondActivityTaskFailedRequest{
DomainUUID: taskToken.DomainID,
FailedRequest: failRequest,
})
if err != nil {
return wh.normalizeVersionedErrors(ctx, err)
}
} else {
req := &types.RespondActivityTaskCanceledRequest{
TaskToken: token,
Details: cancelRequest.Details,
Identity: cancelRequest.Identity,
}
err = wh.GetHistoryClient().RespondActivityTaskCanceled(ctx, &types.HistoryRespondActivityTaskCanceledRequest{
DomainUUID: taskToken.DomainID,
CancelRequest: req,
})
if err != nil {
return wh.normalizeVersionedErrors(ctx, err)
}
}
return nil
}
// RespondDecisionTaskCompleted - response to a decision task
func (wh *WorkflowHandler) RespondDecisionTaskCompleted(
ctx context.Context,
completeRequest *types.RespondDecisionTaskCompletedRequest,
) (resp *types.RespondDecisionTaskCompletedResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if completeRequest == nil {
return nil, validate.ErrRequestNotSet
}
if completeRequest.TaskToken == nil {
return nil, validate.ErrTaskTokenNotSet
}
taskToken, err := wh.tokenSerializer.Deserialize(completeRequest.TaskToken)
if err != nil {
return nil, err
}
if taskToken.DomainID == "" {
return nil, validate.ErrDomainNotSet
}
domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
if err != nil {
return nil, err
}
dw := domainWrapper{
domain: domainName,
}
scope := getMetricsScopeWithDomain(metrics.FrontendRespondDecisionTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
if !common.IsValidIDLength(
completeRequest.GetIdentity(),
scope,
wh.config.MaxIDLengthWarnLimit(),
wh.config.IdentityMaxLength(domainName),
metrics.CadenceErrIdentityExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeIdentity) {
return nil, validate.ErrIdentityTooLong
}
if err := common.CheckDecisionResultLimit(
len(completeRequest.Decisions),
wh.config.DecisionResultCountLimit(domainName),
scope); err != nil {
return nil, err
}
histResp, err := wh.GetHistoryClient().RespondDecisionTaskCompleted(ctx, &types.HistoryRespondDecisionTaskCompletedRequest{
DomainUUID: taskToken.DomainID,
CompleteRequest: completeRequest},
)
if err != nil {
return nil, wh.normalizeVersionedErrors(ctx, err)
}
completedResp := &types.RespondDecisionTaskCompletedResponse{}
completedResp.ActivitiesToDispatchLocally = histResp.ActivitiesToDispatchLocally
if completeRequest.GetReturnNewDecisionTask() && histResp != nil && histResp.StartedResponse != nil {
taskToken := &common.TaskToken{
DomainID: taskToken.DomainID,
WorkflowID: taskToken.WorkflowID,
RunID: taskToken.RunID,
ScheduleID: histResp.StartedResponse.GetScheduledEventID(),
ScheduleAttempt: histResp.StartedResponse.GetAttempt(),
}
token, _ := wh.tokenSerializer.Serialize(taskToken)
workflowExecution := &types.WorkflowExecution{
WorkflowID: taskToken.WorkflowID,
RunID: taskToken.RunID,
}
matchingResp := common.CreateMatchingPollForDecisionTaskResponse(histResp.StartedResponse, workflowExecution, token)
newDecisionTask, err := wh.createPollForDecisionTaskResponse(ctx, scope, taskToken.DomainID, matchingResp, matchingResp.GetBranchToken())
if err != nil {
return nil, err
}
completedResp.DecisionTask = newDecisionTask
}
return completedResp, nil
}
// RespondDecisionTaskFailed - failed response to a decision task
func (wh *WorkflowHandler) RespondDecisionTaskFailed(
ctx context.Context,
failedRequest *types.RespondDecisionTaskFailedRequest,
) (retError error) {
if wh.isShuttingDown() {
return validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return err
}
if failedRequest == nil {
return validate.ErrRequestNotSet
}
if failedRequest.TaskToken == nil {
return validate.ErrTaskTokenNotSet
}
taskToken, err := wh.tokenSerializer.Deserialize(failedRequest.TaskToken)
if err != nil {
return err
}
if taskToken.DomainID == "" {
return validate.ErrDomainNotSet
}
domainName, err := wh.GetDomainCache().GetDomainName(taskToken.DomainID)
if err != nil {
return err
}
dw := domainWrapper{
domain: domainName,
}
scope := getMetricsScopeWithDomain(metrics.FrontendRespondDecisionTaskFailedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
if !common.IsValidIDLength(
failedRequest.GetIdentity(),
scope,
wh.config.MaxIDLengthWarnLimit(),
wh.config.IdentityMaxLength(domainName),
metrics.CadenceErrIdentityExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeIdentity) {
return validate.ErrIdentityTooLong
}
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
if err := common.CheckEventBlobSizeLimit(
len(failedRequest.Details),
sizeLimitWarn,
sizeLimitError,
taskToken.DomainID,
taskToken.WorkflowID,
taskToken.RunID,
scope,
wh.GetThrottledLogger(),
tag.BlobSizeViolationOperation("RespondDecisionTaskFailed"),
); err != nil {
// details exceed, we would just truncate the size for decision task failed as the details is not used anywhere by client code
failedRequest.Details = failedRequest.Details[0:sizeLimitError]
}
err = wh.GetHistoryClient().RespondDecisionTaskFailed(ctx, &types.HistoryRespondDecisionTaskFailedRequest{
DomainUUID: taskToken.DomainID,
FailedRequest: failedRequest,
})
if err != nil {
return wh.normalizeVersionedErrors(ctx, err)
}
return nil
}
// RespondQueryTaskCompleted - response to a query task
func (wh *WorkflowHandler) RespondQueryTaskCompleted(
ctx context.Context,
completeRequest *types.RespondQueryTaskCompletedRequest,
) (retError error) {
if wh.isShuttingDown() {
return validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return err
}
if completeRequest == nil {
return validate.ErrRequestNotSet
}
if completeRequest.TaskToken == nil {
return validate.ErrTaskTokenNotSet
}
queryTaskToken, err := wh.tokenSerializer.DeserializeQueryTaskToken(completeRequest.TaskToken)
if err != nil {
return err
}
if queryTaskToken.DomainID == "" || queryTaskToken.TaskList == "" || queryTaskToken.TaskID == "" {
return validate.ErrInvalidTaskToken
}
domainName, err := wh.GetDomainCache().GetDomainName(queryTaskToken.DomainID)
if err != nil {
return err
}
dw := domainWrapper{
domain: domainName,
}
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
scope := getMetricsScopeWithDomain(metrics.FrontendRespondQueryTaskCompletedScope, dw, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
if err := common.CheckEventBlobSizeLimit(
len(completeRequest.GetQueryResult()),
sizeLimitWarn,
sizeLimitError,
queryTaskToken.DomainID,
"",
"",
scope,
wh.GetThrottledLogger(),
tag.BlobSizeViolationOperation("RespondQueryTaskCompleted"),
); err != nil {
completeRequest = &types.RespondQueryTaskCompletedRequest{
TaskToken: completeRequest.TaskToken,
CompletedType: types.QueryTaskCompletedTypeFailed.Ptr(),
QueryResult: nil,
ErrorMessage: err.Error(),
}
}
call := yarpc.CallFromContext(ctx)
completeRequest.WorkerVersionInfo = &types.WorkerVersionInfo{
Impl: call.Header(common.ClientImplHeaderName),
FeatureVersion: call.Header(common.FeatureVersionHeaderName),
}
matchingRequest := &types.MatchingRespondQueryTaskCompletedRequest{
DomainUUID: queryTaskToken.DomainID,
TaskList: &types.TaskList{Name: queryTaskToken.TaskList},
TaskID: queryTaskToken.TaskID,
CompletedRequest: completeRequest,
}
err = wh.GetMatchingClient().RespondQueryTaskCompleted(ctx, matchingRequest)
if err != nil {
return err
}
return nil
}
func (wh *WorkflowHandler) StartWorkflowExecutionAsync(
ctx context.Context,
startRequest *types.StartWorkflowExecutionAsyncRequest,
) (resp *types.StartWorkflowExecutionAsyncResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
scope := getMetricsScopeWithDomain(metrics.FrontendStartWorkflowExecutionAsyncScope, startRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
// validate request before pushing to queue
err := wh.validateStartWorkflowExecutionRequest(ctx, startRequest.StartWorkflowExecutionRequest, scope)
if err != nil {
return nil, err
}
producer, err := wh.producerManager.GetProducerByDomain(startRequest.GetDomain())
if err != nil {
return nil, err
}
// serialize the message to be sent to the queue
payload, err := json.Marshal(startRequest)
if err != nil {
return nil, err
}
// propagate the headers from the context to the message
clientHeaders := common.GetClientHeaders(ctx)
header := &shared.Header{
Fields: map[string][]byte{},
}
for k, v := range clientHeaders {
header.Fields[k] = []byte(v)
}
messageType := sqlblobs.AsyncRequestTypeStartWorkflowExecutionAsyncRequest
message := &sqlblobs.AsyncRequestMessage{
PartitionKey: common.StringPtr(startRequest.GetWorkflowID()),
Type: &messageType,
Header: header,
Encoding: common.StringPtr(string(common.EncodingTypeJSON)),
Payload: payload,
}
err = producer.Publish(ctx, message)
if err != nil {
return nil, err
}
return &types.StartWorkflowExecutionAsyncResponse{}, nil
}
// StartWorkflowExecution - Creates a new workflow execution
func (wh *WorkflowHandler) StartWorkflowExecution(
ctx context.Context,
startRequest *types.StartWorkflowExecutionRequest,
) (resp *types.StartWorkflowExecutionResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
scope := getMetricsScopeWithDomain(metrics.FrontendStartWorkflowExecutionScope, startRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
err := wh.validateStartWorkflowExecutionRequest(ctx, startRequest, scope)
if err != nil {
return nil, err
}
domainName := startRequest.GetDomain()
domainID, err := wh.GetDomainCache().GetDomainID(domainName)
if err != nil {
return nil, err
}
wh.GetLogger().Debug("Start workflow execution request domainID", tag.WorkflowDomainID(domainID))
historyRequest, err := common.CreateHistoryStartWorkflowRequest(
domainID, startRequest, time.Now(), wh.getPartitionConfig(ctx, domainName))
if err != nil {
return nil, err
}
resp, err = wh.GetHistoryClient().StartWorkflowExecution(ctx, historyRequest)
if err != nil {
return nil, err
}
return resp, nil
}
func (wh *WorkflowHandler) validateStartWorkflowExecutionRequest(ctx context.Context, startRequest *types.StartWorkflowExecutionRequest, scope metrics.Scope) error {
if startRequest == nil {
return validate.ErrRequestNotSet
}
domainName := startRequest.GetDomain()
if domainName == "" {
return validate.ErrDomainNotSet
}
if startRequest.GetWorkflowID() == "" {
return validate.ErrWorkflowIDNotSet
}
if _, err := uuid.Parse(startRequest.RequestID); err != nil {
return &types.BadRequestError{Message: fmt.Sprintf("requestId %q is not a valid UUID", startRequest.RequestID)}
}
if startRequest.WorkflowType == nil || startRequest.WorkflowType.GetName() == "" {
return validate.ErrWorkflowTypeNotSet
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return err
}
idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
if !common.IsValidIDLength(
domainName,
scope,
idLengthWarnLimit,
wh.config.DomainNameMaxLength(domainName),
metrics.CadenceErrDomainNameExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeDomainName) {
return validate.ErrDomainTooLong
}
if !common.IsValidIDLength(
startRequest.GetWorkflowID(),
scope,
idLengthWarnLimit,
wh.config.WorkflowIDMaxLength(domainName),
metrics.CadenceErrWorkflowIDExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeWorkflowID) {
return validate.ErrWorkflowIDTooLong
}
if err := common.ValidateRetryPolicy(startRequest.RetryPolicy); err != nil {
return err
}
wh.GetLogger().Debug(
"Received StartWorkflowExecution. WorkflowID",
tag.WorkflowID(startRequest.GetWorkflowID()))
if !common.IsValidIDLength(
startRequest.WorkflowType.GetName(),
scope,
idLengthWarnLimit,
wh.config.WorkflowTypeMaxLength(domainName),
metrics.CadenceErrWorkflowTypeExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeWorkflowType) {
return validate.ErrWorkflowTypeTooLong
}
if err := wh.validateTaskList(startRequest.TaskList, scope, domainName); err != nil {
return err
}
if startRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
return validate.ErrInvalidExecutionStartToCloseTimeoutSeconds
}
if startRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
return validate.ErrInvalidTaskStartToCloseTimeoutSeconds
}
if startRequest.GetDelayStartSeconds() < 0 {
return validate.ErrInvalidDelayStartSeconds
}
if startRequest.GetJitterStartSeconds() < 0 {
return validate.ErrInvalidJitterStartSeconds
}
jitter := startRequest.GetJitterStartSeconds()
cron := startRequest.GetCronSchedule()
if cron != "" {
if _, err := backoff.ValidateSchedule(startRequest.GetCronSchedule()); err != nil {
return err
}
}
if jitter > 0 && cron != "" {
// Calculate the cron duration and ensure that jitter is not greater than the cron duration,
// because that would be confusing to users.
// Request using start/end time zero value, which will get us an exact answer (i.e. its not in the
// middle of a minute)
backoffSeconds, err := backoff.GetBackoffForNextScheduleInSeconds(cron, time.Time{}, time.Time{}, jitter)
if err != nil {
return err
}
if jitter > backoffSeconds {
return validate.ErrInvalidJitterStartSeconds2
}
}
if !common.IsValidIDLength(
startRequest.GetRequestID(),
scope,
idLengthWarnLimit,
wh.config.RequestIDMaxLength(domainName),
metrics.CadenceErrRequestIDExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeRequestID) {
return validate.ErrRequestIDTooLong
}
if err := wh.searchAttributesValidator.ValidateSearchAttributes(startRequest.SearchAttributes, domainName); err != nil {
return err
}
wh.GetLogger().Debug("Start workflow execution request domain", tag.WorkflowDomainName(domainName))
domainID, err := wh.GetDomainCache().GetDomainID(domainName)
if err != nil {
return err
}
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
actualSize := len(startRequest.Input)
if startRequest.Memo != nil {
actualSize += common.GetSizeOfMapStringToByteArray(startRequest.Memo.GetFields())
}
if err := common.CheckEventBlobSizeLimit(
actualSize,
sizeLimitWarn,
sizeLimitError,
domainID,
startRequest.GetWorkflowID(),
"",
scope,
wh.GetThrottledLogger(),
tag.BlobSizeViolationOperation("StartWorkflowExecution"),
); err != nil {
return err
}
isolationGroup := wh.getIsolationGroup(ctx, domainName)
if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
}
return nil
}
// GetWorkflowExecutionHistory - retrieves the history of workflow execution
func (wh *WorkflowHandler) GetWorkflowExecutionHistory(
ctx context.Context,
getRequest *types.GetWorkflowExecutionHistoryRequest,
) (resp *types.GetWorkflowExecutionHistoryResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if getRequest == nil {
return nil, validate.ErrRequestNotSet
}
domainName := getRequest.GetDomain()
wfExecution := getRequest.GetExecution()
if domainName == "" {
return nil, validate.ErrDomainNotSet
}
if err := validate.CheckExecution(wfExecution); err != nil {
return nil, err
}
domainID, err := wh.GetDomainCache().GetDomainID(domainName)
if err != nil {
return nil, err
}
if getRequest.GetMaximumPageSize() <= 0 {
getRequest.MaximumPageSize = int32(wh.config.HistoryMaxPageSize(getRequest.GetDomain()))
}
// force limit page size if exceed
if getRequest.GetMaximumPageSize() > common.GetHistoryMaxPageSize {
wh.GetThrottledLogger().Warn("GetHistory page size is larger than threshold",
tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
tag.WorkflowRunID(getRequest.Execution.GetRunID()),
tag.WorkflowDomainID(domainID),
tag.WorkflowSize(int64(getRequest.GetMaximumPageSize())))
getRequest.MaximumPageSize = common.GetHistoryMaxPageSize
}
scope := getMetricsScopeWithDomain(metrics.FrontendGetWorkflowExecutionHistoryScope, getRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
if !getRequest.GetSkipArchival() {
enableArchivalRead := wh.GetArchivalMetadata().GetHistoryConfig().ReadEnabled()
historyArchived := wh.historyArchived(ctx, getRequest, domainID)
if enableArchivalRead && historyArchived {
return wh.getArchivedHistory(ctx, getRequest, domainID)
}
}
// this function return the following 6 things,
// 1. branch token
// 2. the workflow run ID
// 3. the last first event ID (the event ID of the last batch of events in the history)
// 4. the next event ID
// 5. whether the workflow is closed
// 6. error if any
queryHistory := func(
domainUUID string,
execution *types.WorkflowExecution,
expectedNextEventID int64,
currentBranchToken []byte,
) ([]byte, string, int64, int64, bool, error) {
response, err := wh.GetHistoryClient().PollMutableState(ctx, &types.PollMutableStateRequest{
DomainUUID: domainUUID,
Execution: execution,
ExpectedNextEventID: expectedNextEventID,
CurrentBranchToken: currentBranchToken,
})
if err != nil {
return nil, "", 0, 0, false, err
}
isWorkflowRunning := response.GetWorkflowCloseState() == persistence.WorkflowCloseStatusNone
return response.CurrentBranchToken,
response.Execution.GetRunID(),
response.GetLastFirstEventID(),
response.GetNextEventID(),
isWorkflowRunning,
nil
}
isLongPoll := getRequest.GetWaitForNewEvent()
isCloseEventOnly := getRequest.GetHistoryEventFilterType() == types.HistoryEventFilterTypeCloseEvent
execution := getRequest.Execution
token := &getHistoryContinuationToken{}
var runID string
lastFirstEventID := common.FirstEventID
var nextEventID int64
var isWorkflowRunning bool
// process the token for paging
queryNextEventID := common.EndEventID
if getRequest.NextPageToken != nil {
token, err = deserializeHistoryToken(getRequest.NextPageToken)
if err != nil {
return nil, validate.ErrInvalidNextPageToken
}
if execution.RunID != "" && execution.GetRunID() != token.RunID {
return nil, validate.ErrNextPageTokenRunIDMismatch
}
execution.RunID = token.RunID
// we need to update the current next event ID and whether workflow is running
if len(token.PersistenceToken) == 0 && isLongPoll && token.IsWorkflowRunning {
logger := wh.GetLogger().WithTags(
tag.WorkflowDomainName(getRequest.GetDomain()),
tag.WorkflowID(getRequest.Execution.GetWorkflowID()),
tag.WorkflowRunID(getRequest.Execution.GetRunID()),
)
// TODO: for now we only log the invalid timeout (this is done inside the helper function) in case
// this change breaks existing customers. Once we are sure no one is calling this API with very short timeout
// we can return the error.
_ = common.ValidateLongPollContextTimeout(ctx, "GetWorkflowExecutionHistory", logger)
if !isCloseEventOnly {
queryNextEventID = token.NextEventID
}
token.BranchToken, _, lastFirstEventID, nextEventID, isWorkflowRunning, err =
queryHistory(domainID, execution, queryNextEventID, token.BranchToken)
if err != nil {
return nil, err
}
token.FirstEventID = token.NextEventID
token.NextEventID = nextEventID
token.IsWorkflowRunning = isWorkflowRunning
}
} else {
if !isCloseEventOnly {
queryNextEventID = common.FirstEventID
}
token.BranchToken, runID, lastFirstEventID, nextEventID, isWorkflowRunning, err =
queryHistory(domainID, execution, queryNextEventID, nil)
if err != nil {
return nil, err
}
execution.RunID = runID
token.RunID = runID
token.FirstEventID = common.FirstEventID
token.NextEventID = nextEventID
token.IsWorkflowRunning = isWorkflowRunning
token.PersistenceToken = nil
}
call := yarpc.CallFromContext(ctx)
clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
clientImpl := call.Header(common.ClientImplHeaderName)
supportsRawHistoryQuery := wh.versionChecker.SupportsRawHistoryQuery(clientImpl, clientFeatureVersion) == nil
isRawHistoryEnabled := wh.config.SendRawWorkflowHistory(domainName) && supportsRawHistoryQuery
history := &types.History{}
history.Events = []*types.HistoryEvent{}
var historyBlob []*types.DataBlob
// helper function to just getHistory
getHistory := func(firstEventID, nextEventID int64, nextPageToken []byte) error {
if isRawHistoryEnabled {
historyBlob, token.PersistenceToken, err = wh.getRawHistory(
ctx,
scope,
domainID,
domainName,
*execution,
firstEventID,
nextEventID,
getRequest.GetMaximumPageSize(),
nextPageToken,
token.TransientDecision,
token.BranchToken,
)
} else {
history, token.PersistenceToken, err = wh.getHistory(
ctx,
scope,
domainID,
domainName,
*execution,
firstEventID,
nextEventID,
getRequest.GetMaximumPageSize(),
nextPageToken,
token.TransientDecision,
token.BranchToken,
)
}
if err != nil {
return err
}
return nil
}
if isCloseEventOnly {
if !isWorkflowRunning {
if err := getHistory(lastFirstEventID, nextEventID, nil); err != nil {
return nil, err
}
if isRawHistoryEnabled {
// since getHistory func will not return empty history, so the below is safe
historyBlob = historyBlob[len(historyBlob)-1:]
} else {
// since getHistory func will not return empty history, so the below is safe
history.Events = history.Events[len(history.Events)-1:]
}
token = nil
} else if isLongPoll {
// set the persistence token to be nil so next time we will query history for updates
token.PersistenceToken = nil
} else {
token = nil
}
} else {
// return all events
if token.FirstEventID >= token.NextEventID {
// currently there is no new event
history.Events = []*types.HistoryEvent{}
if !isWorkflowRunning {
token = nil
}
} else {
if err := getHistory(token.FirstEventID, token.NextEventID, token.PersistenceToken); err != nil {
return nil, err
}
// here, for long pull on history events, we need to intercept the paging token from cassandra
// and do something clever
if len(token.PersistenceToken) == 0 && (!token.IsWorkflowRunning || !isLongPoll) {
// meaning, there is no more history to be returned
token = nil
}
}
}
nextToken, err := serializeHistoryToken(token)
if err != nil {
return nil, err
}
return &types.GetWorkflowExecutionHistoryResponse{
History: history,
RawHistory: historyBlob,
NextPageToken: nextToken,
Archived: false,
}, nil
}
// SignalWorkflowExecution is used to send a signal event to running workflow execution. This results in
// WorkflowExecutionSignaled event recorded in the history and a decision task being created for the execution.
func (wh *WorkflowHandler) SignalWorkflowExecution(
ctx context.Context,
signalRequest *types.SignalWorkflowExecutionRequest,
) (retError error) {
if wh.isShuttingDown() {
return validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return err
}
if signalRequest == nil {
return validate.ErrRequestNotSet
}
domainName := signalRequest.GetDomain()
wfExecution := signalRequest.GetWorkflowExecution()
if domainName == "" {
return validate.ErrDomainNotSet
}
if err := validate.CheckExecution(wfExecution); err != nil {
return err
}
scope := getMetricsScopeWithDomain(metrics.FrontendSignalWorkflowExecutionScope, signalRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
if !common.IsValidIDLength(
domainName,
scope,
idLengthWarnLimit,
wh.config.DomainNameMaxLength(domainName),
metrics.CadenceErrDomainNameExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeDomainName) {
return validate.ErrDomainTooLong
}
if signalRequest.GetSignalName() == "" {
return validate.ErrSignalNameNotSet
}
if !common.IsValidIDLength(
signalRequest.GetSignalName(),
scope,
idLengthWarnLimit,
wh.config.SignalNameMaxLength(domainName),
metrics.CadenceErrSignalNameExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeSignalName) {
return validate.ErrSignalNameTooLong
}
if !common.IsValidIDLength(
signalRequest.GetRequestID(),
scope,
idLengthWarnLimit,
wh.config.RequestIDMaxLength(domainName),
metrics.CadenceErrRequestIDExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeRequestID) {
return validate.ErrRequestIDTooLong
}
domainID, err := wh.GetDomainCache().GetDomainID(domainName)
if err != nil {
return err
}
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
if err := common.CheckEventBlobSizeLimit(
len(signalRequest.Input),
sizeLimitWarn,
sizeLimitError,
domainID,
signalRequest.GetWorkflowExecution().GetWorkflowID(),
signalRequest.GetWorkflowExecution().GetRunID(),
scope,
wh.GetThrottledLogger(),
tag.BlobSizeViolationOperation("SignalWorkflowExecution"),
); err != nil {
return err
}
isolationGroup := wh.getIsolationGroup(ctx, domainName)
if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
}
err = wh.GetHistoryClient().SignalWorkflowExecution(ctx, &types.HistorySignalWorkflowExecutionRequest{
DomainUUID: domainID,
SignalRequest: signalRequest,
})
if err != nil {
return wh.normalizeVersionedErrors(ctx, err)
}
return nil
}
func (wh *WorkflowHandler) SignalWithStartWorkflowExecutionAsync(
ctx context.Context,
signalWithStartRequest *types.SignalWithStartWorkflowExecutionAsyncRequest,
) (resp *types.SignalWithStartWorkflowExecutionAsyncResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
scope := getMetricsScopeWithDomain(metrics.FrontendSignalWithStartWorkflowExecutionAsyncScope, signalWithStartRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
// validate request before pushing to queue
err := wh.validateSignalWithStartWorkflowExecutionRequest(ctx, signalWithStartRequest.SignalWithStartWorkflowExecutionRequest, scope)
if err != nil {
return nil, err
}
producer, err := wh.producerManager.GetProducerByDomain(signalWithStartRequest.GetDomain())
if err != nil {
return nil, err
}
// serialize the message to be sent to the queue
payload, err := json.Marshal(signalWithStartRequest)
if err != nil {
return nil, err
}
// propagate the headers from the context to the message
clientHeaders := common.GetClientHeaders(ctx)
header := &shared.Header{
Fields: map[string][]byte{},
}
for k, v := range clientHeaders {
header.Fields[k] = []byte(v)
}
messageType := sqlblobs.AsyncRequestTypeSignalWithStartWorkflowExecutionAsyncRequest
message := &sqlblobs.AsyncRequestMessage{
PartitionKey: common.StringPtr(signalWithStartRequest.GetWorkflowID()),
Type: &messageType,
Header: header,
Encoding: common.StringPtr(string(common.EncodingTypeJSON)),
Payload: payload,
}
err = producer.Publish(ctx, message)
if err != nil {
return nil, err
}
return &types.SignalWithStartWorkflowExecutionAsyncResponse{}, nil
}
// SignalWithStartWorkflowExecution is used to ensure sending a signal event to a workflow execution.
// If workflow is running, this results in WorkflowExecutionSignaled event recorded in the history
// and a decision task being created for the execution.
// If workflow is not running or not found, this results in WorkflowExecutionStarted and WorkflowExecutionSignaled
// event recorded in history, and a decision task being created for the execution
func (wh *WorkflowHandler) SignalWithStartWorkflowExecution(
ctx context.Context,
signalWithStartRequest *types.SignalWithStartWorkflowExecutionRequest,
) (resp *types.StartWorkflowExecutionResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
scope := getMetricsScopeWithDomain(metrics.FrontendSignalWithStartWorkflowExecutionScope, signalWithStartRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
err := wh.validateSignalWithStartWorkflowExecutionRequest(ctx, signalWithStartRequest, scope)
if err != nil {
return nil, err
}
domainName := signalWithStartRequest.GetDomain()
domainID, err := wh.GetDomainCache().GetDomainID(domainName)
if err != nil {
return nil, err
}
resp, err = wh.GetHistoryClient().SignalWithStartWorkflowExecution(ctx, &types.HistorySignalWithStartWorkflowExecutionRequest{
DomainUUID: domainID,
SignalWithStartRequest: signalWithStartRequest,
PartitionConfig: wh.getPartitionConfig(ctx, domainName),
})
if err != nil {
return nil, err
}
return resp, nil
}
func (wh *WorkflowHandler) validateSignalWithStartWorkflowExecutionRequest(ctx context.Context, signalWithStartRequest *types.SignalWithStartWorkflowExecutionRequest, scope metrics.Scope) error {
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return err
}
if signalWithStartRequest == nil {
return validate.ErrRequestNotSet
}
domainName := signalWithStartRequest.GetDomain()
if domainName == "" {
return validate.ErrDomainNotSet
}
if signalWithStartRequest.GetWorkflowID() == "" {
return validate.ErrWorkflowIDNotSet
}
idLengthWarnLimit := wh.config.MaxIDLengthWarnLimit()
if !common.IsValidIDLength(
domainName,
scope,
idLengthWarnLimit,
wh.config.DomainNameMaxLength(domainName),
metrics.CadenceErrDomainNameExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeDomainName) {
return validate.ErrDomainTooLong
}
if !common.IsValidIDLength(
signalWithStartRequest.GetWorkflowID(),
scope,
idLengthWarnLimit,
wh.config.WorkflowIDMaxLength(domainName),
metrics.CadenceErrWorkflowIDExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeWorkflowID) {
return validate.ErrWorkflowIDTooLong
}
if signalWithStartRequest.GetSignalName() == "" {
return validate.ErrSignalNameNotSet
}
if !common.IsValidIDLength(
signalWithStartRequest.GetSignalName(),
scope,
idLengthWarnLimit,
wh.config.SignalNameMaxLength(domainName),
metrics.CadenceErrSignalNameExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeSignalName) {
return validate.ErrSignalNameTooLong
}
if signalWithStartRequest.WorkflowType == nil || signalWithStartRequest.WorkflowType.GetName() == "" {
return validate.ErrWorkflowTypeNotSet
}
if !common.IsValidIDLength(
signalWithStartRequest.WorkflowType.GetName(),
scope,
idLengthWarnLimit,
wh.config.WorkflowTypeMaxLength(domainName),
metrics.CadenceErrWorkflowTypeExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeWorkflowType) {
return validate.ErrWorkflowTypeTooLong
}
if err := wh.validateTaskList(signalWithStartRequest.TaskList, scope, domainName); err != nil {
return err
}
if !common.IsValidIDLength(
signalWithStartRequest.GetRequestID(),
scope,
idLengthWarnLimit,
wh.config.RequestIDMaxLength(domainName),
metrics.CadenceErrRequestIDExceededWarnLimit,
domainName,
wh.GetLogger(),
tag.IDTypeRequestID) {
return validate.ErrRequestIDTooLong
}
if signalWithStartRequest.GetExecutionStartToCloseTimeoutSeconds() <= 0 {
return validate.ErrInvalidExecutionStartToCloseTimeoutSeconds
}
if signalWithStartRequest.GetTaskStartToCloseTimeoutSeconds() <= 0 {
return validate.ErrInvalidTaskStartToCloseTimeoutSeconds
}
if err := common.ValidateRetryPolicy(signalWithStartRequest.RetryPolicy); err != nil {
return err
}
if signalWithStartRequest.GetCronSchedule() != "" {
if _, err := backoff.ValidateSchedule(signalWithStartRequest.GetCronSchedule()); err != nil {
return err
}
}
if err := wh.searchAttributesValidator.ValidateSearchAttributes(signalWithStartRequest.SearchAttributes, domainName); err != nil {
return err
}
domainID, err := wh.GetDomainCache().GetDomainID(domainName)
if err != nil {
return err
}
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
if err := common.CheckEventBlobSizeLimit(
len(signalWithStartRequest.SignalInput),
sizeLimitWarn,
sizeLimitError,
domainID,
signalWithStartRequest.GetWorkflowID(),
"",
scope,
wh.GetThrottledLogger(),
tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
); err != nil {
return err
}
actualSize := len(signalWithStartRequest.Input) + common.GetSizeOfMapStringToByteArray(signalWithStartRequest.Memo.GetFields())
if err := common.CheckEventBlobSizeLimit(
actualSize,
sizeLimitWarn,
sizeLimitError,
domainID,
signalWithStartRequest.GetWorkflowID(),
"",
scope,
wh.GetThrottledLogger(),
tag.BlobSizeViolationOperation("SignalWithStartWorkflowExecution"),
); err != nil {
return err
}
isolationGroup := wh.getIsolationGroup(ctx, domainName)
if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
return &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
}
return nil
}
// TerminateWorkflowExecution terminates an existing workflow execution by recording WorkflowExecutionTerminated event
// in the history and immediately terminating the execution instance.
func (wh *WorkflowHandler) TerminateWorkflowExecution(
ctx context.Context,
terminateRequest *types.TerminateWorkflowExecutionRequest,
) (retError error) {
if wh.isShuttingDown() {
return validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return err
}
if terminateRequest == nil {
return validate.ErrRequestNotSet
}
domainName := terminateRequest.GetDomain()
wfExecution := terminateRequest.GetWorkflowExecution()
if terminateRequest.GetDomain() == "" {
return validate.ErrDomainNotSet
}
if err := validate.CheckExecution(wfExecution); err != nil {
return err
}
domainID, err := wh.GetDomainCache().GetDomainID(domainName)
if err != nil {
return err
}
err = wh.GetHistoryClient().TerminateWorkflowExecution(ctx, &types.HistoryTerminateWorkflowExecutionRequest{
DomainUUID: domainID,
TerminateRequest: terminateRequest,
})
if err != nil {
return wh.normalizeVersionedErrors(ctx, err)
}
return nil
}
// ResetWorkflowExecution reset an existing workflow execution to the nextFirstEventID
// in the history and immediately terminating the current execution instance.
func (wh *WorkflowHandler) ResetWorkflowExecution(
ctx context.Context,
resetRequest *types.ResetWorkflowExecutionRequest,
) (resp *types.ResetWorkflowExecutionResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if resetRequest == nil {
return nil, validate.ErrRequestNotSet
}
domainName := resetRequest.GetDomain()
wfExecution := resetRequest.GetWorkflowExecution()
if domainName == "" {
return nil, validate.ErrDomainNotSet
}
if err := validate.CheckExecution(wfExecution); err != nil {
return nil, err
}
domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
if err != nil {
return nil, err
}
resp, err = wh.GetHistoryClient().ResetWorkflowExecution(ctx, &types.HistoryResetWorkflowExecutionRequest{
DomainUUID: domainID,
ResetRequest: resetRequest,
})
if err != nil {
return nil, err
}
return resp, nil
}
// RequestCancelWorkflowExecution - requests to cancel a workflow execution
func (wh *WorkflowHandler) RequestCancelWorkflowExecution(
ctx context.Context,
cancelRequest *types.RequestCancelWorkflowExecutionRequest,
) (retError error) {
if wh.isShuttingDown() {
return validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return err
}
if cancelRequest == nil {
return validate.ErrRequestNotSet
}
domainName := cancelRequest.GetDomain()
wfExecution := cancelRequest.GetWorkflowExecution()
if domainName == "" {
return validate.ErrDomainNotSet
}
if err := validate.CheckExecution(wfExecution); err != nil {
return err
}
domainID, err := wh.GetDomainCache().GetDomainID(cancelRequest.GetDomain())
if err != nil {
return err
}
err = wh.GetHistoryClient().RequestCancelWorkflowExecution(ctx, &types.HistoryRequestCancelWorkflowExecutionRequest{
DomainUUID: domainID,
CancelRequest: cancelRequest,
})
if err != nil {
return wh.normalizeVersionedErrors(ctx, err)
}
return nil
}
// ListOpenWorkflowExecutions - retrieves info for open workflow executions in a domain
func (wh *WorkflowHandler) ListOpenWorkflowExecutions(
ctx context.Context,
listRequest *types.ListOpenWorkflowExecutionsRequest,
) (resp *types.ListOpenWorkflowExecutionsResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if listRequest == nil {
return nil, validate.ErrRequestNotSet
}
if listRequest.GetDomain() == "" {
return nil, validate.ErrDomainNotSet
}
if listRequest.StartTimeFilter == nil {
return nil, &types.BadRequestError{Message: "StartTimeFilter is required"}
}
if listRequest.StartTimeFilter.EarliestTime == nil {
return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}
}
if listRequest.StartTimeFilter.LatestTime == nil {
return nil, &types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}
}
if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}
}
if listRequest.ExecutionFilter != nil && listRequest.TypeFilter != nil {
return nil, &types.BadRequestError{
Message: "Only one of ExecutionFilter or TypeFilter is allowed"}
}
if listRequest.GetMaximumPageSize() <= 0 {
listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
}
if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
return nil, &types.BadRequestError{
Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
}
domain := listRequest.GetDomain()
domainID, err := wh.GetDomainCache().GetDomainID(domain)
if err != nil {
return nil, err
}
baseReq := persistence.ListWorkflowExecutionsRequest{
DomainUUID: domainID,
Domain: domain,
PageSize: int(listRequest.GetMaximumPageSize()),
NextPageToken: listRequest.NextPageToken,
EarliestTime: listRequest.StartTimeFilter.GetEarliestTime(),
LatestTime: listRequest.StartTimeFilter.GetLatestTime(),
}
var persistenceResp *persistence.ListWorkflowExecutionsResponse
if listRequest.ExecutionFilter != nil {
if wh.config.DisableListVisibilityByFilter(domain) {
err = validate.ErrNoPermission
} else {
persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByWorkflowID(
ctx,
&persistence.ListWorkflowExecutionsByWorkflowIDRequest{
ListWorkflowExecutionsRequest: baseReq,
WorkflowID: listRequest.ExecutionFilter.GetWorkflowID(),
})
}
wh.GetLogger().Debug("List open workflow with filter",
tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
} else if listRequest.TypeFilter != nil {
if wh.config.DisableListVisibilityByFilter(domain) {
err = validate.ErrNoPermission
} else {
persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutionsByType(
ctx,
&persistence.ListWorkflowExecutionsByTypeRequest{
ListWorkflowExecutionsRequest: baseReq,
WorkflowTypeName: listRequest.TypeFilter.GetName(),
},
)
}
wh.GetLogger().Debug("List open workflow with filter",
tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
} else {
persistenceResp, err = wh.GetVisibilityManager().ListOpenWorkflowExecutions(ctx, &baseReq)
}
if err != nil {
return nil, err
}
resp = &types.ListOpenWorkflowExecutionsResponse{}
resp.Executions = persistenceResp.Executions
resp.NextPageToken = persistenceResp.NextPageToken
return resp, nil
}
// ListArchivedWorkflowExecutions - retrieves archived info for closed workflow executions in a domain
func (wh *WorkflowHandler) ListArchivedWorkflowExecutions(
ctx context.Context,
listRequest *types.ListArchivedWorkflowExecutionsRequest,
) (resp *types.ListArchivedWorkflowExecutionsResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if listRequest == nil {
return nil, validate.ErrRequestNotSet
}
if listRequest.GetDomain() == "" {
return nil, validate.ErrDomainNotSet
}
if listRequest.GetPageSize() <= 0 {
listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
}
maxPageSize := wh.config.VisibilityArchivalQueryMaxPageSize()
if int(listRequest.GetPageSize()) > maxPageSize {
return nil, &types.BadRequestError{
Message: fmt.Sprintf("Pagesize is larger than allowed %d", maxPageSize)}
}
if !wh.GetArchivalMetadata().GetVisibilityConfig().ClusterConfiguredForArchival() {
return nil, &types.BadRequestError{Message: "Cluster is not configured for visibility archival"}
}
if !wh.GetArchivalMetadata().GetVisibilityConfig().ReadEnabled() {
return nil, &types.BadRequestError{Message: "Cluster is not configured for reading archived visibility records"}
}
entry, err := wh.GetDomainCache().GetDomain(listRequest.GetDomain())
if err != nil {
return nil, err
}
if entry.GetConfig().VisibilityArchivalStatus != types.ArchivalStatusEnabled {
return nil, &types.BadRequestError{Message: "Domain is not configured for visibility archival"}
}
URI, err := archiver.NewURI(entry.GetConfig().VisibilityArchivalURI)
if err != nil {
return nil, err
}
visibilityArchiver, err := wh.GetArchiverProvider().GetVisibilityArchiver(URI.Scheme(), service.Frontend)
if err != nil {
return nil, err
}
archiverRequest := &archiver.QueryVisibilityRequest{
DomainID: entry.GetInfo().ID,
PageSize: int(listRequest.GetPageSize()),
NextPageToken: listRequest.NextPageToken,
Query: listRequest.GetQuery(),
}
archiverResponse, err := visibilityArchiver.Query(ctx, URI, archiverRequest)
if err != nil {
return nil, err
}
// special handling of ExecutionTime for cron or retry
for _, execution := range archiverResponse.Executions {
if execution.GetExecutionTime() == 0 {
execution.ExecutionTime = common.Int64Ptr(execution.GetStartTime())
}
}
return &types.ListArchivedWorkflowExecutionsResponse{
Executions: archiverResponse.Executions,
NextPageToken: archiverResponse.NextPageToken,
}, nil
}
// ListClosedWorkflowExecutions - retrieves info for closed workflow executions in a domain
func (wh *WorkflowHandler) ListClosedWorkflowExecutions(
ctx context.Context,
listRequest *types.ListClosedWorkflowExecutionsRequest,
) (resp *types.ListClosedWorkflowExecutionsResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if listRequest == nil {
return nil, validate.ErrRequestNotSet
}
if listRequest.GetDomain() == "" {
return nil, validate.ErrDomainNotSet
}
if listRequest.StartTimeFilter == nil {
return nil, &types.BadRequestError{Message: "StartTimeFilter is required"}
}
if listRequest.StartTimeFilter.EarliestTime == nil {
return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter is required"}
}
if listRequest.StartTimeFilter.LatestTime == nil {
return nil, &types.BadRequestError{Message: "LatestTime in StartTimeFilter is required"}
}
if listRequest.StartTimeFilter.GetEarliestTime() > listRequest.StartTimeFilter.GetLatestTime() {
return nil, &types.BadRequestError{Message: "EarliestTime in StartTimeFilter should not be larger than LatestTime"}
}
filterCount := 0
if listRequest.TypeFilter != nil {
filterCount++
}
if listRequest.StatusFilter != nil {
filterCount++
}
if filterCount > 1 {
return nil, &types.BadRequestError{
Message: "Only one of ExecutionFilter, TypeFilter or StatusFilter is allowed"}
} // If ExecutionFilter is provided with one of TypeFilter or StatusFilter, use ExecutionFilter and ignore other filter
if listRequest.GetMaximumPageSize() <= 0 {
listRequest.MaximumPageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
}
if wh.isListRequestPageSizeTooLarge(listRequest.GetMaximumPageSize(), listRequest.GetDomain()) {
return nil, &types.BadRequestError{
Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
}
domain := listRequest.GetDomain()
domainID, err := wh.GetDomainCache().GetDomainID(domain)
if err != nil {
return nil, err
}
baseReq := persistence.ListWorkflowExecutionsRequest{
DomainUUID: domainID,
Domain: domain,
PageSize: int(listRequest.GetMaximumPageSize()),
NextPageToken: listRequest.NextPageToken,
EarliestTime: listRequest.StartTimeFilter.GetEarliestTime(),
LatestTime: listRequest.StartTimeFilter.GetLatestTime(),
}
var persistenceResp *persistence.ListWorkflowExecutionsResponse
if listRequest.ExecutionFilter != nil {
if wh.config.DisableListVisibilityByFilter(domain) {
err = validate.ErrNoPermission
} else {
persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByWorkflowID(
ctx,
&persistence.ListWorkflowExecutionsByWorkflowIDRequest{
ListWorkflowExecutionsRequest: baseReq,
WorkflowID: listRequest.ExecutionFilter.GetWorkflowID(),
},
)
}
wh.GetLogger().Debug("List closed workflow with filter",
tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByID)
} else if listRequest.TypeFilter != nil {
if wh.config.DisableListVisibilityByFilter(domain) {
err = validate.ErrNoPermission
} else {
persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByType(
ctx,
&persistence.ListWorkflowExecutionsByTypeRequest{
ListWorkflowExecutionsRequest: baseReq,
WorkflowTypeName: listRequest.TypeFilter.GetName(),
},
)
}
wh.GetLogger().Debug("List closed workflow with filter",
tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByType)
} else if listRequest.StatusFilter != nil {
if wh.config.DisableListVisibilityByFilter(domain) {
err = validate.ErrNoPermission
} else {
persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutionsByStatus(
ctx,
&persistence.ListClosedWorkflowExecutionsByStatusRequest{
ListWorkflowExecutionsRequest: baseReq,
Status: listRequest.GetStatusFilter(),
},
)
}
wh.GetLogger().Debug("List closed workflow with filter",
tag.WorkflowDomainName(listRequest.GetDomain()), tag.WorkflowListWorkflowFilterByStatus)
} else {
persistenceResp, err = wh.GetVisibilityManager().ListClosedWorkflowExecutions(ctx, &baseReq)
}
if err != nil {
return nil, err
}
resp = &types.ListClosedWorkflowExecutionsResponse{}
resp.Executions = persistenceResp.Executions
resp.NextPageToken = persistenceResp.NextPageToken
return resp, nil
}
// ListWorkflowExecutions - retrieves info for workflow executions in a domain
func (wh *WorkflowHandler) ListWorkflowExecutions(
ctx context.Context,
listRequest *types.ListWorkflowExecutionsRequest,
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if listRequest == nil {
return nil, validate.ErrRequestNotSet
}
if listRequest.GetDomain() == "" {
return nil, validate.ErrDomainNotSet
}
if listRequest.GetPageSize() <= 0 {
listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
}
if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
return nil, &types.BadRequestError{
Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
}
validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
if err != nil {
return nil, err
}
domain := listRequest.GetDomain()
domainID, err := wh.GetDomainCache().GetDomainID(domain)
if err != nil {
return nil, err
}
req := &persistence.ListWorkflowExecutionsByQueryRequest{
DomainUUID: domainID,
Domain: domain,
PageSize: int(listRequest.GetPageSize()),
NextPageToken: listRequest.NextPageToken,
Query: validatedQuery,
}
persistenceResp, err := wh.GetVisibilityManager().ListWorkflowExecutions(ctx, req)
if err != nil {
return nil, err
}
resp = &types.ListWorkflowExecutionsResponse{}
resp.Executions = persistenceResp.Executions
resp.NextPageToken = persistenceResp.NextPageToken
return resp, nil
}
// RestartWorkflowExecution - retrieves info for an existing workflow then restarts it
func (wh *WorkflowHandler) RestartWorkflowExecution(ctx context.Context, request *types.RestartWorkflowExecutionRequest) (resp *types.RestartWorkflowExecutionResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if request == nil {
return nil, validate.ErrRequestNotSet
}
domainName := request.GetDomain()
wfExecution := request.GetWorkflowExecution()
if request.GetDomain() == "" {
return nil, validate.ErrDomainNotSet
}
if err := validate.CheckExecution(wfExecution); err != nil {
return nil, err
}
domainID, err := wh.GetDomainCache().GetDomainID(domainName)
if err != nil {
return nil, err
}
isolationGroup := wh.getIsolationGroup(ctx, domainName)
if !wh.isIsolationGroupHealthy(ctx, domainName, isolationGroup) {
return nil, &types.BadRequestError{fmt.Sprintf("Domain %s is drained from isolation group %s.", domainName, isolationGroup)}
}
history, err := wh.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
Domain: domainName,
Execution: &types.WorkflowExecution{
WorkflowID: wfExecution.WorkflowID,
RunID: wfExecution.RunID,
},
SkipArchival: true,
})
if err != nil {
return nil, validate.ErrHistoryNotFound
}
startRequest := constructRestartWorkflowRequest(history.History.Events[0].WorkflowExecutionStartedEventAttributes,
domainName, request.Identity, wfExecution.WorkflowID)
req, err := common.CreateHistoryStartWorkflowRequest(domainID, startRequest, time.Now(), wh.getPartitionConfig(ctx, domainName))
if err != nil {
return nil, err
}
startResp, err := wh.GetHistoryClient().StartWorkflowExecution(ctx, req)
if err != nil {
return nil, wh.normalizeVersionedErrors(ctx, err)
}
resp = &types.RestartWorkflowExecutionResponse{
RunID: startResp.RunID,
}
return resp, nil
}
// ScanWorkflowExecutions - retrieves info for large amount of workflow executions in a domain without order
func (wh *WorkflowHandler) ScanWorkflowExecutions(
ctx context.Context,
listRequest *types.ListWorkflowExecutionsRequest,
) (resp *types.ListWorkflowExecutionsResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if listRequest == nil {
return nil, validate.ErrRequestNotSet
}
if listRequest.GetDomain() == "" {
return nil, validate.ErrDomainNotSet
}
if listRequest.GetPageSize() <= 0 {
listRequest.PageSize = int32(wh.config.VisibilityMaxPageSize(listRequest.GetDomain()))
}
if wh.isListRequestPageSizeTooLarge(listRequest.GetPageSize(), listRequest.GetDomain()) {
return nil, &types.BadRequestError{
Message: fmt.Sprintf("Pagesize is larger than allow %d", wh.config.ESIndexMaxResultWindow())}
}
validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(listRequest.GetQuery())
if err != nil {
return nil, err
}
domain := listRequest.GetDomain()
domainID, err := wh.GetDomainCache().GetDomainID(domain)
if err != nil {
return nil, err
}
req := &persistence.ListWorkflowExecutionsByQueryRequest{
DomainUUID: domainID,
Domain: domain,
PageSize: int(listRequest.GetPageSize()),
NextPageToken: listRequest.NextPageToken,
Query: validatedQuery,
}
persistenceResp, err := wh.GetVisibilityManager().ScanWorkflowExecutions(ctx, req)
if err != nil {
return nil, err
}
resp = &types.ListWorkflowExecutionsResponse{}
resp.Executions = persistenceResp.Executions
resp.NextPageToken = persistenceResp.NextPageToken
return resp, nil
}
// CountWorkflowExecutions - count number of workflow executions in a domain
func (wh *WorkflowHandler) CountWorkflowExecutions(
ctx context.Context,
countRequest *types.CountWorkflowExecutionsRequest,
) (resp *types.CountWorkflowExecutionsResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if countRequest == nil {
return nil, validate.ErrRequestNotSet
}
if countRequest.GetDomain() == "" {
return nil, validate.ErrDomainNotSet
}
validatedQuery, err := wh.visibilityQueryValidator.ValidateQuery(countRequest.GetQuery())
if err != nil {
return nil, err
}
domain := countRequest.GetDomain()
domainID, err := wh.GetDomainCache().GetDomainID(domain)
if err != nil {
return nil, err
}
req := &persistence.CountWorkflowExecutionsRequest{
DomainUUID: domainID,
Domain: domain,
Query: validatedQuery,
}
persistenceResp, err := wh.GetVisibilityManager().CountWorkflowExecutions(ctx, req)
if err != nil {
return nil, err
}
resp = &types.CountWorkflowExecutionsResponse{
Count: persistenceResp.Count,
}
return resp, nil
}
// GetSearchAttributes return valid indexed keys
func (wh *WorkflowHandler) GetSearchAttributes(ctx context.Context) (resp *types.GetSearchAttributesResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
keys := wh.config.ValidSearchAttributes()
resp = &types.GetSearchAttributesResponse{
Keys: wh.convertIndexedKeyToThrift(keys),
}
return resp, nil
}
// ResetStickyTaskList reset the volatile information in mutable state of a given workflow.
func (wh *WorkflowHandler) ResetStickyTaskList(
ctx context.Context,
resetRequest *types.ResetStickyTaskListRequest,
) (resp *types.ResetStickyTaskListResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if resetRequest == nil {
return nil, validate.ErrRequestNotSet
}
domainName := resetRequest.GetDomain()
wfExecution := resetRequest.GetExecution()
if domainName == "" {
return nil, validate.ErrDomainNotSet
}
if err := validate.CheckExecution(wfExecution); err != nil {
return nil, err
}
domainID, err := wh.GetDomainCache().GetDomainID(resetRequest.GetDomain())
if err != nil {
return nil, err
}
_, err = wh.GetHistoryClient().ResetStickyTaskList(ctx, &types.HistoryResetStickyTaskListRequest{
DomainUUID: domainID,
Execution: resetRequest.Execution,
})
if err != nil {
return nil, wh.normalizeVersionedErrors(ctx, err)
}
return &types.ResetStickyTaskListResponse{}, nil
}
// QueryWorkflow returns query result for a specified workflow execution
func (wh *WorkflowHandler) QueryWorkflow(
ctx context.Context,
queryRequest *types.QueryWorkflowRequest,
) (resp *types.QueryWorkflowResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if queryRequest == nil {
return nil, validate.ErrRequestNotSet
}
domainName := queryRequest.GetDomain()
wfExecution := queryRequest.GetExecution()
if domainName == "" {
return nil, validate.ErrDomainNotSet
}
if err := validate.CheckExecution(wfExecution); err != nil {
return nil, err
}
if wh.config.DisallowQuery(domainName) {
return nil, validate.ErrQueryDisallowedForDomain
}
if queryRequest.Query == nil {
return nil, validate.ErrQueryNotSet
}
if queryRequest.Query.GetQueryType() == "" {
return nil, validate.ErrQueryTypeNotSet
}
domainID, err := wh.GetDomainCache().GetDomainID(domainName)
if err != nil {
return nil, err
}
sizeLimitError := wh.config.BlobSizeLimitError(domainName)
sizeLimitWarn := wh.config.BlobSizeLimitWarn(domainName)
scope := getMetricsScopeWithDomain(metrics.FrontendQueryWorkflowScope, queryRequest, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
if err := common.CheckEventBlobSizeLimit(
len(queryRequest.GetQuery().GetQueryArgs()),
sizeLimitWarn,
sizeLimitError,
domainID,
queryRequest.GetExecution().GetWorkflowID(),
queryRequest.GetExecution().GetRunID(),
scope,
wh.GetThrottledLogger(),
tag.BlobSizeViolationOperation("QueryWorkflow")); err != nil {
return nil, err
}
req := &types.HistoryQueryWorkflowRequest{
DomainUUID: domainID,
Request: queryRequest,
}
hResponse, err := wh.GetHistoryClient().QueryWorkflow(ctx, req)
if err != nil {
return nil, err
}
return hResponse.GetResponse(), nil
}
// DescribeWorkflowExecution returns information about the specified workflow execution.
func (wh *WorkflowHandler) DescribeWorkflowExecution(
ctx context.Context,
request *types.DescribeWorkflowExecutionRequest,
) (resp *types.DescribeWorkflowExecutionResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if request == nil {
return nil, validate.ErrRequestNotSet
}
domainName := request.GetDomain()
wfExecution := request.GetExecution()
if domainName == "" {
return nil, validate.ErrDomainNotSet
}
if err := validate.CheckExecution(wfExecution); err != nil {
return nil, err
}
domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
if err != nil {
return nil, err
}
response, err := wh.GetHistoryClient().DescribeWorkflowExecution(ctx, &types.HistoryDescribeWorkflowExecutionRequest{
DomainUUID: domainID,
Request: request,
})
if err != nil {
return nil, err
}
return response, nil
}
// DescribeTaskList returns information about the target tasklist, right now this API returns the
// pollers which polled this tasklist in last few minutes. If includeTaskListStatus field is true,
// it will also return status of tasklist's ackManager (readLevel, ackLevel, backlogCountHint and taskIDBlock).
func (wh *WorkflowHandler) DescribeTaskList(
ctx context.Context,
request *types.DescribeTaskListRequest,
) (resp *types.DescribeTaskListResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if err := wh.versionChecker.ClientSupported(ctx, wh.config.EnableClientVersionCheck()); err != nil {
return nil, err
}
if request == nil {
return nil, validate.ErrRequestNotSet
}
if request.GetDomain() == "" {
return nil, validate.ErrDomainNotSet
}
domainID, err := wh.GetDomainCache().GetDomainID(request.GetDomain())
if err != nil {
return nil, err
}
scope := getMetricsScopeWithDomain(metrics.FrontendDescribeTaskListScope, request, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
return nil, err
}
if request.TaskListType == nil {
return nil, validate.ErrTaskListTypeNotSet
}
response, err := wh.GetMatchingClient().DescribeTaskList(ctx, &types.MatchingDescribeTaskListRequest{
DomainUUID: domainID,
DescRequest: request,
})
if err != nil {
return nil, err
}
return response, nil
}
// ListTaskListPartitions returns all the partition and host for a taskList
func (wh *WorkflowHandler) ListTaskListPartitions(
ctx context.Context,
request *types.ListTaskListPartitionsRequest,
) (resp *types.ListTaskListPartitionsResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if request == nil {
return nil, validate.ErrRequestNotSet
}
if request.GetDomain() == "" {
return nil, validate.ErrDomainNotSet
}
scope := getMetricsScopeWithDomain(metrics.FrontendListTaskListPartitionsScope, request, wh.GetMetricsClient()).Tagged(metrics.GetContextTags(ctx)...)
if err := wh.validateTaskList(request.TaskList, scope, request.GetDomain()); err != nil {
return nil, err
}
resp, err := wh.GetMatchingClient().ListTaskListPartitions(ctx, &types.MatchingListTaskListPartitionsRequest{
Domain: request.Domain,
TaskList: request.TaskList,
})
return resp, err
}
// GetTaskListsByDomain returns all the partition and host for a taskList
func (wh *WorkflowHandler) GetTaskListsByDomain(
ctx context.Context,
request *types.GetTaskListsByDomainRequest,
) (resp *types.GetTaskListsByDomainResponse, retError error) {
if wh.isShuttingDown() {
return nil, validate.ErrShuttingDown
}
if request == nil {
return nil, validate.ErrRequestNotSet
}
if request.GetDomain() == "" {
return nil, validate.ErrDomainNotSet
}
resp, err := wh.GetMatchingClient().GetTaskListsByDomain(ctx, &types.GetTaskListsByDomainRequest{
Domain: request.Domain,
})
return resp, err
}
// RefreshWorkflowTasks re-generates the workflow tasks
func (wh *WorkflowHandler) RefreshWorkflowTasks(
ctx context.Context,
request *types.RefreshWorkflowTasksRequest,
) (err error) {
if request == nil {
return validate.ErrRequestNotSet
}
if err := validate.CheckExecution(request.Execution); err != nil {
return err
}
domainEntry, err := wh.GetDomainCache().GetDomain(request.GetDomain())
if err != nil {
return err
}
err = wh.GetHistoryClient().RefreshWorkflowTasks(ctx, &types.HistoryRefreshWorkflowTasksRequest{
DomainUIID: domainEntry.GetInfo().ID,
Request: request,
})
if err != nil {
return err
}
return nil
}
func (wh *WorkflowHandler) getRawHistory(
ctx context.Context,
scope metrics.Scope,
domainID string,
domainName string,
execution types.WorkflowExecution,
firstEventID int64,
nextEventID int64,
pageSize int32,
nextPageToken []byte,
transientDecision *types.TransientDecisionInfo,
branchToken []byte,
) ([]*types.DataBlob, []byte, error) {
rawHistory := []*types.DataBlob{}
shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
resp, err := wh.GetHistoryManager().ReadRawHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
BranchToken: branchToken,
MinEventID: firstEventID,
MaxEventID: nextEventID,
PageSize: int(pageSize),
NextPageToken: nextPageToken,
ShardID: common.IntPtr(shardID),
DomainName: domainName,
})
if err != nil {
return nil, nil, err
}
var encoding *types.EncodingType
for _, data := range resp.HistoryEventBlobs {
switch data.Encoding {
case common.EncodingTypeJSON:
encoding = types.EncodingTypeJSON.Ptr()
case common.EncodingTypeThriftRW:
encoding = types.EncodingTypeThriftRW.Ptr()
default:
panic(fmt.Sprintf("Invalid encoding type for raw history, encoding type: %s", data.Encoding))
}
rawHistory = append(rawHistory, &types.DataBlob{
EncodingType: encoding,
Data: data.Data,
})
}
if len(resp.NextPageToken) == 0 && transientDecision != nil {
if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
wh.GetLogger().Error("getHistory error",
tag.WorkflowDomainID(domainID),
tag.WorkflowID(execution.GetWorkflowID()),
tag.WorkflowRunID(execution.GetRunID()),
tag.Error(err))
}
blob, err := wh.GetPayloadSerializer().SerializeBatchEvents(
[]*types.HistoryEvent{transientDecision.ScheduledEvent, transientDecision.StartedEvent}, common.EncodingTypeThriftRW)
if err != nil {
return nil, nil, err
}
rawHistory = append(rawHistory, &types.DataBlob{
EncodingType: types.EncodingTypeThriftRW.Ptr(),
Data: blob.Data,
})
}
return rawHistory, resp.NextPageToken, nil
}
func (wh *WorkflowHandler) getHistory(
ctx context.Context,
scope metrics.Scope,
domainID string,
domainName string,
execution types.WorkflowExecution,
firstEventID, nextEventID int64,
pageSize int32,
nextPageToken []byte,
transientDecision *types.TransientDecisionInfo,
branchToken []byte,
) (*types.History, []byte, error) {
var size int
isFirstPage := len(nextPageToken) == 0
shardID := common.WorkflowIDToHistoryShard(execution.WorkflowID, wh.config.NumHistoryShards)
var err error
historyEvents, size, nextPageToken, err := persistenceutils.ReadFullPageV2Events(ctx, wh.GetHistoryManager(), &persistence.ReadHistoryBranchRequest{
BranchToken: branchToken,
MinEventID: firstEventID,
MaxEventID: nextEventID,
PageSize: int(pageSize),
NextPageToken: nextPageToken,
ShardID: common.IntPtr(shardID),
DomainName: domainName,
})
if err != nil {
return nil, nil, err
}
scope.RecordTimer(metrics.HistorySize, time.Duration(size))
isLastPage := len(nextPageToken) == 0
if err := verifyHistoryIsComplete(
historyEvents,
firstEventID,
nextEventID-1,
isFirstPage,
isLastPage,
int(pageSize)); err != nil {
scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
wh.GetLogger().Error("getHistory: incomplete history",
tag.WorkflowDomainID(domainID),
tag.WorkflowID(execution.GetWorkflowID()),
tag.WorkflowRunID(execution.GetRunID()),
tag.Error(err))
return nil, nil, err
}
if len(nextPageToken) == 0 && transientDecision != nil {
if err := wh.validateTransientDecisionEvents(nextEventID, transientDecision); err != nil {
scope.IncCounter(metrics.CadenceErrIncompleteHistoryCounter)
wh.GetLogger().Error("getHistory error",
tag.WorkflowDomainID(domainID),
tag.WorkflowID(execution.GetWorkflowID()),
tag.WorkflowRunID(execution.GetRunID()),
tag.Error(err))
}
// Append the transient decision events once we are done enumerating everything from the events table
historyEvents = append(historyEvents, transientDecision.ScheduledEvent, transientDecision.StartedEvent)
}
executionHistory := &types.History{}
executionHistory.Events = historyEvents
return executionHistory, nextPageToken, nil
}
func (wh *WorkflowHandler) validateTransientDecisionEvents(
expectedNextEventID int64,
decision *types.TransientDecisionInfo,
) error {
if decision.ScheduledEvent.ID == expectedNextEventID &&
decision.StartedEvent.ID == expectedNextEventID+1 {
return nil
}
return fmt.Errorf(
"invalid transient decision: "+
"expectedScheduledEventID=%v expectedStartedEventID=%v but have scheduledEventID=%v startedEventID=%v",
expectedNextEventID,
expectedNextEventID+1,
decision.ScheduledEvent.ID,
decision.StartedEvent.ID)
}
func (wh *WorkflowHandler) validateTaskList(t *types.TaskList, scope metrics.Scope, domain string) error {
if t == nil || t.GetName() == "" {
return validate.ErrTaskListNotSet
}
if !common.IsValidIDLength(
t.GetName(),
scope,
wh.config.MaxIDLengthWarnLimit(),
wh.config.TaskListNameMaxLength(domain),
metrics.CadenceErrTaskListNameExceededWarnLimit,
domain,
wh.GetLogger(),
tag.IDTypeTaskListName) {
return validate.ErrTaskListTooLong
}
return nil
}
func (wh *WorkflowHandler) createPollForDecisionTaskResponse(
ctx context.Context,
scope metrics.Scope,
domainID string,
matchingResp *types.MatchingPollForDecisionTaskResponse,
branchToken []byte,
) (*types.PollForDecisionTaskResponse, error) {
if matchingResp.WorkflowExecution == nil {
// this will happen if there is no decision task to be send to worker / caller
return &types.PollForDecisionTaskResponse{}, nil
}
var history *types.History
var continuation []byte
var err error
if matchingResp.GetStickyExecutionEnabled() && matchingResp.Query != nil {
// meaning sticky query, we should not return any events to worker
// since query task only check the current status
history = &types.History{
Events: []*types.HistoryEvent{},
}
} else {
// here we have 3 cases:
// 1. sticky && non query task
// 2. non sticky && non query task
// 3. non sticky && query task
// for 1, partial history have to be send back
// for 2 and 3, full history have to be send back
var persistenceToken []byte
firstEventID := common.FirstEventID
nextEventID := matchingResp.GetNextEventID()
if matchingResp.GetStickyExecutionEnabled() {
firstEventID = matchingResp.GetPreviousStartedEventID() + 1
}
domainName, dErr := wh.GetDomainCache().GetDomainName(domainID)
if dErr != nil {
return nil, dErr
}
scope = scope.Tagged(metrics.DomainTag(domainName))
history, persistenceToken, err = wh.getHistory(
ctx,
scope,
domainID,
domainName,
*matchingResp.WorkflowExecution,
firstEventID,
nextEventID,
int32(wh.config.HistoryMaxPageSize(domainName)),
nil,
matchingResp.DecisionInfo,
branchToken,
)
if err != nil {
return nil, err
}
if len(persistenceToken) != 0 {
continuation, err = serializeHistoryToken(&getHistoryContinuationToken{
RunID: matchingResp.WorkflowExecution.GetRunID(),
FirstEventID: firstEventID,
NextEventID: nextEventID,
PersistenceToken: persistenceToken,
TransientDecision: matchingResp.DecisionInfo,
BranchToken: branchToken,
})
if err != nil {
return nil, err
}
}
}
resp := &types.PollForDecisionTaskResponse{
TaskToken: matchingResp.TaskToken,
WorkflowExecution: matchingResp.WorkflowExecution,
WorkflowType: matchingResp.WorkflowType,
PreviousStartedEventID: matchingResp.PreviousStartedEventID,
StartedEventID: matchingResp.StartedEventID, // this field is not set for query tasks as there's no decision task started event
Query: matchingResp.Query,
BacklogCountHint: matchingResp.BacklogCountHint,
Attempt: matchingResp.Attempt,
History: history,
NextPageToken: continuation,
WorkflowExecutionTaskList: matchingResp.WorkflowExecutionTaskList,
ScheduledTimestamp: matchingResp.ScheduledTimestamp,
StartedTimestamp: matchingResp.StartedTimestamp,
Queries: matchingResp.Queries,
NextEventID: matchingResp.NextEventID,
TotalHistoryBytes: matchingResp.TotalHistoryBytes,
}
return resp, nil
}
func verifyHistoryIsComplete(
events []*types.HistoryEvent,
expectedFirstEventID int64,
expectedLastEventID int64,
isFirstPage bool,
isLastPage bool,
pageSize int,
) error {
nEvents := len(events)
if nEvents == 0 {
if isLastPage {
// we seem to be returning a non-nil pageToken on the lastPage which
// in turn cases the client to call getHistory again - only to find
// there are no more events to consume - bail out if this is the case here
return nil
}
return fmt.Errorf("invalid history: contains zero events")
}
firstEventID := events[0].ID
lastEventID := events[nEvents-1].ID
if !isFirstPage { // atleast one page of history has been read previously
if firstEventID <= expectedFirstEventID {
// not first page and no events have been read in the previous pages - not possible
return &types.InternalServiceError{
Message: fmt.Sprintf(
"invalid history: expected first eventID to be > %v but got %v", expectedFirstEventID, firstEventID),
}
}
expectedFirstEventID = firstEventID
}
if !isLastPage {
// estimate lastEventID based on pageSize. This is a lower bound
// since the persistence layer counts "batch of events" as a single page
expectedLastEventID = expectedFirstEventID + int64(pageSize) - 1
}
nExpectedEvents := expectedLastEventID - expectedFirstEventID + 1
if firstEventID == expectedFirstEventID &&
((isLastPage && lastEventID == expectedLastEventID && int64(nEvents) == nExpectedEvents) ||
(!isLastPage && lastEventID >= expectedLastEventID && int64(nEvents) >= nExpectedEvents)) {
return nil
}
return &types.InternalServiceError{
Message: fmt.Sprintf(
"incomplete history: "+
"expected events [%v-%v] but got events [%v-%v] of length %v:"+
"isFirstPage=%v,isLastPage=%v,pageSize=%v",
expectedFirstEventID,
expectedLastEventID,
firstEventID,
lastEventID,
nEvents,
isFirstPage,
isLastPage,
pageSize),
}
}
func deserializeHistoryToken(bytes []byte) (*getHistoryContinuationToken, error) {
token := &getHistoryContinuationToken{}
err := json.Unmarshal(bytes, token)
return token, err
}
func serializeHistoryToken(token *getHistoryContinuationToken) ([]byte, error) {
if token == nil {
return nil, nil
}
bytes, err := json.Marshal(token)
return bytes, err
}
func isFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
return updateRequest.ActiveClusterName != nil
}
func isGraceFailoverRequest(updateRequest *types.UpdateDomainRequest) bool {
return updateRequest.FailoverTimeoutInSeconds != nil
}
func (wh *WorkflowHandler) checkOngoingFailover(
ctx context.Context,
domainName *string,
) error {
enabledClusters := wh.GetClusterMetadata().GetEnabledClusterInfo()
respChan := make(chan *types.DescribeDomainResponse, len(enabledClusters))
g := &errgroup.Group{}
for clusterName := range enabledClusters {
frontendClient := wh.GetRemoteFrontendClient(clusterName)
g.Go(func() (e error) {
defer func() { log.CapturePanic(recover(), wh.GetLogger(), &e) }()
resp, _ := frontendClient.DescribeDomain(ctx, &types.DescribeDomainRequest{Name: domainName})
respChan <- resp
return nil
})
}
g.Wait()
close(respChan)
var failoverVersion *int64
for resp := range respChan {
if resp == nil {
return &types.InternalServiceError{
Message: "Failed to verify failover version from all clusters",
}
}
if failoverVersion == nil {
failoverVersion = &resp.FailoverVersion
}
if *failoverVersion != resp.GetFailoverVersion() {
return &types.BadRequestError{
Message: "Concurrent failover is not allow.",
}
}
}
return nil
}
func (wh *WorkflowHandler) historyArchived(ctx context.Context, request *types.GetWorkflowExecutionHistoryRequest, domainID string) bool {
if request.GetExecution() == nil || request.GetExecution().GetRunID() == "" {
return false
}
getMutableStateRequest := &types.GetMutableStateRequest{
DomainUUID: domainID,
Execution: request.Execution,
}
_, err := wh.GetHistoryClient().GetMutableState(ctx, getMutableStateRequest)
if err == nil {
return false
}
switch err.(type) {
case *types.EntityNotExistsError:
// the only case in which history is assumed to be archived is if getting mutable state returns entity not found error
return true
}
return false
}
func (wh *WorkflowHandler) getArchivedHistory(
ctx context.Context,
request *types.GetWorkflowExecutionHistoryRequest,
domainID string,
) (*types.GetWorkflowExecutionHistoryResponse, error) {
entry, err := wh.GetDomainCache().GetDomainByID(domainID)
if err != nil {
return nil, err
}
URIString := entry.GetConfig().HistoryArchivalURI
if URIString == "" {
// if URI is empty, it means the domain has never enabled for archival.
// the error is not "workflow has passed retention period", because
// we have no way to tell if the requested workflow exists or not.
return nil, validate.ErrHistoryNotFound
}
URI, err := archiver.NewURI(URIString)
if err != nil {
return nil, err
}
historyArchiver, err := wh.GetArchiverProvider().GetHistoryArchiver(URI.Scheme(), service.Frontend)
if err != nil {
return nil, err
}
resp, err := historyArchiver.Get(ctx, URI, &archiver.GetHistoryRequest{
DomainID: domainID,
WorkflowID: request.GetExecution().GetWorkflowID(),
RunID: request.GetExecution().GetRunID(),
NextPageToken: request.GetNextPageToken(),
PageSize: int(request.GetMaximumPageSize()),
})
if err != nil {
return nil, err
}
history := &types.History{}
for _, batch := range resp.HistoryBatches {
history.Events = append(history.Events, batch.Events...)
}
return &types.GetWorkflowExecutionHistoryResponse{
History: history,
NextPageToken: resp.NextPageToken,
Archived: true,
}, nil
}
func (wh *WorkflowHandler) convertIndexedKeyToThrift(keys map[string]interface{}) map[string]types.IndexedValueType {
converted := make(map[string]types.IndexedValueType)
for k, v := range keys {
converted[k] = common.ConvertIndexedValueTypeToInternalType(v, wh.GetLogger())
}
return converted
}
func (wh *WorkflowHandler) isListRequestPageSizeTooLarge(pageSize int32, domain string) bool {
return common.IsAdvancedVisibilityReadingEnabled(wh.config.EnableReadVisibilityFromES(domain), wh.config.IsAdvancedVisConfigExist) &&
pageSize > int32(wh.config.ESIndexMaxResultWindow())
}
// GetClusterInfo return information about cadence deployment
func (wh *WorkflowHandler) GetClusterInfo(
ctx context.Context,
) (resp *types.ClusterInfo, err error) {
return &types.ClusterInfo{
SupportedClientVersions: &types.SupportedClientVersions{
GoSdk: client.SupportedGoSDKVersion,
JavaSdk: client.SupportedJavaSDKVersion,
},
}, nil
}
func checkFailOverPermission(config *config.Config, domainName string) error {
if config.Lockdown(domainName) {
return validate.ErrDomainInLockdown
}
return nil
}
type domainWrapper struct {
domain string
}
func (d domainWrapper) GetDomain() string {
return d.domain
}
func (hs HealthStatus) String() string {
switch hs {
case HealthStatusOK:
return "OK"
case HealthStatusWarmingUp:
return "WarmingUp"
case HealthStatusShuttingDown:
return "ShuttingDown"
default:
return "unknown"
}
}
func getDomainWfIDRunIDTags(
domainName string,
wf *types.WorkflowExecution,
) []tag.Tag {
tags := []tag.Tag{tag.WorkflowDomainName(domainName)}
if wf == nil {
return tags
}
return append(
tags,
tag.WorkflowID(wf.GetWorkflowID()),
tag.WorkflowRunID(wf.GetRunID()),
)
}
func checkRequiredDomainDataKVs(requiredDomainDataKeys map[string]interface{}, domainData map[string]string) error {
// check requiredDomainDataKeys
for k := range requiredDomainDataKeys {
_, ok := domainData[k]
if !ok {
return fmt.Errorf("domain data error, missing required key %v . All required keys: %v", k, requiredDomainDataKeys)
}
}
return nil
}
// Some error types are introduced later that some clients might not support
// To make them backward compatible, we continue returning the legacy error types
// for older clients
func (wh *WorkflowHandler) normalizeVersionedErrors(ctx context.Context, err error) error {
switch err.(type) {
case *types.WorkflowExecutionAlreadyCompletedError:
call := yarpc.CallFromContext(ctx)
clientFeatureVersion := call.Header(common.FeatureVersionHeaderName)
clientImpl := call.Header(common.ClientImplHeaderName)
featureFlags := client.GetFeatureFlagsFromHeader(call)
vErr := wh.versionChecker.SupportsWorkflowAlreadyCompletedError(clientImpl, clientFeatureVersion, featureFlags)
if vErr == nil {
return err
}
return &types.EntityNotExistsError{Message: "Workflow execution already completed."}
default:
return err
}
}
func constructRestartWorkflowRequest(w *types.WorkflowExecutionStartedEventAttributes, domain string, identity string, workflowID string) *types.StartWorkflowExecutionRequest {
startRequest := &types.StartWorkflowExecutionRequest{
RequestID: uuid.New().String(),
Domain: domain,
WorkflowID: workflowID,
WorkflowType: &types.WorkflowType{
Name: w.WorkflowType.Name,
},
TaskList: &types.TaskList{
Name: w.TaskList.Name,
},
Input: w.Input,
ExecutionStartToCloseTimeoutSeconds: w.ExecutionStartToCloseTimeoutSeconds,
TaskStartToCloseTimeoutSeconds: w.TaskStartToCloseTimeoutSeconds,
Identity: identity,
WorkflowIDReusePolicy: types.WorkflowIDReusePolicyTerminateIfRunning.Ptr(),
}
startRequest.CronSchedule = w.CronSchedule
startRequest.RetryPolicy = w.RetryPolicy
startRequest.DelayStartSeconds = w.FirstDecisionTaskBackoffSeconds
startRequest.Header = w.Header
startRequest.Memo = w.Memo
startRequest.SearchAttributes = w.SearchAttributes
return startRequest
}
func getMetricsScopeWithDomain(
scope int,
d domainGetter,
metricsClient metrics.Client,
) metrics.Scope {
var metricsScope metrics.Scope
if d != nil {
metricsScope = metricsClient.Scope(scope).Tagged(metrics.DomainTag(d.GetDomain()))
} else {
metricsScope = metricsClient.Scope(scope).Tagged(metrics.DomainUnknownTag())
}
return metricsScope
}