common/persistence/wrappers/errorinjectors/utils.go (304 lines of code) (raw):

// The MIT License (MIT) // Copyright (c) 2017-2020 Uber Technologies Inc. // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. package errorinjectors import ( "fmt" "math/rand" "strings" "github.com/uber/cadence/common/errors" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/persistence" ) // _randomStubFunc is a stub randomized function that could be overriden in tests. // It introduces randomness to timeout and unhandled errors, to mimic retriable db isues. var _randomStubFunc = func() bool { // forward the call with 50% chance return rand.Intn(2) == 0 } func shouldForwardCallToPersistence( err error, ) bool { if err == nil { return true } if err == ErrFakeTimeout || err == errors.ErrFakeUnhandled { return _randomStubFunc() } return false } func generateFakeError( errorRate float64, ) error { randFl := rand.Float64() if randFl < errorRate { return fakeErrors[rand.Intn(len(fakeErrors))] } return nil } var ( // ErrFakeTimeout is a fake persistence timeout error. ErrFakeTimeout = &persistence.TimeoutError{Msg: "Fake Persistence Timeout Error."} ) var ( fakeErrors = []error{ errors.ErrFakeServiceBusy, errors.ErrFakeInternalService, ErrFakeTimeout, errors.ErrFakeUnhandled, } ) func isFakeError(err error) bool { for _, fakeErr := range fakeErrors { if err == fakeErr { return true } } return false } const ( msgInjectedFakeErr = "Injected fake persistence error" ) func logErr(logger log.Logger, objectMethod string, fakeErr error, forwardCall bool, err error) { logger.Error(msgInjectedFakeErr, getOperationFromMethodName(objectMethod), tag.Error(fakeErr), tag.Bool(forwardCall), tag.StoreError(err), ) } func getOperationFromMethodName(op string) tag.Tag { var t *tag.Tag switch { case strings.HasPrefix(op, "ConfigStoreManager"): t = configManagerTags(op) case strings.HasPrefix(op, "DomainManager"): t = domainManagerTags(op) case strings.HasPrefix(op, "HistoryManager"): t = historyManagerTags(op) case strings.HasPrefix(op, "ShardManager"): t = shardManagerTags(op) case strings.HasPrefix(op, "ExecutionManager"): t = executionManagerTags(op) case strings.HasPrefix(op, "VisibilityManager"): t = visibilityManagerTags(op) case strings.HasPrefix(op, "QueueManager"): t = queueManagerTags(op) case strings.HasPrefix(op, "TaskManager"): t = taskManagerTags(op) } if t == nil { panic(fmt.Sprintf("No tag defined for operation %s", op)) } return *t } func configManagerTags(op string) *tag.Tag { switch op { case "ConfigStoreManager.FetchDynamicConfig": return &tag.StoreOperationFetchDynamicConfig case "ConfigStoreManager.UpdateDynamicConfig": return &tag.StoreOperationUpdateDynamicConfig } return nil } func domainManagerTags(op string) *tag.Tag { switch op { case "DomainManager.CreateDomain": return &tag.StoreOperationCreateDomain case "DomainManager.GetDomain": return &tag.StoreOperationGetDomain case "DomainManager.DeleteDomain": return &tag.StoreOperationDeleteDomain case "DomainManager.DeleteDomainByName": return &tag.StoreOperationDeleteDomainByName case "DomainManager.ListDomains": return &tag.StoreOperationListDomains case "DomainManager.GetMetadata": return &tag.StoreOperationGetMetadata case "DomainManager.UpdateDomain": return &tag.StoreOperationUpdateDomain } return nil } func historyManagerTags(op string) *tag.Tag { switch op { case "HistoryManager.AppendHistoryNodes": return &tag.StoreOperationAppendHistoryNodes case "HistoryManager.DeleteHistoryBranch": return &tag.StoreOperationDeleteHistoryBranch case "HistoryManager.ForkHistoryBranch": return &tag.StoreOperationForkHistoryBranch case "HistoryManager.GetHistoryTree": return &tag.StoreOperationGetHistoryTree case "HistoryManager.ReadHistoryBranch": return &tag.StoreOperationReadHistoryBranch case "HistoryManager.ReadHistoryBranchByBatch": return &tag.StoreOperationReadHistoryBranchByBatch case "HistoryManager.ReadRawHistoryBranch": return &tag.StoreOperationReadRawHistoryBranch case "HistoryManager.GetAllHistoryTreeBranches": return &tag.StoreOperationGetAllHistoryTreeBranches } return nil } func shardManagerTags(op string) *tag.Tag { switch op { case "ShardManager.CreateShard": return &tag.StoreOperationCreateShard case "ShardManager.GetShard": return &tag.StoreOperationGetShard case "ShardManager.UpdateShard": return &tag.StoreOperationUpdateShard } return nil } func executionManagerTags(op string) *tag.Tag { switch op { case "ExecutionManager.CreateWorkflowExecution": return &tag.StoreOperationCreateWorkflowExecution case "ExecutionManager.GetWorkflowExecution": return &tag.StoreOperationGetWorkflowExecution case "ExecutionManager.UpdateWorkflowExecution": return &tag.StoreOperationUpdateWorkflowExecution case "ExecutionManager.ResetWorkflowExecution": return &tag.StoreOperationResetWorkflowExecution case "ExecutionManager.DeleteWorkflowExecution": return &tag.StoreOperationDeleteWorkflowExecution case "ExecutionManager.GetCurrentExecution": return &tag.StoreOperationGetCurrentExecution case "ExecutionManager.ConflictResolveWorkflowExecution": return &tag.StoreOperationConflictResolveWorkflowExecution case "ExecutionManager.DeleteCurrentWorkflowExecution": return &tag.StoreOperationDeleteCurrentWorkflowExecution case "ExecutionManager.ListCurrentExecutions": return &tag.StoreOperationListCurrentExecution case "ExecutionManager.IsWorkflowExecutionExists": return &tag.StoreOperationIsWorkflowExecutionExists case "ExecutionManager.ListConcreteExecutions": return &tag.StoreOperationListConcreteExecution case "ExecutionManager.GetTransferTasks": return &tag.StoreOperationGetTransferTasks case "ExecutionManager.GetCrossClusterTasks": return &tag.StoreOperationGetCrossClusterTasks case "ExecutionManager.GetReplicationTasks": return &tag.StoreOperationGetReplicationTasks case "ExecutionManager.CompleteTransferTask": return &tag.StoreOperationCompleteTransferTask case "ExecutionManager.RangeCompleteTransferTask": return &tag.StoreOperationRangeCompleteTransferTask case "ExecutionManager.CompleteCrossClusterTask": return &tag.StoreOperationCompleteCrossClusterTask case "ExecutionManager.RangeCompleteCrossClusterTask": return &tag.StoreOperationRangeCompleteCrossClusterTask case "ExecutionManager.CompleteReplicationTask": return &tag.StoreOperationCompleteReplicationTask case "ExecutionManager.RangeCompleteReplicationTask": return &tag.StoreOperationRangeCompleteReplicationTask case "ExecutionManager.PutReplicationTaskToDLQ": return &tag.StoreOperationPutReplicationTaskToDLQ case "ExecutionManager.GetReplicationTasksFromDLQ": return &tag.StoreOperationGetReplicationTasksFromDLQ case "ExecutionManager.GetReplicationDLQSize": return &tag.StoreOperationGetReplicationDLQSize case "ExecutionManager.DeleteReplicationTaskFromDLQ": return &tag.StoreOperationDeleteReplicationTaskFromDLQ case "ExecutionManager.RangeDeleteReplicationTaskFromDLQ": return &tag.StoreOperationRangeDeleteReplicationTaskFromDLQ case "ExecutionManager.GetTimerIndexTasks": return &tag.StoreOperationGetTimerIndexTasks case "ExecutionManager.CompleteTimerTask": return &tag.StoreOperationCompleteTimerTask case "ExecutionManager.RangeCompleteTimerTask": return &tag.StoreOperationRangeCompleteTimerTask case "ExecutionManager.CreateFailoverMarkerTasks": return &tag.StoreOperationCreateFailoverMarkerTasks } return nil } func visibilityManagerTags(op string) *tag.Tag { switch op { case "VisibilityManager.RecordWorkflowExecutionStarted": return &tag.StoreOperationRecordWorkflowExecutionStarted case "VisibilityManager.RecordWorkflowExecutionClosed": return &tag.StoreOperationRecordWorkflowExecutionClosed case "VisibilityManager.UpsertWorkflowExecution": return &tag.StoreOperationUpsertWorkflowExecution case "VisibilityManager.ListOpenWorkflowExecutions": return &tag.StoreOperationListOpenWorkflowExecutions case "VisibilityManager.ListClosedWorkflowExecutions": return &tag.StoreOperationListClosedWorkflowExecutions case "VisibilityManager.ListOpenWorkflowExecutionsByType": return &tag.StoreOperationListOpenWorkflowExecutionsByType case "VisibilityManager.ListClosedWorkflowExecutionsByType": return &tag.StoreOperationListClosedWorkflowExecutionsByType case "VisibilityManager.ListOpenWorkflowExecutionsByWorkflowID": return &tag.StoreOperationListOpenWorkflowExecutionsByWorkflowID case "VisibilityManager.ListClosedWorkflowExecutionsByWorkflowID": return &tag.StoreOperationListClosedWorkflowExecutionsByWorkflowID case "VisibilityManager.ListClosedWorkflowExecutionsByStatus": return &tag.StoreOperationListClosedWorkflowExecutionsByStatus case "VisibilityManager.GetClosedWorkflowExecution": return &tag.StoreOperationGetClosedWorkflowExecution case "VisibilityManager.DeleteWorkflowExecution": return &tag.StoreOperationDeleteWorkflowExecution case "VisibilityManager.ListWorkflowExecutions": return &tag.StoreOperationListWorkflowExecutions case "VisibilityManager.ScanWorkflowExecutions": return &tag.StoreOperationScanWorkflowExecutions case "VisibilityManager.CountWorkflowExecutions": return &tag.StoreOperationCountWorkflowExecutions case "VisibilityManager.DeleteUninitializedWorkflowExecution": return &tag.StoreOperationDeleteUninitializedWorkflowExecution case "VisibilityManager.RecordWorkflowExecutionUninitialized": return &tag.StoreOperationRecordWorkflowExecutionUninitialized } return nil } func taskManagerTags(op string) *tag.Tag { switch op { case "TaskManager.LeaseTaskList": return &tag.StoreOperationLeaseTaskList case "TaskManager.UpdateTaskList": return &tag.StoreOperationUpdateTaskList case "TaskManager.CreateTasks": return &tag.StoreOperationCreateTasks case "TaskManager.GetTasks": return &tag.StoreOperationGetTasks case "TaskManager.CompleteTask": return &tag.StoreOperationCompleteTask case "TaskManager.CompleteTasksLessThan": return &tag.StoreOperationCompleteTasksLessThan case "TaskManager.DeleteTaskList": return &tag.StoreOperationDeleteTaskList case "TaskManager.GetOrphanTasks": return &tag.StoreOperationGetOrphanTasks case "TaskManager.GetTaskListSize": return &tag.StoreOperationGetTaskListSize case "TaskManager.ListTaskList": return &tag.StoreOperationListTaskList } return nil } func queueManagerTags(op string) *tag.Tag { switch op { case "QueueManager.EnqueueMessage": return &tag.StoreOperationEnqueueMessage case "QueueManager.EnqueueMessageToDLQ": return &tag.StoreOperationEnqueueMessageToDLQ case "QueueManager.DeleteMessageFromDLQ": return &tag.StoreOperationDeleteMessageFromDLQ case "QueueManager.RangeDeleteMessagesFromDLQ": return &tag.StoreOperationRangeDeleteMessagesFromDLQ case "QueueManager.UpdateAckLevel": return &tag.StoreOperationUpdateAckLevel case "QueueManager.GetAckLevels": return &tag.StoreOperationGetAckLevels case "QueueManager.UpdateDLQAckLevel": return &tag.StoreOperationUpdateDLQAckLevel case "QueueManager.GetDLQAckLevels": return &tag.StoreOperationGetDLQAckLevels case "QueueManager.GetDLQSize": return &tag.StoreOperationGetDLQSize case "QueueManager.DeleteMessagesBefore": return &tag.StoreOperationDeleteMessagesBefore case "QueueManager.ReadMessages": return &tag.StoreOperationReadMessages case "QueueManager.ReadMessagesFromDLQ": return &tag.StoreOperationReadMessagesFromDLQ } return nil }