service/frontend/wrappers/metered/metered.go (274 lines of code) (raw):
// The MIT License (MIT)
// Copyright (c) 2017-2020 Uber Technologies Inc.
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
package metered
import (
"context"
"errors"
"fmt"
"go.uber.org/yarpc/yarpcerrors"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/types"
)
func (h *apiHandler) handleErr(err error, scope metrics.Scope, logger log.Logger) error {
switch err := err.(type) {
case *types.InternalServiceError:
logger.Error("Internal service error", tag.Error(err))
scope.IncCounter(metrics.CadenceFailures)
return frontendInternalServiceError("cadence internal error, msg: %v", err.Message)
case *types.BadRequestError:
scope.IncCounter(metrics.CadenceErrBadRequestCounter)
return err
case *types.DomainNotActiveError:
scope.IncCounter(metrics.CadenceErrBadRequestCounter)
return err
case *types.ServiceBusyError:
scope.IncCounter(metrics.CadenceErrServiceBusyCounter)
return err
case *types.EntityNotExistsError:
scope.IncCounter(metrics.CadenceErrEntityNotExistsCounter)
return err
case *types.WorkflowExecutionAlreadyCompletedError:
scope.IncCounter(metrics.CadenceErrWorkflowExecutionAlreadyCompletedCounter)
return err
case *types.WorkflowExecutionAlreadyStartedError:
scope.IncCounter(metrics.CadenceErrExecutionAlreadyStartedCounter)
return err
case *types.DomainAlreadyExistsError:
scope.IncCounter(metrics.CadenceErrDomainAlreadyExistsCounter)
return err
case *types.CancellationAlreadyRequestedError:
scope.IncCounter(metrics.CadenceErrCancellationAlreadyRequestedCounter)
return err
case *types.QueryFailedError:
scope.IncCounter(metrics.CadenceErrQueryFailedCounter)
return err
case *types.LimitExceededError:
scope.IncCounter(metrics.CadenceErrLimitExceededCounter)
return err
case *types.ClientVersionNotSupportedError:
scope.IncCounter(metrics.CadenceErrClientVersionNotSupportedCounter)
return err
case *yarpcerrors.Status:
if err.Code() == yarpcerrors.CodeDeadlineExceeded {
logger.Error("Frontend request timedout", tag.Error(err))
scope.IncCounter(metrics.CadenceErrContextTimeoutCounter)
return err
}
}
if errors.Is(err, context.DeadlineExceeded) {
logger.Error("Frontend request timedout", tag.Error(err))
scope.IncCounter(metrics.CadenceErrContextTimeoutCounter)
return err
}
logger.Error("Uncategorized error", tag.Error(err))
scope.IncCounter(metrics.CadenceFailures)
return frontendInternalServiceError("cadence internal uncategorized error, msg: %v", err.Error())
}
func (h *apiHandler) withSignalName(
ctx context.Context,
domainName string,
signalName string,
) context.Context {
if h.cfg.EmitSignalNameMetricsTag(domainName) {
return metrics.TagContext(ctx, metrics.SignalNameTag(signalName))
}
return ctx
}
func frontendInternalServiceError(fmtStr string, args ...interface{}) error {
// NOTE: For internal error, we can't return thrift error from cadence-frontend.
// Because in uber internal metrics, thrift errors are counted as user errors.
return fmt.Errorf(fmtStr, args...)
}
func toCountWorkflowExecutionsRequestTags(req *types.CountWorkflowExecutionsRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
}
}
func toDescribeTaskListRequestTags(req *types.DescribeTaskListRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowTaskListName(req.GetTaskList().GetName()),
tag.WorkflowTaskListType(int(req.GetTaskListType())),
tag.WorkflowTaskListKind(int32(req.GetTaskList().GetKind())),
}
}
func toDescribeWorkflowExecutionRequestTags(req *types.DescribeWorkflowExecutionRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetExecution().GetWorkflowID()),
tag.WorkflowRunID(req.GetExecution().GetRunID()),
}
}
func toGetTaskListsByDomainRequestTags(req *types.GetTaskListsByDomainRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
}
}
func toGetWorkflowExecutionHistoryRequestTags(req *types.GetWorkflowExecutionHistoryRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetExecution().GetWorkflowID()),
tag.WorkflowRunID(req.GetExecution().GetRunID()),
}
}
func toListArchivedWorkflowExecutionsRequestTags(req *types.ListArchivedWorkflowExecutionsRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
}
}
func toListClosedWorkflowExecutionsRequestTags(req *types.ListClosedWorkflowExecutionsRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
}
}
func toListOpenWorkflowExecutionsRequestTags(req *types.ListOpenWorkflowExecutionsRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
}
}
func toListTaskListPartitionsRequestTags(req *types.ListTaskListPartitionsRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowTaskListName(req.GetTaskList().GetName()),
tag.WorkflowTaskListKind(int32(req.GetTaskList().GetKind())),
}
}
func toListWorkflowExecutionsRequestTags(req *types.ListWorkflowExecutionsRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
}
}
func toPollForActivityTaskRequestTags(req *types.PollForActivityTaskRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowTaskListName(req.GetTaskList().GetName()),
tag.WorkflowTaskListKind(int32(req.GetTaskList().GetKind())),
}
}
func toPollForDecisionTaskRequestTags(req *types.PollForDecisionTaskRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowTaskListName(req.GetTaskList().GetName()),
tag.WorkflowTaskListKind(int32(req.GetTaskList().GetKind())),
}
}
func toQueryWorkflowRequestTags(req *types.QueryWorkflowRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetExecution().GetWorkflowID()),
tag.WorkflowRunID(req.GetExecution().GetRunID()),
}
}
func toRecordActivityTaskHeartbeatByIDRequestTags(req *types.RecordActivityTaskHeartbeatByIDRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetWorkflowID()),
tag.WorkflowRunID(req.GetRunID()),
}
}
func toRefreshWorkflowTasksRequestTags(req *types.RefreshWorkflowTasksRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetExecution().GetWorkflowID()),
tag.WorkflowRunID(req.GetExecution().GetRunID()),
}
}
func toRequestCancelWorkflowExecutionRequestTags(req *types.RequestCancelWorkflowExecutionRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetWorkflowExecution().GetWorkflowID()),
tag.WorkflowRunID(req.GetWorkflowExecution().GetRunID()),
}
}
func toResetStickyTaskListRequestTags(req *types.ResetStickyTaskListRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetExecution().GetWorkflowID()),
tag.WorkflowRunID(req.GetExecution().GetRunID()),
}
}
func toResetWorkflowExecutionRequestTags(req *types.ResetWorkflowExecutionRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetWorkflowExecution().GetWorkflowID()),
tag.WorkflowRunID(req.GetWorkflowExecution().GetRunID()),
}
}
func toRespondActivityTaskCanceledByIDRequestTags(req *types.RespondActivityTaskCanceledByIDRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetWorkflowID()),
tag.WorkflowRunID(req.GetRunID()),
}
}
func toRespondActivityTaskCompletedByIDRequestTags(req *types.RespondActivityTaskCompletedByIDRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetWorkflowID()),
tag.WorkflowRunID(req.GetRunID()),
}
}
func toRespondActivityTaskFailedByIDRequestTags(req *types.RespondActivityTaskFailedByIDRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetWorkflowID()),
tag.WorkflowRunID(req.GetRunID()),
}
}
func toRestartWorkflowExecutionRequestTags(req *types.RestartWorkflowExecutionRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetWorkflowExecution().GetWorkflowID()),
tag.WorkflowRunID(req.GetWorkflowExecution().GetRunID()),
}
}
func toSignalWithStartWorkflowExecutionRequestTags(req *types.SignalWithStartWorkflowExecutionRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetWorkflowID()),
tag.WorkflowType(req.WorkflowType.GetName()),
tag.WorkflowSignalName(req.GetSignalName()),
}
}
func toSignalWithStartWorkflowExecutionAsyncRequestTags(req *types.SignalWithStartWorkflowExecutionAsyncRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetWorkflowID()),
tag.WorkflowType(req.WorkflowType.GetName()),
tag.WorkflowSignalName(req.GetSignalName()),
}
}
func toSignalWorkflowExecutionRequestTags(req *types.SignalWorkflowExecutionRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetWorkflowExecution().GetWorkflowID()),
tag.WorkflowRunID(req.GetWorkflowExecution().GetRunID()),
tag.WorkflowSignalName(req.GetSignalName()),
}
}
func toStartWorkflowExecutionRequestTags(req *types.StartWorkflowExecutionRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetWorkflowID()),
tag.WorkflowType(req.WorkflowType.GetName()),
tag.WorkflowCronSchedule(req.GetCronSchedule()),
}
}
func toStartWorkflowExecutionAsyncRequestTags(req *types.StartWorkflowExecutionAsyncRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetWorkflowID()),
tag.WorkflowType(req.WorkflowType.GetName()),
tag.WorkflowCronSchedule(req.GetCronSchedule()),
}
}
func toTerminateWorkflowExecutionRequestTags(req *types.TerminateWorkflowExecutionRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
tag.WorkflowID(req.GetWorkflowExecution().GetWorkflowID()),
tag.WorkflowRunID(req.GetWorkflowExecution().GetRunID()),
}
}
func toScanWorkflowExecutionsRequestTags(req *types.ListWorkflowExecutionsRequest) []tag.Tag {
return []tag.Tag{
tag.WorkflowDomainName(req.GetDomain()),
}
}