common/util.go (794 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package common import ( "context" "encoding/json" "errors" "fmt" "math" "math/rand" "sort" "strconv" "strings" "sync" "time" "github.com/dgryski/go-farm" "github.com/pborman/uuid" "go.uber.org/yarpc/yarpcerrors" "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/types" ) const ( golandMapReserverNumberOfBytes = 48 retryPersistenceOperationInitialInterval = 50 * time.Millisecond retryPersistenceOperationMaxInterval = 10 * time.Second retryPersistenceOperationExpirationInterval = 30 * time.Second historyServiceOperationInitialInterval = 50 * time.Millisecond historyServiceOperationMaxInterval = 10 * time.Second historyServiceOperationExpirationInterval = 30 * time.Second matchingServiceOperationInitialInterval = 1000 * time.Millisecond matchingServiceOperationMaxInterval = 10 * time.Second matchingServiceOperationExpirationInterval = 30 * time.Second frontendServiceOperationInitialInterval = 200 * time.Millisecond frontendServiceOperationMaxInterval = 5 * time.Second frontendServiceOperationExpirationInterval = 15 * time.Second adminServiceOperationInitialInterval = 200 * time.Millisecond adminServiceOperationMaxInterval = 5 * time.Second adminServiceOperationExpirationInterval = 15 * time.Second retryKafkaOperationInitialInterval = 50 * time.Millisecond retryKafkaOperationMaxInterval = 10 * time.Second retryKafkaOperationMaxAttempts = 10 retryTaskProcessingInitialInterval = 50 * time.Millisecond retryTaskProcessingMaxInterval = 100 * time.Millisecond retryTaskProcessingMaxAttempts = 3 replicationServiceBusyInitialInterval = 2 * time.Second replicationServiceBusyMaxInterval = 10 * time.Second replicationServiceBusyExpirationInterval = 5 * time.Minute contextExpireThreshold = 10 * time.Millisecond // FailureReasonCompleteResultExceedsLimit is failureReason for complete result exceeds limit FailureReasonCompleteResultExceedsLimit = "COMPLETE_RESULT_EXCEEDS_LIMIT" // FailureReasonFailureDetailsExceedsLimit is failureReason for failure details exceeds limit FailureReasonFailureDetailsExceedsLimit = "FAILURE_DETAILS_EXCEEDS_LIMIT" // FailureReasonCancelDetailsExceedsLimit is failureReason for cancel details exceeds limit FailureReasonCancelDetailsExceedsLimit = "CANCEL_DETAILS_EXCEEDS_LIMIT" // FailureReasonHeartbeatExceedsLimit is failureReason for heartbeat exceeds limit FailureReasonHeartbeatExceedsLimit = "HEARTBEAT_EXCEEDS_LIMIT" // FailureReasonDecisionBlobSizeExceedsLimit is the failureReason for decision blob exceeds size limit FailureReasonDecisionBlobSizeExceedsLimit = "DECISION_BLOB_SIZE_EXCEEDS_LIMIT" // FailureReasonSizeExceedsLimit is reason to fail workflow when history size or count exceed limit FailureReasonSizeExceedsLimit = "HISTORY_EXCEEDS_LIMIT" // FailureReasonTransactionSizeExceedsLimit is the failureReason for when transaction cannot be committed because it exceeds size limit FailureReasonTransactionSizeExceedsLimit = "TRANSACTION_SIZE_EXCEEDS_LIMIT" // FailureReasonDecisionAttemptsExceedsLimit is reason to fail workflow when decision attempts fail too many times FailureReasonDecisionAttemptsExceedsLimit = "DECISION_ATTEMPTS_EXCEEDS_LIMIT" ) var ( // ErrBlobSizeExceedsLimit is error for event blob size exceeds limit ErrBlobSizeExceedsLimit = &types.BadRequestError{Message: "Blob data size exceeds limit."} // ErrContextTimeoutTooShort is error for setting a very short context timeout when calling a long poll API ErrContextTimeoutTooShort = &types.BadRequestError{Message: "Context timeout is too short."} // ErrContextTimeoutNotSet is error for not setting a context timeout when calling a long poll API ErrContextTimeoutNotSet = &types.BadRequestError{Message: "Context timeout is not set."} // ErrDecisionResultCountTooLarge error for decision result count exceeds limit ErrDecisionResultCountTooLarge = &types.BadRequestError{Message: "Decision result count exceeds limit."} ) // AwaitWaitGroup calls Wait on the given wait // Returns true if the Wait() call succeeded before the timeout // Returns false if the Wait() did not return before the timeout func AwaitWaitGroup(wg *sync.WaitGroup, timeout time.Duration) bool { doneC := make(chan struct{}) go func() { wg.Wait() close(doneC) }() select { case <-doneC: return true case <-time.After(timeout): return false } } // CreatePersistenceRetryPolicy creates a retry policy for persistence layer operations func CreatePersistenceRetryPolicy() backoff.RetryPolicy { policy := backoff.NewExponentialRetryPolicy(retryPersistenceOperationInitialInterval) policy.SetMaximumInterval(retryPersistenceOperationMaxInterval) policy.SetExpirationInterval(retryPersistenceOperationExpirationInterval) return policy } // CreateHistoryServiceRetryPolicy creates a retry policy for calls to history service func CreateHistoryServiceRetryPolicy() backoff.RetryPolicy { policy := backoff.NewExponentialRetryPolicy(historyServiceOperationInitialInterval) policy.SetMaximumInterval(historyServiceOperationMaxInterval) policy.SetExpirationInterval(historyServiceOperationExpirationInterval) return policy } // CreateMatchingServiceRetryPolicy creates a retry policy for calls to matching service func CreateMatchingServiceRetryPolicy() backoff.RetryPolicy { policy := backoff.NewExponentialRetryPolicy(matchingServiceOperationInitialInterval) policy.SetMaximumInterval(matchingServiceOperationMaxInterval) policy.SetExpirationInterval(matchingServiceOperationExpirationInterval) return policy } // CreateFrontendServiceRetryPolicy creates a retry policy for calls to frontend service func CreateFrontendServiceRetryPolicy() backoff.RetryPolicy { policy := backoff.NewExponentialRetryPolicy(frontendServiceOperationInitialInterval) policy.SetMaximumInterval(frontendServiceOperationMaxInterval) policy.SetExpirationInterval(frontendServiceOperationExpirationInterval) return policy } // CreateAdminServiceRetryPolicy creates a retry policy for calls to matching service func CreateAdminServiceRetryPolicy() backoff.RetryPolicy { policy := backoff.NewExponentialRetryPolicy(adminServiceOperationInitialInterval) policy.SetMaximumInterval(adminServiceOperationMaxInterval) policy.SetExpirationInterval(adminServiceOperationExpirationInterval) return policy } // CreateDlqPublishRetryPolicy creates a retry policy for kafka operation func CreateDlqPublishRetryPolicy() backoff.RetryPolicy { policy := backoff.NewExponentialRetryPolicy(retryKafkaOperationInitialInterval) policy.SetMaximumInterval(retryKafkaOperationMaxInterval) policy.SetMaximumAttempts(retryKafkaOperationMaxAttempts) return policy } // CreateTaskProcessingRetryPolicy creates a retry policy for task processing func CreateTaskProcessingRetryPolicy() backoff.RetryPolicy { policy := backoff.NewExponentialRetryPolicy(retryTaskProcessingInitialInterval) policy.SetMaximumInterval(retryTaskProcessingMaxInterval) policy.SetMaximumAttempts(retryTaskProcessingMaxAttempts) return policy } // CreateReplicationServiceBusyRetryPolicy creates a retry policy to handle replication service busy func CreateReplicationServiceBusyRetryPolicy() backoff.RetryPolicy { policy := backoff.NewExponentialRetryPolicy(replicationServiceBusyInitialInterval) policy.SetMaximumInterval(replicationServiceBusyMaxInterval) policy.SetExpirationInterval(replicationServiceBusyExpirationInterval) return policy } // IsValidIDLength checks if id is valid according to its length func IsValidIDLength( id string, scope metrics.Scope, warnLimit int, errorLimit int, metricsCounter int, domainName string, logger log.Logger, idTypeViolationTag tag.Tag, ) bool { if len(id) > warnLimit { scope.IncCounter(metricsCounter) logger.Warn("ID length exceeds limit.", tag.WorkflowDomainName(domainName), tag.Name(id), idTypeViolationTag) } return len(id) <= errorLimit } // CheckDecisionResultLimit checks if decision result count exceeds limits. func CheckDecisionResultLimit( actualSize int, limit int, scope metrics.Scope, ) error { scope.RecordTimer(metrics.DecisionResultCount, time.Duration(actualSize)) if limit > 0 && actualSize > limit { return ErrDecisionResultCountTooLarge } return nil } // ToServiceTransientError converts an error to ServiceTransientError func ToServiceTransientError(err error) error { if err == nil || IsServiceTransientError(err) { return err } return yarpcerrors.Newf(yarpcerrors.CodeUnavailable, err.Error()) } // HistoryRetryFuncFrontendExceptions checks if an error should be retried // in a call from frontend func FrontendRetry(err error) bool { var sbErr *types.ServiceBusyError if errors.As(err, &sbErr) { // If the service busy error is due to workflow id rate limiting, proxy it to the caller return sbErr.Reason != WorkflowIDRateLimitReason } return IsServiceTransientError(err) } // IsServiceTransientError checks if the error is a transient error. func IsServiceTransientError(err error) bool { var ( typesInternalServiceError *types.InternalServiceError typesServiceBusyError *types.ServiceBusyError typesShardOwnershipLostError *types.ShardOwnershipLostError yarpcErrorsStatus *yarpcerrors.Status ) switch { case errors.As(err, &typesInternalServiceError): return true case errors.As(err, &typesServiceBusyError): return true case errors.As(err, &typesShardOwnershipLostError): return true case errors.As(err, &yarpcErrorsStatus): // We only selectively retry the following yarpc errors client can safe retry with a backoff if yarpcerrors.IsUnavailable(err) || yarpcerrors.IsUnknown(err) || yarpcerrors.IsInternal(err) { return true } return false } return false } // IsEntityNotExistsError checks if the error is an entity not exists error. func IsEntityNotExistsError(err error) bool { _, ok := err.(*types.EntityNotExistsError) return ok } // IsServiceBusyError checks if the error is a service busy error. func IsServiceBusyError(err error) bool { switch err.(type) { case *types.ServiceBusyError: return true } return false } // IsContextTimeoutError checks if the error is context timeout error func IsContextTimeoutError(err error) bool { switch err := err.(type) { case *types.InternalServiceError: return err.Message == context.DeadlineExceeded.Error() } return err == context.DeadlineExceeded || yarpcerrors.IsDeadlineExceeded(err) } // WorkflowIDToHistoryShard is used to map a workflowID to a shardID func WorkflowIDToHistoryShard(workflowID string, numberOfShards int) int { hash := farm.Fingerprint32([]byte(workflowID)) return int(hash % uint32(numberOfShards)) } // DomainIDToHistoryShard is used to map a domainID to a shardID func DomainIDToHistoryShard(domainID string, numberOfShards int) int { hash := farm.Fingerprint32([]byte(domainID)) return int(hash % uint32(numberOfShards)) } // PrettyPrintHistory prints history in human readable format func PrettyPrintHistory(history *types.History, logger log.Logger) { data, err := json.MarshalIndent(history, "", " ") if err != nil { logger.Error("Error serializing history: %v\n", tag.Error(err)) } fmt.Println("******************************************") fmt.Println("History", tag.DetailInfo(string(data))) fmt.Println("******************************************") } // IsValidContext checks that the thrift context is not expired on cancelled. // Returns nil if the context is still valid. Otherwise, returns the result of // ctx.Err() func IsValidContext(ctx context.Context) error { ch := ctx.Done() if ch != nil { select { case <-ch: return ctx.Err() default: // go to the next line } } deadline, ok := ctx.Deadline() if ok && time.Until(deadline) < contextExpireThreshold { return context.DeadlineExceeded } return nil } // emptyCancelFunc wraps an empty func by context.CancelFunc interface var emptyCancelFunc = context.CancelFunc(func() {}) // CreateChildContext creates a child context which shorted context timeout // from the given parent context // tailroom must be in range [0, 1] and // (1-tailroom) * parent timeout will be the new child context timeout // if tailroom is less 0, tailroom will be considered as 0 // if tailroom is greater than 1, tailroom wil be considered as 1 func CreateChildContext( parent context.Context, tailroom float64, ) (context.Context, context.CancelFunc) { if parent == nil { return nil, emptyCancelFunc } if parent.Err() != nil { return parent, emptyCancelFunc } now := time.Now() deadline, ok := parent.Deadline() if !ok || deadline.Before(now) { return parent, emptyCancelFunc } // if tailroom is about or less 0, then return a context with the same deadline as parent if tailroom <= 0 { return context.WithDeadline(parent, deadline) } // if tailroom is about or greater 1, then return a context with deadline of now if tailroom >= 1 { return context.WithDeadline(parent, now) } newDeadline := now.Add(time.Duration(math.Ceil(float64(deadline.Sub(now)) * (1.0 - tailroom)))) return context.WithDeadline(parent, newDeadline) } // GenerateRandomString is used for generate test string func GenerateRandomString(n int) string { if n <= 0 { return "" } letterRunes := []rune("random") b := make([]rune, n) for i := range b { b[i] = letterRunes[rand.Intn(len(letterRunes))] } return string(b) } // CreateMatchingPollForDecisionTaskResponse create response for matching's PollForDecisionTask func CreateMatchingPollForDecisionTaskResponse(historyResponse *types.RecordDecisionTaskStartedResponse, workflowExecution *types.WorkflowExecution, token []byte) *types.MatchingPollForDecisionTaskResponse { matchingResp := &types.MatchingPollForDecisionTaskResponse{ WorkflowExecution: workflowExecution, TaskToken: token, Attempt: historyResponse.GetAttempt(), WorkflowType: historyResponse.WorkflowType, StartedEventID: historyResponse.StartedEventID, StickyExecutionEnabled: historyResponse.StickyExecutionEnabled, NextEventID: historyResponse.NextEventID, DecisionInfo: historyResponse.DecisionInfo, WorkflowExecutionTaskList: historyResponse.WorkflowExecutionTaskList, BranchToken: historyResponse.BranchToken, ScheduledTimestamp: historyResponse.ScheduledTimestamp, StartedTimestamp: historyResponse.StartedTimestamp, Queries: historyResponse.Queries, TotalHistoryBytes: historyResponse.HistorySize, } if historyResponse.GetPreviousStartedEventID() != EmptyEventID { matchingResp.PreviousStartedEventID = historyResponse.PreviousStartedEventID } return matchingResp } // MinInt64 returns the smaller of two given int64 func MinInt64(a, b int64) int64 { if a < b { return a } return b } // MaxInt64 returns the greater of two given int64 func MaxInt64(a, b int64) int64 { if a > b { return a } return b } // MinInt32 return smaller one of two inputs int32 func MinInt32(a, b int32) int32 { if a < b { return a } return b } // MinInt returns the smaller of two given integers func MinInt(a, b int) int { if a < b { return a } return b } // MaxInt returns the greater one of two given integers func MaxInt(a, b int) int { if a > b { return a } return b } // MinDuration returns the smaller of two given time duration func MinDuration(a, b time.Duration) time.Duration { if a < b { return a } return b } // MaxDuration returns the greater of two given time durations func MaxDuration(a, b time.Duration) time.Duration { if a > b { return a } return b } // SortInt64Slice sorts the given int64 slice. // Sort is not guaranteed to be stable. func SortInt64Slice(slice []int64) { sort.Slice(slice, func(i int, j int) bool { return slice[i] < slice[j] }) } // ValidateRetryPolicy validates a retry policy func ValidateRetryPolicy(policy *types.RetryPolicy) error { if policy == nil { // nil policy is valid which means no retry return nil } if policy.GetInitialIntervalInSeconds() <= 0 { return &types.BadRequestError{Message: "InitialIntervalInSeconds must be greater than 0 on retry policy."} } if policy.GetBackoffCoefficient() < 1 { return &types.BadRequestError{Message: "BackoffCoefficient cannot be less than 1 on retry policy."} } if policy.GetMaximumIntervalInSeconds() < 0 { return &types.BadRequestError{Message: "MaximumIntervalInSeconds cannot be less than 0 on retry policy."} } if policy.GetMaximumIntervalInSeconds() > 0 && policy.GetMaximumIntervalInSeconds() < policy.GetInitialIntervalInSeconds() { return &types.BadRequestError{Message: "MaximumIntervalInSeconds cannot be less than InitialIntervalInSeconds on retry policy."} } if policy.GetMaximumAttempts() < 0 { return &types.BadRequestError{Message: "MaximumAttempts cannot be less than 0 on retry policy."} } if policy.GetExpirationIntervalInSeconds() < 0 { return &types.BadRequestError{Message: "ExpirationIntervalInSeconds cannot be less than 0 on retry policy."} } if policy.GetMaximumAttempts() == 0 && policy.GetExpirationIntervalInSeconds() == 0 { return &types.BadRequestError{Message: "MaximumAttempts and ExpirationIntervalInSeconds are both 0. At least one of them must be specified."} } return nil } // CreateHistoryStartWorkflowRequest create a start workflow request for history func CreateHistoryStartWorkflowRequest( domainID string, startRequest *types.StartWorkflowExecutionRequest, now time.Time, partitionConfig map[string]string, ) (*types.HistoryStartWorkflowExecutionRequest, error) { histRequest := &types.HistoryStartWorkflowExecutionRequest{ DomainUUID: domainID, StartRequest: startRequest, PartitionConfig: partitionConfig, } delayStartSeconds := startRequest.GetDelayStartSeconds() jitterStartSeconds := startRequest.GetJitterStartSeconds() firstDecisionTaskBackoffSeconds := delayStartSeconds if len(startRequest.GetCronSchedule()) > 0 { delayedStartTime := now.Add(time.Second * time.Duration(delayStartSeconds)) var err error firstDecisionTaskBackoffSeconds, err = backoff.GetBackoffForNextScheduleInSeconds( startRequest.GetCronSchedule(), delayedStartTime, delayedStartTime, jitterStartSeconds) if err != nil { return nil, err } // backoff seconds was calculated based on delayed start time, so we need to // add the delayStartSeconds to that backoff. firstDecisionTaskBackoffSeconds += delayStartSeconds } else if jitterStartSeconds > 0 { // Add a random jitter to start time, if requested. firstDecisionTaskBackoffSeconds += rand.Int31n(jitterStartSeconds + 1) } histRequest.FirstDecisionTaskBackoffSeconds = Int32Ptr(firstDecisionTaskBackoffSeconds) if startRequest.RetryPolicy != nil && startRequest.RetryPolicy.GetExpirationIntervalInSeconds() > 0 { expirationInSeconds := startRequest.RetryPolicy.GetExpirationIntervalInSeconds() + firstDecisionTaskBackoffSeconds // expirationTime calculates from first decision task schedule to the end of the workflow deadline := now.Add(time.Duration(expirationInSeconds) * time.Second) histRequest.ExpirationTimestamp = Int64Ptr(deadline.Round(time.Millisecond).UnixNano()) } return histRequest, nil } // CheckEventBlobSizeLimit checks if a blob data exceeds limits. It logs a warning if it exceeds warnLimit, // and return ErrBlobSizeExceedsLimit if it exceeds errorLimit. func CheckEventBlobSizeLimit( actualSize int, warnLimit int, errorLimit int, domainID string, workflowID string, runID string, scope metrics.Scope, logger log.Logger, blobSizeViolationOperationTag tag.Tag, ) error { scope.RecordTimer(metrics.EventBlobSize, time.Duration(actualSize)) if actualSize > warnLimit { if logger != nil { logger.Warn("Blob size exceeds limit.", tag.WorkflowDomainID(domainID), tag.WorkflowID(workflowID), tag.WorkflowRunID(runID), tag.WorkflowSize(int64(actualSize)), blobSizeViolationOperationTag) } if actualSize > errorLimit { return ErrBlobSizeExceedsLimit } } return nil } // ValidateLongPollContextTimeout check if the context timeout for a long poll handler is too short or below a normal value. // If the timeout is not set or too short, it logs an error, and return ErrContextTimeoutNotSet or ErrContextTimeoutTooShort // accordingly. If the timeout is only below a normal value, it just logs an info and return nil. func ValidateLongPollContextTimeout( ctx context.Context, handlerName string, logger log.Logger, ) error { deadline, err := ValidateLongPollContextTimeoutIsSet(ctx, handlerName, logger) if err != nil { return err } timeout := time.Until(deadline) if timeout < MinLongPollTimeout { err := ErrContextTimeoutTooShort logger.Error("Context timeout is too short for long poll API.", tag.WorkflowHandlerName(handlerName), tag.Error(err), tag.WorkflowPollContextTimeout(timeout)) return err } if timeout < CriticalLongPollTimeout { logger.Debug("Context timeout is lower than critical value for long poll API.", tag.WorkflowHandlerName(handlerName), tag.WorkflowPollContextTimeout(timeout)) } return nil } // ValidateLongPollContextTimeoutIsSet checks if the context timeout is set for long poll requests. func ValidateLongPollContextTimeoutIsSet( ctx context.Context, handlerName string, logger log.Logger, ) (time.Time, error) { deadline, ok := ctx.Deadline() if !ok { err := ErrContextTimeoutNotSet logger.Error("Context timeout not set for long poll API.", tag.WorkflowHandlerName(handlerName), tag.Error(err)) return deadline, err } return deadline, nil } // ValidateDomainUUID checks if the given domainID string is a valid UUID func ValidateDomainUUID( domainUUID string, ) error { if domainUUID == "" { return &types.BadRequestError{Message: "Missing domain UUID."} } else if uuid.Parse(domainUUID) == nil { return &types.BadRequestError{Message: "Invalid domain UUID."} } return nil } // GetSizeOfMapStringToByteArray get size of map[string][]byte func GetSizeOfMapStringToByteArray(input map[string][]byte) int { if input == nil { return 0 } res := 0 for k, v := range input { res += len(k) + len(v) } return res + golandMapReserverNumberOfBytes } // GetSizeOfHistoryEvent returns approximate size in bytes of the history event taking into account byte arrays only now func GetSizeOfHistoryEvent(event *types.HistoryEvent) uint64 { if event == nil || event.EventType == nil { return 0 } res := 0 switch *event.EventType { case types.EventTypeWorkflowExecutionStarted: res += len(event.WorkflowExecutionStartedEventAttributes.Input) res += len(event.WorkflowExecutionStartedEventAttributes.ContinuedFailureDetails) res += len(event.WorkflowExecutionStartedEventAttributes.LastCompletionResult) if event.WorkflowExecutionStartedEventAttributes.Memo != nil { res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionStartedEventAttributes.Memo.Fields) } if event.WorkflowExecutionStartedEventAttributes.Header != nil { res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionStartedEventAttributes.Header.Fields) } if event.WorkflowExecutionStartedEventAttributes.SearchAttributes != nil { res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionStartedEventAttributes.SearchAttributes.IndexedFields) } case types.EventTypeWorkflowExecutionCompleted: res += len(event.WorkflowExecutionCompletedEventAttributes.Result) case types.EventTypeWorkflowExecutionFailed: res += len(event.WorkflowExecutionFailedEventAttributes.Details) case types.EventTypeWorkflowExecutionTimedOut: case types.EventTypeDecisionTaskScheduled: case types.EventTypeDecisionTaskStarted: case types.EventTypeDecisionTaskCompleted: res += len(event.DecisionTaskCompletedEventAttributes.ExecutionContext) case types.EventTypeDecisionTaskTimedOut: case types.EventTypeDecisionTaskFailed: res += len(event.DecisionTaskFailedEventAttributes.Details) case types.EventTypeActivityTaskScheduled: res += len(event.ActivityTaskScheduledEventAttributes.Input) if event.ActivityTaskScheduledEventAttributes.Header != nil { res += GetSizeOfMapStringToByteArray(event.ActivityTaskScheduledEventAttributes.Header.Fields) } case types.EventTypeActivityTaskStarted: res += len(event.ActivityTaskStartedEventAttributes.LastFailureDetails) case types.EventTypeActivityTaskCompleted: res += len(event.ActivityTaskCompletedEventAttributes.Result) case types.EventTypeActivityTaskFailed: res += len(event.ActivityTaskFailedEventAttributes.Details) case types.EventTypeActivityTaskTimedOut: res += len(event.ActivityTaskTimedOutEventAttributes.Details) res += len(event.ActivityTaskTimedOutEventAttributes.LastFailureDetails) case types.EventTypeActivityTaskCancelRequested: case types.EventTypeRequestCancelActivityTaskFailed: case types.EventTypeActivityTaskCanceled: res += len(event.ActivityTaskCanceledEventAttributes.Details) case types.EventTypeTimerStarted: case types.EventTypeTimerFired: case types.EventTypeCancelTimerFailed: case types.EventTypeTimerCanceled: case types.EventTypeWorkflowExecutionCancelRequested: case types.EventTypeWorkflowExecutionCanceled: res += len(event.WorkflowExecutionCanceledEventAttributes.Details) case types.EventTypeRequestCancelExternalWorkflowExecutionInitiated: res += len(event.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes.Control) case types.EventTypeRequestCancelExternalWorkflowExecutionFailed: res += len(event.RequestCancelExternalWorkflowExecutionFailedEventAttributes.Control) case types.EventTypeExternalWorkflowExecutionCancelRequested: case types.EventTypeMarkerRecorded: res += len(event.MarkerRecordedEventAttributes.Details) case types.EventTypeWorkflowExecutionSignaled: res += len(event.WorkflowExecutionSignaledEventAttributes.Input) case types.EventTypeWorkflowExecutionTerminated: res += len(event.WorkflowExecutionTerminatedEventAttributes.Details) case types.EventTypeWorkflowExecutionContinuedAsNew: res += len(event.WorkflowExecutionContinuedAsNewEventAttributes.Input) if event.WorkflowExecutionContinuedAsNewEventAttributes.Memo != nil { res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionContinuedAsNewEventAttributes.Memo.Fields) } if event.WorkflowExecutionContinuedAsNewEventAttributes.Header != nil { res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionContinuedAsNewEventAttributes.Header.Fields) } if event.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes != nil { res += GetSizeOfMapStringToByteArray(event.WorkflowExecutionContinuedAsNewEventAttributes.SearchAttributes.IndexedFields) } case types.EventTypeStartChildWorkflowExecutionInitiated: res += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Input) res += len(event.StartChildWorkflowExecutionInitiatedEventAttributes.Control) if event.StartChildWorkflowExecutionInitiatedEventAttributes.Memo != nil { res += GetSizeOfMapStringToByteArray(event.StartChildWorkflowExecutionInitiatedEventAttributes.Memo.Fields) } if event.StartChildWorkflowExecutionInitiatedEventAttributes.Header != nil { res += GetSizeOfMapStringToByteArray(event.StartChildWorkflowExecutionInitiatedEventAttributes.Header.Fields) } if event.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes != nil { res += GetSizeOfMapStringToByteArray(event.StartChildWorkflowExecutionInitiatedEventAttributes.SearchAttributes.IndexedFields) } case types.EventTypeStartChildWorkflowExecutionFailed: res += len(event.StartChildWorkflowExecutionFailedEventAttributes.Control) case types.EventTypeChildWorkflowExecutionStarted: if event.ChildWorkflowExecutionStartedEventAttributes != nil && event.ChildWorkflowExecutionStartedEventAttributes.Header != nil { res += GetSizeOfMapStringToByteArray(event.ChildWorkflowExecutionStartedEventAttributes.Header.Fields) } case types.EventTypeChildWorkflowExecutionCompleted: res += len(event.ChildWorkflowExecutionCompletedEventAttributes.Result) case types.EventTypeChildWorkflowExecutionFailed: res += len(event.ChildWorkflowExecutionFailedEventAttributes.Details) case types.EventTypeChildWorkflowExecutionCanceled: res += len(event.ChildWorkflowExecutionCanceledEventAttributes.Details) case types.EventTypeChildWorkflowExecutionTimedOut: case types.EventTypeChildWorkflowExecutionTerminated: case types.EventTypeSignalExternalWorkflowExecutionInitiated: res += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Input) res += len(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Control) case types.EventTypeSignalExternalWorkflowExecutionFailed: res += len(event.SignalExternalWorkflowExecutionFailedEventAttributes.Control) case types.EventTypeExternalWorkflowExecutionSignaled: res += len(event.ExternalWorkflowExecutionSignaledEventAttributes.Control) case types.EventTypeUpsertWorkflowSearchAttributes: if event.UpsertWorkflowSearchAttributesEventAttributes.SearchAttributes != nil { res += GetSizeOfMapStringToByteArray(event.UpsertWorkflowSearchAttributesEventAttributes.SearchAttributes.IndexedFields) } } return uint64(res) } // IsJustOrderByClause return true is query start with order by func IsJustOrderByClause(clause string) bool { whereClause := strings.TrimSpace(clause) whereClause = strings.ToLower(whereClause) return strings.HasPrefix(whereClause, "order by") } // ConvertIndexedValueTypeToInternalType takes fieldType as interface{} and convert to IndexedValueType. // Because different implementation of dynamic config client may lead to different types func ConvertIndexedValueTypeToInternalType(fieldType interface{}, logger log.Logger) types.IndexedValueType { switch t := fieldType.(type) { case float64: return types.IndexedValueType(t) case int: return types.IndexedValueType(t) case types.IndexedValueType: return t case []byte: var result types.IndexedValueType if err := result.UnmarshalText(t); err != nil { logger.Error("unknown index value type", tag.Value(fieldType), tag.ValueType(t), tag.Error(err)) return fieldType.(types.IndexedValueType) // it will panic and been captured by logger } return result case string: var result types.IndexedValueType if err := result.UnmarshalText([]byte(t)); err != nil { logger.Error("unknown index value type", tag.Value(fieldType), tag.ValueType(t), tag.Error(err)) return fieldType.(types.IndexedValueType) // it will panic and been captured by logger } return result default: // Unknown fieldType, please make sure dynamic config return correct value type logger.Error("unknown index value type", tag.Value(fieldType), tag.ValueType(t)) return fieldType.(types.IndexedValueType) // it will panic and been captured by logger } } // DeserializeSearchAttributeValue takes json encoded search attribute value and it's type as input, then // unmarshal the value into a concrete type and return the value func DeserializeSearchAttributeValue(value []byte, valueType types.IndexedValueType) (interface{}, error) { switch valueType { case types.IndexedValueTypeString, types.IndexedValueTypeKeyword: var val string if err := json.Unmarshal(value, &val); err != nil { var listVal []string err = json.Unmarshal(value, &listVal) return listVal, err } return val, nil case types.IndexedValueTypeInt: var val int64 if err := json.Unmarshal(value, &val); err != nil { var listVal []int64 err = json.Unmarshal(value, &listVal) return listVal, err } return val, nil case types.IndexedValueTypeDouble: var val float64 if err := json.Unmarshal(value, &val); err != nil { var listVal []float64 err = json.Unmarshal(value, &listVal) return listVal, err } return val, nil case types.IndexedValueTypeBool: var val bool if err := json.Unmarshal(value, &val); err != nil { var listVal []bool err = json.Unmarshal(value, &listVal) return listVal, err } return val, nil case types.IndexedValueTypeDatetime: var val time.Time if err := json.Unmarshal(value, &val); err != nil { var listVal []time.Time err = json.Unmarshal(value, &listVal) return listVal, err } return val, nil default: return nil, fmt.Errorf("error: unknown index value type [%v]", valueType) } } // IsAdvancedVisibilityWritingEnabled returns true if we should write to advanced visibility func IsAdvancedVisibilityWritingEnabled(advancedVisibilityWritingMode string, isAdvancedVisConfigExist bool) bool { return advancedVisibilityWritingMode != AdvancedVisibilityWritingModeOff && isAdvancedVisConfigExist } // IsAdvancedVisibilityReadingEnabled returns true if we should read from advanced visibility func IsAdvancedVisibilityReadingEnabled(isAdvancedVisReadEnabled, isAdvancedVisConfigExist bool) bool { return isAdvancedVisReadEnabled && isAdvancedVisConfigExist } // ConvertIntMapToDynamicConfigMapProperty converts a map whose key value type are both int to // a map value that is compatible with dynamic config's map property func ConvertIntMapToDynamicConfigMapProperty( intMap map[int]int, ) map[string]interface{} { dcValue := make(map[string]interface{}) for key, value := range intMap { dcValue[strconv.Itoa(key)] = value } return dcValue } // ConvertDynamicConfigMapPropertyToIntMap convert a map property from dynamic config to a map // whose type for both key and value are int func ConvertDynamicConfigMapPropertyToIntMap(dcValue map[string]interface{}) (map[int]int, error) { intMap := make(map[int]int) for key, value := range dcValue { intKey, err := strconv.Atoi(strings.TrimSpace(key)) if err != nil { return nil, fmt.Errorf("failed to convert key %v, error: %v", key, err) } var intValue int switch value := value.(type) { case float64: intValue = int(value) case int: intValue = value case int32: intValue = int(value) case int64: intValue = int(value) default: return nil, fmt.Errorf("unknown value %v with type %T", value, value) } intMap[intKey] = intValue } return intMap, nil } // IsStickyTaskConditionError is error from matching engine func IsStickyTaskConditionError(err error) bool { if e, ok := err.(*types.InternalServiceError); ok { return e.GetMessage() == StickyTaskConditionFailedErrorMsg } return false } // DurationToDays converts time.Duration to number of 24 hour days func DurationToDays(d time.Duration) int32 { return int32(d / (24 * time.Hour)) } // DurationToSeconds converts time.Duration to number of seconds func DurationToSeconds(d time.Duration) int64 { return int64(d / time.Second) } // DaysToDuration converts number of 24 hour days to time.Duration func DaysToDuration(d int32) time.Duration { return time.Duration(d) * (24 * time.Hour) } // SecondsToDuration converts number of seconds to time.Duration func SecondsToDuration(d int64) time.Duration { return time.Duration(d) * time.Second } // SleepWithMinDuration sleeps for the minimum of desired and available duration // returns the remaining available time duration func SleepWithMinDuration(desired time.Duration, available time.Duration) time.Duration { d := MinDuration(desired, available) if d > 0 { time.Sleep(d) } return available - d } // ConvertErrToGetTaskFailedCause converts error to GetTaskFailedCause func ConvertErrToGetTaskFailedCause(err error) types.GetTaskFailedCause { if IsContextTimeoutError(err) { return types.GetTaskFailedCauseTimeout } if IsServiceBusyError(err) { return types.GetTaskFailedCauseServiceBusy } if _, ok := err.(*types.ShardOwnershipLostError); ok { return types.GetTaskFailedCauseShardOwnershipLost } return types.GetTaskFailedCauseUncategorized } // ConvertGetTaskFailedCauseToErr converts GetTaskFailedCause to error func ConvertGetTaskFailedCauseToErr(failedCause types.GetTaskFailedCause) error { switch failedCause { case types.GetTaskFailedCauseServiceBusy: return &types.ServiceBusyError{} case types.GetTaskFailedCauseTimeout: return context.DeadlineExceeded case types.GetTaskFailedCauseShardOwnershipLost: return &types.ShardOwnershipLostError{} default: return &types.InternalServiceError{Message: "uncategorized error"} } } // GetTaskPriority returns priority given a task's priority class and subclass func GetTaskPriority( class int, subClass int, ) int { return class | subClass } // IntersectionStringSlice get the intersection of 2 string slices func IntersectionStringSlice(a, b []string) []string { var result []string m := make(map[string]struct{}) for _, item := range a { m[item] = struct{}{} } for _, item := range b { if _, ok := m[item]; ok { result = append(result, item) } } return result }