service/frontend/admin/handler.go (1,510 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
package admin

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"math"
	"strconv"
	"time"

	"github.com/google/uuid"

	"github.com/uber/cadence/.gen/go/shared"
	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/asyncworkflow/queueconfigapi"
	"github.com/uber/cadence/common/backoff"
	"github.com/uber/cadence/common/client"
	"github.com/uber/cadence/common/codec"
	"github.com/uber/cadence/common/definition"
	"github.com/uber/cadence/common/domain"
	dc "github.com/uber/cadence/common/dynamicconfig"
	"github.com/uber/cadence/common/elasticsearch"
	"github.com/uber/cadence/common/isolationgroup/isolationgroupapi"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/metrics"
	"github.com/uber/cadence/common/ndc"
	"github.com/uber/cadence/common/persistence"
	"github.com/uber/cadence/common/resource"
	"github.com/uber/cadence/common/service"
	"github.com/uber/cadence/common/types"
	"github.com/uber/cadence/service/frontend/config"
	"github.com/uber/cadence/service/frontend/validate"
	"github.com/uber/cadence/service/history/execution"
)

const (
	// getDomainReplicationMessageBatchSize caps how many domain replication
	// tasks are fetched per GetDomainReplicationMessages call.
	getDomainReplicationMessageBatchSize = 100
	// defaultLastMessageID means "no message consumed yet" (sentinel).
	defaultLastMessageID = int64(-1)
	// endMessageID is math.MaxInt64, used as an open-ended upper bound.
	endMessageID = int64(1<<63 - 1)
)

type (
	// adminHandlerImpl is an implementation for admin service independent of wire protocol
	adminHandlerImpl struct {
		resource.Resource

		numberOfHistoryShards int
		params                *resource.Params
		config                *config.Config
		domainDLQHandler      domain.DLQMessageHandler
		domainFailoverWatcher domain.FailoverWatcher
		eventSerializer       persistence.PayloadSerializer
		esClient              elasticsearch.GenericClient
		throttleRetry         *backoff.ThrottleRetry
		isolationGroups       isolationgroupapi.Handler
		asyncWFQueueConfigs   queueconfigapi.Handler
	}

	// workflowQueryTemplate pairs a human-readable name with a query function;
	// used by MaintainCorruptWorkflow to probe a workflow for corruption.
	workflowQueryTemplate struct {
		name     string
		function func(request *types.AdminMaintainWorkflowRequest) error
	}

	// getWorkflowRawHistoryV2Token is the (de)serialized pagination token for
	// GetWorkflowExecutionRawHistoryV2; event ID bounds are exclusive-exclusive.
	getWorkflowRawHistoryV2Token struct {
		DomainName        string
		WorkflowID        string
		RunID             string
		StartEventID      int64
		StartEventVersion int64
		EndEventID        int64
		EndEventVersion   int64
		PersistenceToken  []byte
		VersionHistories  *types.VersionHistories
	}
)

var (
	adminServiceRetryPolicy = common.CreateAdminServiceRetryPolicy()
	// corruptWorkflowErrorList holds the exact error strings that indicate a
	// corrupted workflow; matched by string equality in MaintainCorruptWorkflow.
	corruptWorkflowErrorList = [3]string{
		execution.ErrMissingWorkflowStartEvent.Error(),
		execution.ErrMissingActivityScheduledEvent.Error(),
		persistence.ErrCorruptedHistory.Error(),
	}
)

// NewHandler creates a thrift service for the cadence admin service
func NewHandler(
	resource resource.Resource,
	params *resource.Params,
	config *config.Config,
	domainHandler domain.Handler,
) Handler {
	domainReplicationTaskExecutor := domain.NewReplicationTaskExecutor(
		resource.GetDomainManager(),
		resource.GetTimeSource(),
		resource.GetLogger(),
	)
	return &adminHandlerImpl{
		Resource:              resource,
		numberOfHistoryShards: params.PersistenceConfig.NumHistoryShards,
		params:                params,
		config:                config,
		domainDLQHandler: domain.NewDLQMessageHandler(
			domainReplicationTaskExecutor,
			resource.GetDomainReplicationQueue(),
			resource.GetLogger(),
			resource.GetMetricsClient(),
		),
		domainFailoverWatcher: domain.NewFailoverWatcher(
			resource.GetDomainCache(),
			resource.GetDomainManager(),
			resource.GetTimeSource(),
			config.DomainFailoverRefreshInterval,
			config.DomainFailoverRefreshTimerJitterCoefficient,
			resource.GetMetricsClient(),
			resource.GetLogger(),
		),
		eventSerializer: persistence.NewPayloadSerializer(),
		esClient:        params.ESClient,
		throttleRetry: backoff.NewThrottleRetry(
			backoff.WithRetryPolicy(adminServiceRetryPolicy),
			backoff.WithRetryableError(common.IsServiceTransientError),
		),
		isolationGroups:     isolationgroupapi.New(resource.GetLogger(), resource.GetIsolationGroupStore(), domainHandler),
		asyncWFQueueConfigs: queueconfigapi.New(resource.GetLogger(), domainHandler),
	}
}

// Start starts the handler
func (adh *adminHandlerImpl) Start() {
	adh.domainDLQHandler.Start()
	// The failover watcher only runs when graceful failover is enabled.
	if adh.config.EnableGracefulFailover() {
		adh.domainFailoverWatcher.Start()
	}
}

// Stop stops the handler.
// NOTE(review): domainFailoverWatcher.Stop is called even when Start never
// started it (graceful failover disabled) — presumably Stop is safe on a
// non-started watcher; confirm.
func (adh *adminHandlerImpl) Stop() {
	adh.domainDLQHandler.Stop()
	adh.domainFailoverWatcher.Stop()
}
// AddSearchAttribute add search attribute to whitelist
func (adh *adminHandlerImpl) AddSearchAttribute(
	ctx context.Context,
	request *types.AddSearchAttributeRequest,
) (retError error) {
	defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
	scope, sw := adh.startRequestProfile(ctx, metrics.AdminAddSearchAttributeScope)
	defer sw.Stop()

	// validate request
	if request == nil {
		return adh.error(validate.ErrRequestNotSet, scope)
	}
	if err := validate.CheckPermission(adh.config, request.SecurityToken); err != nil {
		return adh.error(validate.ErrNoPermission, scope)
	}
	if len(request.GetSearchAttribute()) == 0 {
		return adh.error(&types.BadRequestError{Message: "SearchAttributes are not provided"}, scope)
	}

	searchAttr := request.GetSearchAttribute()
	currentValidAttr, err := adh.params.DynamicConfig.GetMapValue(dc.ValidSearchAttributes, nil)
	if err != nil {
		return adh.error(&types.InternalServiceError{Message: fmt.Sprintf("Failed to get dynamic config, err: %v", err)}, scope)
	}
	// Merge requested attributes into the currently whitelisted set, rejecting
	// system-reserved keys and type conflicts with already-whitelisted keys.
	for keyName, valueType := range searchAttr {
		if definition.IsSystemIndexedKey(keyName) {
			return adh.error(&types.BadRequestError{Message: fmt.Sprintf("Key [%s] is reserved by system", keyName)}, scope)
		}
		if currValType, exist := currentValidAttr[keyName]; exist {
			if currValType != int(valueType) {
				return adh.error(&types.BadRequestError{Message: fmt.Sprintf("Key [%s] is already whitelisted as a different type", keyName)}, scope)
			}
			adh.GetLogger().Warn("Adding a search attribute that is already existing in dynamicconfig, it's probably a noop if ElasticSearch is already added. Here will re-do it on ElasticSearch.")
		}
		currentValidAttr[keyName] = int(valueType)
	}

	// update dynamic config. Until the DB based dynamic config is implemented, we shouldn't fail the updating.
	err = adh.params.DynamicConfig.UpdateValue(dc.ValidSearchAttributes, currentValidAttr)
	if err != nil {
		adh.GetLogger().Warn("Failed to update dynamicconfig. This is only useful in local dev environment for filebased config. Please ignore this warn if this is in a real Cluster, because your filebased dynamicconfig MUST be updated separately. Configstore dynamic config will also require separate updating via the CLI.")
	}

	// when have valid advance visibility config, update elasticsearch mapping, new added field will not be able to remove or update
	if err := adh.validateConfigForAdvanceVisibility(); err != nil {
		adh.GetLogger().Warn("Skip updating OpenSearch/ElasticSearch mapping since Advance Visibility hasn't been enabled.")
	} else {
		index := adh.params.ESConfig.GetVisibilityIndex()
		for k, v := range searchAttr {
			valueType := convertIndexedValueTypeToESDataType(v)
			if len(valueType) == 0 {
				return adh.error(&types.BadRequestError{Message: fmt.Sprintf("Unknown value type, %v", v)}, scope)
			}
			err := adh.params.ESClient.PutMapping(ctx, index, definition.Attr, k, valueType)
			// If the index itself does not exist yet, create it and retry the
			// mapping update once.
			if adh.esClient.IsNotFoundError(err) {
				err = adh.params.ESClient.CreateIndex(ctx, index)
				if err != nil {
					return adh.error(&types.InternalServiceError{Message: fmt.Sprintf("Failed to create ES index, err: %v", err)}, scope)
				}
				err = adh.params.ESClient.PutMapping(ctx, index, definition.Attr, k, valueType)
			}
			if err != nil {
				return adh.error(&types.InternalServiceError{Message: fmt.Sprintf("Failed to update ES mapping, err: %v", err)}, scope)
			}
		}
	}
	return nil
}

// DescribeWorkflowExecution returns information about the specified workflow execution.
func (adh *adminHandlerImpl) DescribeWorkflowExecution(
	ctx context.Context,
	request *types.AdminDescribeWorkflowExecutionRequest,
) (resp *types.AdminDescribeWorkflowExecutionResponse, retError error) {
	defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
	scope, sw := adh.startRequestProfile(ctx, metrics.AdminDescribeWorkflowExecutionScope)
	defer sw.Stop()

	if request == nil {
		return nil, adh.error(validate.ErrRequestNotSet, scope)
	}
	if err := validate.CheckExecution(request.Execution); err != nil {
		return nil, adh.error(err, scope)
	}

	shardID := common.WorkflowIDToHistoryShard(request.Execution.WorkflowID, adh.numberOfHistoryShards)
	// NOTE: string(rune(shardID)) yields the rune with code point shardID, NOT
	// the decimal string; it is kept deliberately for ring-hash compatibility.
	shardIDstr := string(rune(shardID)) // originally `string(int_shard_id)`, but changing it will change the ring hashing
	shardIDForOutput := strconv.Itoa(shardID)
	historyHost, err := adh.GetMembershipResolver().Lookup(service.History, shardIDstr)
	if err != nil {
		return nil, adh.error(err, scope)
	}
	domainID, err := adh.GetDomainCache().GetDomainID(request.GetDomain())
	if err != nil {
		return nil, adh.error(err, scope)
	}

	historyAddr := historyHost.GetAddress()
	resp2, err := adh.GetHistoryClient().DescribeMutableState(ctx, &types.DescribeMutableStateRequest{
		DomainUUID: domainID,
		Execution:  request.Execution,
	})
	if err != nil {
		return &types.AdminDescribeWorkflowExecutionResponse{}, err
	}
	return &types.AdminDescribeWorkflowExecutionResponse{
		ShardID:                shardIDForOutput,
		HistoryAddr:            historyAddr,
		MutableStateInDatabase: resp2.MutableStateInDatabase,
		MutableStateInCache:    resp2.MutableStateInCache,
	}, err
}

// RemoveTask forwards a task-removal request to the history service.
func (adh *adminHandlerImpl) RemoveTask(
	ctx context.Context,
	request *types.RemoveTaskRequest,
) (retError error) {
	defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
	scope, sw := adh.startRequestProfile(ctx, metrics.AdminRemoveTaskScope)
	defer sw.Stop()

	if request == nil || request.Type == nil {
		return adh.error(validate.ErrRequestNotSet, scope)
	}
	if err := adh.GetHistoryClient().RemoveTask(ctx, request); err != nil {
		return adh.error(err, scope)
	}
	return nil
}

// getCorruptWorkflowQueryTemplates returns the read-only frontend queries that
// MaintainCorruptWorkflow runs to detect whether a workflow is corrupted.
func (adh *adminHandlerImpl) getCorruptWorkflowQueryTemplates(
	ctx context.Context,
	request *types.AdminMaintainWorkflowRequest,
) []workflowQueryTemplate {
	client := adh.GetFrontendClient()
	return []workflowQueryTemplate{
		{
			name: "DescribeWorkflowExecution",
			function: func(request *types.AdminMaintainWorkflowRequest) error {
				_, err := client.DescribeWorkflowExecution(ctx, &types.DescribeWorkflowExecutionRequest{
					Domain:    request.Domain,
					Execution: request.Execution,
				})
				return err
			},
		},
		{
			name: "GetWorkflowExecutionHistory",
			function: func(request *types.AdminMaintainWorkflowRequest) error {
				_, err := client.GetWorkflowExecutionHistory(ctx, &types.GetWorkflowExecutionHistoryRequest{
					Domain:    request.Domain,
					Execution: request.Execution,
				})
				return err
			},
		},
	}
}

// MaintainCorruptWorkflow probes the workflow with read-only queries and, if a
// query fails with a known corruption error, deletes the workflow.
func (adh *adminHandlerImpl) MaintainCorruptWorkflow(
	ctx context.Context,
	request *types.AdminMaintainWorkflowRequest,
) (*types.AdminMaintainWorkflowResponse, error) {
	if request.GetExecution() == nil {
		return nil, types.BadRequestError{Message: "Execution is missing"}
	}
	logger := adh.GetLogger().WithTags(
		tag.WorkflowDomainName(request.Domain),
		tag.WorkflowID(request.GetExecution().GetWorkflowID()),
		tag.WorkflowRunID(request.GetExecution().GetRunID()),
	)
	resp := &types.AdminMaintainWorkflowResponse{
		HistoryDeleted:    false,
		ExecutionsDeleted: false,
		VisibilityDeleted: false,
	}

	queryTemplates := adh.getCorruptWorkflowQueryTemplates(ctx, request)
	for _, template := range queryTemplates {
		functionName := template.name
		queryFunc := template.function
		err := queryFunc(request)
		if err == nil {
			logger.Info(fmt.Sprintf("Query succeeded for function: %s", functionName))
			continue
		}
		// NOTE(review): err is always non-nil here (the nil case continues above),
		// so this check is redundant but harmless.
		if err != nil {
			logger.Info(fmt.Sprintf("%s returned error %#v", functionName, err))
		}
		// check if the error message indicates corrupt workflow
		errorMessage := err.Error()
		for _,
			corruptMessage := range corruptWorkflowErrorList {
			if errorMessage == corruptMessage {
				logger.Info(fmt.Sprintf("Will delete workflow because (%v) returned corrupted error (%#v)", functionName, err))
				// NOTE(review): the error from DeleteWorkflow is assigned but then
				// dropped by returning nil — presumably best-effort cleanup; confirm.
				resp, err = adh.DeleteWorkflow(ctx, request)
				return resp, nil
			}
		}
	}
	return resp, nil
}

// deleteWorkflowFromHistory deletes all history branches of the workflow.
// Returns true only if at least one branch existed and every decodable branch
// was deleted without error.
func (adh *adminHandlerImpl) deleteWorkflowFromHistory(
	ctx context.Context,
	logger log.Logger,
	shardIDInt int,
	mutableState persistence.WorkflowMutableState,
) bool {
	historyManager := adh.GetHistoryManager()
	branchInfo := shared.HistoryBranch{}
	thriftrwEncoder := codec.NewThriftRWEncoder()
	branchTokens := [][]byte{mutableState.ExecutionInfo.BranchToken}
	if mutableState.VersionHistories != nil {
		// if VersionHistories is set, then all branch infos are stored in VersionHistories
		branchTokens = [][]byte{}
		for _, versionHistory := range mutableState.VersionHistories.ToInternalType().Histories {
			branchTokens = append(branchTokens, versionHistory.BranchToken)
		}
	}

	// With zero branches there is nothing to delete, which counts as success.
	deletedFromHistory := len(branchTokens) == 0
	failedToDeleteFromHistory := false
	for _, branchToken := range branchTokens {
		err := thriftrwEncoder.Decode(branchToken, &branchInfo)
		if err != nil {
			// Undecodable branches are skipped (not counted as failures).
			logger.Error("Cannot decode thrift object", tag.Error(err))
			continue
		}
		domainName, err := adh.GetDomainCache().GetDomainName(mutableState.ExecutionInfo.DomainID)
		if err != nil {
			logger.Error("Unexpected: Cannot fetch domain name", tag.Error(err))
			continue
		}
		logger.Info(fmt.Sprintf("Deleting history events for %#v", branchInfo))
		err = historyManager.DeleteHistoryBranch(ctx, &persistence.DeleteHistoryBranchRequest{
			BranchToken: branchToken,
			ShardID:     &shardIDInt,
			DomainName:  domainName,
		})
		if err != nil {
			logger.Error("Failed to delete history", tag.Error(err))
			failedToDeleteFromHistory = true
		} else {
			deletedFromHistory = true
		}
	}
	return deletedFromHistory && !failedToDeleteFromHistory
}

// deleteWorkflowFromExecutions deletes the mutable-state row and the
// current-execution row for the workflow. Returns true only if both deletes
// succeeded.
// NOTE(review): the scope parameter is unused in this function body.
func (adh *adminHandlerImpl) deleteWorkflowFromExecutions(
	ctx context.Context,
	logger log.Logger,
	shardIDInt int,
	domainID string,
	workflowID string,
	runID string,
	scope metrics.Scope,
) bool {
	exeStore, err := adh.GetExecutionManager(shardIDInt)
	if err != nil {
		logger.Error(fmt.Sprintf("Cannot get execution manager for shardID(%v): %#v", shardIDInt, err))
		return false
	}
	domainName, err := adh.GetDomainCache().GetDomainName(domainID)
	if err != nil {
		logger.Error("Unexpected: Cannot fetch domain name", tag.Error(err))
		return false
	}
	req := &persistence.DeleteWorkflowExecutionRequest{
		DomainID:   domainID,
		WorkflowID: workflowID,
		RunID:      runID,
		DomainName: domainName,
	}

	deletedFromExecutions := false
	err = exeStore.DeleteWorkflowExecution(ctx, req)
	if err != nil {
		logger.Error("Delete mutableState row failed", tag.Error(err))
	} else {
		deletedFromExecutions = true
	}

	deleteCurrentReq := &persistence.DeleteCurrentWorkflowExecutionRequest{
		DomainID:   domainID,
		WorkflowID: workflowID,
		RunID:      runID,
		DomainName: domainName,
	}
	err = exeStore.DeleteCurrentWorkflowExecution(ctx, deleteCurrentReq)
	if err != nil {
		logger.Error(fmt.Sprintf("Delete current row failed: %#v", err))
		deletedFromExecutions = false
	}
	if deletedFromExecutions {
		logger.Info(fmt.Sprintf("Deleted executions row successfully %#v", deleteCurrentReq))
	}
	return deletedFromExecutions
}

// deleteWorkflowFromVisibility removes the workflow's visibility record, if a
// visibility manager is configured. Returns true on successful deletion.
func (adh *adminHandlerImpl) deleteWorkflowFromVisibility(
	ctx context.Context,
	logger log.Logger,
	domainID string,
	domain string,
	workflowID string,
	runID string,
) bool {
	visibilityManager := adh.Resource.GetVisibilityManager()
	if visibilityManager == nil {
		logger.Info("No visibility manager found")
		return false
	}

	logger.Info("Deleting workflow from visibility store")
	// Mark the context so the visibility layer knows this is an admin deletion.
	key := persistence.VisibilityAdminDeletionKey("visibilityAdminDelete")
	visCtx := context.WithValue(ctx, key, true)
	err := visibilityManager.DeleteWorkflowExecution(
		visCtx,
		&persistence.VisibilityDeleteWorkflowExecutionRequest{
			DomainID:   domainID,
			Domain:     domain,
			RunID:      runID,
			WorkflowID: workflowID,
			TaskID:     math.MaxInt64,
		},
	)
	if err != nil {
		logger.Error("Cannot delete visibility record", tag.Error(err))
	} else {
		logger.Info("Deleted visibility record successfully")
	}
	return err == nil
}

// DeleteWorkflow delete a workflow execution for admin
func (adh *adminHandlerImpl) DeleteWorkflow(
	ctx context.Context,
	request *types.AdminDeleteWorkflowRequest,
) (*types.AdminDeleteWorkflowResponse, error) {
	logger := adh.GetLogger()
	scope := adh.GetMetricsClient().Scope(metrics.AdminDeleteWorkflowScope).Tagged(metrics.GetContextTags(ctx)...)
	if request.GetExecution() == nil {
		logger.Info(fmt.Sprintf("Bad request: %#v", request))
		return nil, adh.error(validate.ErrRequestNotSet, scope)
	}
	domainName := request.GetDomain()
	workflowID := request.GetExecution().GetWorkflowID()
	runID := request.GetExecution().GetRunID()
	skipErrors := request.GetSkipErrors()

	resp, err := adh.DescribeWorkflowExecution(
		ctx,
		&types.AdminDescribeWorkflowExecutionRequest{
			Domain: domainName,
			Execution: &types.WorkflowExecution{
				WorkflowID: workflowID,
				RunID:      runID,
			},
		})
	if err != nil {
		logger.Error("Describe workflow failed", tag.Error(err))
		if !skipErrors {
			return nil, adh.error(err, scope)
		}
	}
	// NOTE(review): when skipErrors is set and Describe failed, resp may be nil
	// here; presumably GetMutableStateInDatabase is a nil-safe getter — confirm.
	msStr := resp.GetMutableStateInDatabase()
	ms := persistence.WorkflowMutableState{}
	err = json.Unmarshal([]byte(msStr), &ms)
	if err != nil {
		logger.Error(fmt.Sprintf("DeleteWorkflow failed: Cannot unmarshal mutableState: %#v", err))
		return nil, adh.error(err, scope)
	}
	domainID := ms.ExecutionInfo.DomainID
	logger = logger.WithTags(
		tag.WorkflowDomainID(domainID),
		tag.WorkflowDomainName(domainName),
		tag.WorkflowID(workflowID),
		tag.WorkflowRunID(runID),
	)
	shardID := resp.GetShardID()
	shardIDInt, err := strconv.Atoi(shardID)
	if err != nil {
		logger.Error(fmt.Sprintf("Cannot convert shardID(%v) to int: %#v", shardID, err))
		return nil, adh.error(err, scope)
	}

	// Bound the three-stage deletion (history, executions, visibility).
	ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
	defer cancel()
	deletedFromHistory := adh.deleteWorkflowFromHistory(ctx, logger, shardIDInt, ms)
	deletedFromExecutions := adh.deleteWorkflowFromExecutions(ctx, logger, shardIDInt, domainID, workflowID, runID, scope)
	deletedFromVisibility := false
	if deletedFromExecutions {
		// Without deleting the executions record, let's not delete the visibility record.
		// If we do that, workflow won't be visible but it will exist in the DB
		deletedFromVisibility = adh.deleteWorkflowFromVisibility(ctx, logger, domainID, domainName, workflowID, runID)
	}

	return &types.AdminDeleteWorkflowResponse{
		HistoryDeleted:    deletedFromHistory,
		ExecutionsDeleted: deletedFromExecutions,
		VisibilityDeleted: deletedFromVisibility,
	}, nil
}

// CloseShard returns information about the internal states of a history host
func (adh *adminHandlerImpl) CloseShard(
	ctx context.Context,
	request *types.CloseShardRequest,
) (retError error) {
	defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
	scope, sw := adh.startRequestProfile(ctx, metrics.AdminCloseShardScope)
	defer sw.Stop()

	if request == nil {
		return adh.error(validate.ErrRequestNotSet, scope)
	}
	if err := adh.GetHistoryClient().CloseShard(ctx, request); err != nil {
		return adh.error(err, scope)
	}
	return nil
}

// ResetQueue resets processing queue states
func (adh *adminHandlerImpl) ResetQueue(
	ctx context.Context,
	request *types.ResetQueueRequest,
) (retError error) {
	defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
	scope, sw := adh.startRequestProfile(ctx, metrics.AdminResetQueueScope)
	defer sw.Stop()

	if request == nil || request.Type == nil {
		return adh.error(validate.ErrRequestNotSet, scope)
	}
	if request.GetClusterName() == "" {
		return adh.error(validate.ErrClusterNameNotSet, scope)
	}
	if err := adh.GetHistoryClient().ResetQueue(ctx, request); err != nil {
		return adh.error(err, scope)
	}
	return nil
}

// DescribeQueue describes processing queue states
func (adh *adminHandlerImpl) DescribeQueue(
	ctx context.Context,
	request *types.DescribeQueueRequest,
) (resp *types.DescribeQueueResponse, retError error) {
	defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
	scope, sw :=
		adh.startRequestProfile(ctx, metrics.AdminDescribeQueueScope)
	defer sw.Stop()

	if request == nil || request.Type == nil {
		return nil, adh.error(validate.ErrRequestNotSet, scope)
	}
	if request.GetClusterName() == "" {
		return nil, adh.error(validate.ErrClusterNameNotSet, scope)
	}
	return adh.GetHistoryClient().DescribeQueue(ctx, request)
}

// DescribeShardDistribution returns information about history shard distribution
func (adh *adminHandlerImpl) DescribeShardDistribution(
	ctx context.Context,
	request *types.DescribeShardDistributionRequest,
) (resp *types.DescribeShardDistributionResponse, retError error) {
	defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
	_, sw := adh.startRequestProfile(ctx, metrics.AdminDescribeShardDistributionScope)
	defer sw.Stop()

	numShards := adh.config.NumHistoryShards
	resp = &types.DescribeShardDistributionResponse{
		NumberOfShards: int32(numShards),
		Shards:         make(map[int32]string),
	}

	// Page through shard IDs [offset, nextPageStart), clamped to numShards.
	offset := int(request.PageID * request.PageSize)
	nextPageStart := offset + int(request.PageSize)
	for shardID := offset; shardID < numShards && shardID < nextPageStart; shardID++ {
		// string(rune(shardID)) matches the ring-hash key format used elsewhere
		// (see DescribeWorkflowExecution) rather than the decimal string.
		info, err := adh.GetMembershipResolver().Lookup(service.History, string(rune(shardID)))
		if err != nil {
			resp.Shards[int32(shardID)] = "unknown"
		} else {
			resp.Shards[int32(shardID)] = info.Identity()
		}
	}
	return resp, nil
}

// DescribeHistoryHost returns information about the internal states of a history host
func (adh *adminHandlerImpl) DescribeHistoryHost(
	ctx context.Context,
	request *types.DescribeHistoryHostRequest,
) (resp *types.DescribeHistoryHostResponse, retError error) {
	defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
	scope, sw := adh.startRequestProfile(ctx, metrics.AdminDescribeHistoryHostScope)
	defer sw.Stop()

	// At least one of the three host selectors must be provided.
	if request == nil || (request.ShardIDForHost == nil && request.ExecutionForHost == nil && request.HostAddress == nil) {
		return nil, adh.error(validate.ErrRequestNotSet, scope)
	}
	if request.ExecutionForHost != nil {
		if err := validate.CheckExecution(request.ExecutionForHost); err != nil {
			return nil, adh.error(err, scope)
		}
	}
	return adh.GetHistoryClient().DescribeHistoryHost(ctx, request)
}

// GetWorkflowExecutionRawHistoryV2 - retrieves the history of workflow execution
func (adh *adminHandlerImpl) GetWorkflowExecutionRawHistoryV2(
	ctx context.Context,
	request *types.GetWorkflowExecutionRawHistoryV2Request,
) (resp *types.GetWorkflowExecutionRawHistoryV2Response, retError error) {
	defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
	scope, sw := adh.startRequestProfile(ctx, metrics.AdminGetWorkflowExecutionRawHistoryV2Scope)
	defer sw.Stop()

	if err := adh.validateGetWorkflowExecutionRawHistoryV2Request(
		request,
	); err != nil {
		return nil, adh.error(err, scope)
	}
	domainID, err := adh.GetDomainCache().GetDomainID(request.GetDomain())
	if err != nil {
		return nil, adh.error(err, scope)
	}
	scope = scope.Tagged(metrics.DomainTag(request.GetDomain()))

	execution := request.Execution
	var pageToken *getWorkflowRawHistoryV2Token
	var targetVersionHistory *persistence.VersionHistory
	if request.NextPageToken == nil {
		// First page: resolve version histories from mutable state and build
		// a fresh pagination token.
		response, err := adh.GetHistoryClient().GetMutableState(ctx, &types.GetMutableStateRequest{
			DomainUUID: domainID,
			Execution:  execution,
		})
		if err != nil {
			return nil, adh.error(err, scope)
		}
		versionHistories := persistence.NewVersionHistoriesFromInternalType(
			response.GetVersionHistories(),
		)
		targetVersionHistory, err = adh.setRequestDefaultValueAndGetTargetVersionHistory(
			request,
			versionHistories,
		)
		if err != nil {
			return nil, adh.error(err, scope)
		}
		pageToken = adh.generatePaginationToken(request, versionHistories)
	} else {
		// Subsequent page: trust the version histories carried in the token.
		pageToken, err = deserializeRawHistoryToken(request.NextPageToken)
		if err != nil {
			return nil, adh.error(err, scope)
		}
		versionHistories := pageToken.VersionHistories
		if versionHistories == nil {
			return nil, adh.error(&types.BadRequestError{Message: "Invalid version histories."}, scope)
		}
		targetVersionHistory, err =
			adh.setRequestDefaultValueAndGetTargetVersionHistory(
				request,
				persistence.NewVersionHistoriesFromInternalType(versionHistories),
			)
		if err != nil {
			return nil, adh.error(err, scope)
		}
	}
	if err := adh.validatePaginationToken(
		request,
		pageToken,
	); err != nil {
		return nil, adh.error(err, scope)
	}

	if pageToken.StartEventID+1 == pageToken.EndEventID {
		// API is exclusive-exclusive. Return empty response here.
		return &types.GetWorkflowExecutionRawHistoryV2Response{
			HistoryBatches: []*types.DataBlob{},
			NextPageToken:  nil, // no further pagination
			VersionHistory: targetVersionHistory.ToInternalType(),
		}, nil
	}
	pageSize := int(request.GetMaximumPageSize())
	shardID := common.WorkflowIDToHistoryShard(
		execution.GetWorkflowID(),
		adh.numberOfHistoryShards,
	)
	rawHistoryResponse, err := adh.GetHistoryManager().ReadRawHistoryBranch(ctx, &persistence.ReadHistoryBranchRequest{
		BranchToken: targetVersionHistory.GetBranchToken(),
		// GetWorkflowExecutionRawHistoryV2 is exclusive exclusive.
		// ReadRawHistoryBranch is inclusive exclusive.
		MinEventID:    pageToken.StartEventID + 1,
		MaxEventID:    pageToken.EndEventID,
		PageSize:      pageSize,
		NextPageToken: pageToken.PersistenceToken,
		ShardID:       common.IntPtr(shardID),
		DomainName:    request.GetDomain(),
	})
	if err != nil {
		if _, ok := err.(*types.EntityNotExistsError); ok {
			// when no events can be returned from DB, DB layer will return
			// EntityNotExistsError, this API shall return empty response
			return &types.GetWorkflowExecutionRawHistoryV2Response{
				HistoryBatches: []*types.DataBlob{},
				NextPageToken:  nil, // no further pagination
				VersionHistory: targetVersionHistory.ToInternalType(),
			}, nil
		}
		return nil, err
	}

	pageToken.PersistenceToken = rawHistoryResponse.NextPageToken
	size := rawHistoryResponse.Size
	scope.RecordTimer(metrics.HistorySize, time.Duration(size))

	rawBlobs := rawHistoryResponse.HistoryEventBlobs
	blobs := []*types.DataBlob{}
	for _, blob := range rawBlobs {
		blobs = append(blobs, blob.ToInternal())
	}

	result := &types.GetWorkflowExecutionRawHistoryV2Response{
		HistoryBatches: blobs,
		VersionHistory: targetVersionHistory.ToInternalType(),
	}
	// An empty persistence token means the branch is exhausted; otherwise
	// serialize the updated token for the next page.
	if len(pageToken.PersistenceToken) == 0 {
		result.NextPageToken = nil
	} else {
		result.NextPageToken, err = serializeRawHistoryToken(pageToken)
		if err != nil {
			return nil, err
		}
	}
	return result, nil
}

// DescribeCluster return information about cadence deployment
func (adh *adminHandlerImpl) DescribeCluster(
	ctx context.Context,
) (resp *types.DescribeClusterResponse, retError error) {
	defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }()
	scope, sw := adh.startRequestProfile(ctx, metrics.AdminDescribeClusterScope)
	defer sw.Stop()

	// expose visibility store backend and if advanced options are available
	ave := types.PersistenceFeature{
		Key:     "advancedVisibilityEnabled",
		Enabled: adh.params.ESConfig != nil,
	}
	visibilityStoreInfo := types.PersistenceInfo{
		Backend:  adh.Resource.GetVisibilityManager().GetName(),
		Features: []*types.PersistenceFeature{&ave},
	}

	// expose history store backend
	historyStoreInfo
		:= types.PersistenceInfo{
		Backend: adh.GetHistoryManager().GetName(),
	}

	// Collect ring membership per service role from the membership resolver.
	membershipInfo := types.MembershipInfo{}
	if monitor := adh.GetMembershipResolver(); monitor != nil {
		currentHost, err := monitor.WhoAmI()
		if err != nil {
			return nil, adh.error(err, scope)
		}
		membershipInfo.CurrentHost = &types.HostInfo{
			Identity: currentHost.Identity(),
		}

		var rings []*types.RingInfo
		for _, role := range service.List {
			var servers []*types.HostInfo
			members, err := monitor.Members(role)
			if err != nil {
				return nil, adh.error(err, scope)
			}
			for _, server := range members {
				servers = append(servers, &types.HostInfo{
					Identity: server.Identity(),
				})
				membershipInfo.ReachableMembers = append(membershipInfo.ReachableMembers, server.Identity())
			}
			rings = append(rings, &types.RingInfo{
				Role:        role,
				MemberCount: int32(len(servers)),
				Members:     servers,
			})
		}
		membershipInfo.Rings = rings
	}

	return &types.DescribeClusterResponse{
		SupportedClientVersions: &types.SupportedClientVersions{
			GoSdk:   client.SupportedGoSDKVersion,
			JavaSdk: client.SupportedJavaSDKVersion,
		},
		MembershipInfo: &membershipInfo,
		PersistenceInfo: map[string]*types.PersistenceInfo{
			"visibilityStore": &visibilityStoreInfo,
			"historyStore":    &historyStoreInfo,
		},
	}, nil
}

// GetReplicationMessages returns new replication tasks since the read level provided in the token.
func (adh *adminHandlerImpl) GetReplicationMessages( ctx context.Context, request *types.GetReplicationMessagesRequest, ) (resp *types.GetReplicationMessagesResponse, err error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }() scope, sw := adh.startRequestProfile(ctx, metrics.AdminGetReplicationMessagesScope) defer sw.Stop() if request == nil { return nil, adh.error(validate.ErrRequestNotSet, scope) } if request.ClusterName == "" { return nil, adh.error(validate.ErrClusterNameNotSet, scope) } resp, err = adh.GetHistoryRawClient().GetReplicationMessages(ctx, request) if err != nil { return nil, adh.error(err, scope) } return resp, nil } // GetDomainReplicationMessages returns new domain replication tasks since last retrieved task ID. func (adh *adminHandlerImpl) GetDomainReplicationMessages( ctx context.Context, request *types.GetDomainReplicationMessagesRequest, ) (resp *types.GetDomainReplicationMessagesResponse, err error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }() scope, sw := adh.startRequestProfile(ctx, metrics.AdminGetDomainReplicationMessagesScope) defer sw.Stop() if request == nil { return nil, adh.error(validate.ErrRequestNotSet, scope) } if adh.GetDomainReplicationQueue() == nil { return nil, adh.error(errors.New("domain replication queue not enabled for cluster"), scope) } lastMessageID := defaultLastMessageID if request.LastRetrievedMessageID != nil { lastMessageID = request.GetLastRetrievedMessageID() } if lastMessageID == defaultLastMessageID { clusterAckLevels, err := adh.GetDomainReplicationQueue().GetAckLevels(ctx) if err == nil { if ackLevel, ok := clusterAckLevels[request.GetClusterName()]; ok { lastMessageID = ackLevel } } } replicationTasks, lastMessageID, err := adh.GetDomainReplicationQueue().GetReplicationMessages( ctx, lastMessageID, getDomainReplicationMessageBatchSize, ) if err != nil { return nil, adh.error(err, scope) } lastProcessedMessageID := defaultLastMessageID if 
request.LastProcessedMessageID != nil { lastProcessedMessageID = request.GetLastProcessedMessageID() } if err := adh.GetDomainReplicationQueue().UpdateAckLevel(ctx, lastProcessedMessageID, request.GetClusterName()); err != nil { adh.GetLogger().Warn("Failed to update domain replication queue ack level.", tag.TaskID(int64(lastProcessedMessageID)), tag.ClusterName(request.GetClusterName())) } return &types.GetDomainReplicationMessagesResponse{ Messages: &types.ReplicationMessages{ ReplicationTasks: replicationTasks, LastRetrievedMessageID: lastMessageID, }, }, nil } // GetDLQReplicationMessages returns new replication tasks based on the dlq info. func (adh *adminHandlerImpl) GetDLQReplicationMessages( ctx context.Context, request *types.GetDLQReplicationMessagesRequest, ) (resp *types.GetDLQReplicationMessagesResponse, err error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }() scope, sw := adh.startRequestProfile(ctx, metrics.AdminGetDLQReplicationMessagesScope) defer sw.Stop() if request == nil { return nil, adh.error(validate.ErrRequestNotSet, scope) } if len(request.GetTaskInfos()) == 0 { return nil, adh.error(validate.ErrEmptyReplicationInfo, scope) } resp, err = adh.GetHistoryClient().GetDLQReplicationMessages(ctx, request) if err != nil { return nil, adh.error(err, scope) } return resp, nil } // ReapplyEvents applies stale events to the current workflow and the current run func (adh *adminHandlerImpl) ReapplyEvents( ctx context.Context, request *types.ReapplyEventsRequest, ) (err error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }() scope, sw := adh.startRequestProfile(ctx, metrics.AdminReapplyEventsScope) defer sw.Stop() if request == nil { return adh.error(validate.ErrRequestNotSet, scope) } if request.GetDomainName() == "" { return adh.error(validate.ErrDomainNotSet, scope) } if request.WorkflowExecution == nil { return adh.error(validate.ErrExecutionNotSet, scope) } if 
request.GetWorkflowExecution().GetWorkflowID() == "" { return adh.error(validate.ErrWorkflowIDNotSet, scope) } if request.GetEvents() == nil { return adh.error(validate.ErrWorkflowIDNotSet, scope) } domainEntry, err := adh.GetDomainCache().GetDomain(request.GetDomainName()) if err != nil { return adh.error(err, scope) } err = adh.GetHistoryClient().ReapplyEvents(ctx, &types.HistoryReapplyEventsRequest{ DomainUUID: domainEntry.GetInfo().ID, Request: request, }) if err != nil { return adh.error(err, scope) } return nil } // ReadDLQMessages reads messages from DLQ func (adh *adminHandlerImpl) ReadDLQMessages( ctx context.Context, request *types.ReadDLQMessagesRequest, ) (resp *types.ReadDLQMessagesResponse, err error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }() scope, sw := adh.startRequestProfile(ctx, metrics.AdminReadDLQMessagesScope) defer sw.Stop() if request == nil { return nil, adh.error(validate.ErrRequestNotSet, scope) } if request.Type == nil { return nil, adh.error(validate.ErrEmptyQueueType, scope) } if request.GetMaximumPageSize() <= 0 { request.MaximumPageSize = common.ReadDLQMessagesPageSize } if request.InclusiveEndMessageID == nil { request.InclusiveEndMessageID = common.Int64Ptr(common.EndMessageID) } var tasks []*types.ReplicationTask var token []byte var op func() error switch request.GetType() { case types.DLQTypeReplication: return adh.GetHistoryClient().ReadDLQMessages(ctx, request) case types.DLQTypeDomain: op = func() error { select { case <-ctx.Done(): return ctx.Err() default: var err error tasks, token, err = adh.domainDLQHandler.Read( ctx, request.GetInclusiveEndMessageID(), int(request.GetMaximumPageSize()), request.GetNextPageToken()) return err } } default: return nil, &types.BadRequestError{Message: "The DLQ type is not supported."} } err = adh.throttleRetry.Do(ctx, op) if err != nil { return nil, adh.error(err, scope) } return &types.ReadDLQMessagesResponse{ ReplicationTasks: tasks, NextPageToken: token, }, 
nil } // PurgeDLQMessages purge messages from DLQ func (adh *adminHandlerImpl) PurgeDLQMessages( ctx context.Context, request *types.PurgeDLQMessagesRequest, ) (err error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }() scope, sw := adh.startRequestProfile(ctx, metrics.AdminPurgeDLQMessagesScope) defer sw.Stop() if request == nil { return adh.error(validate.ErrRequestNotSet, scope) } if request.Type == nil { return adh.error(validate.ErrEmptyQueueType, scope) } if request.InclusiveEndMessageID == nil { request.InclusiveEndMessageID = common.Int64Ptr(endMessageID) } var op func() error switch request.GetType() { case types.DLQTypeReplication: return adh.GetHistoryClient().PurgeDLQMessages(ctx, request) case types.DLQTypeDomain: op = func() error { select { case <-ctx.Done(): return ctx.Err() default: return adh.domainDLQHandler.Purge( ctx, request.GetInclusiveEndMessageID(), ) } } default: return &types.BadRequestError{Message: "The DLQ type is not supported."} } err = adh.throttleRetry.Do(ctx, op) if err != nil { return adh.error(err, scope) } return nil } func (adh *adminHandlerImpl) CountDLQMessages( ctx context.Context, request *types.CountDLQMessagesRequest, ) (resp *types.CountDLQMessagesResponse, err error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }() scope, sw := adh.startRequestProfile(ctx, metrics.AdminCountDLQMessagesScope) defer sw.Stop() domain, err := adh.domainDLQHandler.Count(ctx, request.ForceFetch) if err != nil { return nil, adh.error(err, scope) } history, err := adh.GetHistoryClient().CountDLQMessages(ctx, request) if err != nil { err = adh.error(err, scope) } return &types.CountDLQMessagesResponse{ History: history.Entries, Domain: domain, }, err } // MergeDLQMessages merges DLQ messages func (adh *adminHandlerImpl) MergeDLQMessages( ctx context.Context, request *types.MergeDLQMessagesRequest, ) (resp *types.MergeDLQMessagesResponse, err error) { defer func() { log.CapturePanic(recover(), 
adh.GetLogger(), &err) }() scope, sw := adh.startRequestProfile(ctx, metrics.AdminMergeDLQMessagesScope) defer sw.Stop() if request == nil { return nil, adh.error(validate.ErrRequestNotSet, scope) } if request.Type == nil { return nil, adh.error(validate.ErrEmptyQueueType, scope) } if request.InclusiveEndMessageID == nil { request.InclusiveEndMessageID = common.Int64Ptr(endMessageID) } var token []byte var op func() error switch request.GetType() { case types.DLQTypeReplication: return adh.GetHistoryClient().MergeDLQMessages(ctx, request) case types.DLQTypeDomain: op = func() error { select { case <-ctx.Done(): return ctx.Err() default: var err error token, err = adh.domainDLQHandler.Merge( ctx, request.GetInclusiveEndMessageID(), int(request.GetMaximumPageSize()), request.GetNextPageToken(), ) return err } } default: return nil, &types.BadRequestError{Message: "The DLQ type is not supported."} } err = adh.throttleRetry.Do(ctx, op) if err != nil { return nil, adh.error(err, scope) } return &types.MergeDLQMessagesResponse{ NextPageToken: token, }, nil } // RefreshWorkflowTasks re-generates the workflow tasks func (adh *adminHandlerImpl) RefreshWorkflowTasks( ctx context.Context, request *types.RefreshWorkflowTasksRequest, ) (err error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }() scope, sw := adh.startRequestProfile(ctx, metrics.AdminRefreshWorkflowTasksScope) defer sw.Stop() if request == nil { return adh.error(validate.ErrRequestNotSet, scope) } if err := validate.CheckExecution(request.Execution); err != nil { return adh.error(err, scope) } domainEntry, err := adh.GetDomainCache().GetDomain(request.GetDomain()) if err != nil { return adh.error(err, scope) } err = adh.GetHistoryClient().RefreshWorkflowTasks(ctx, &types.HistoryRefreshWorkflowTasksRequest{ DomainUIID: domainEntry.GetInfo().ID, Request: request, }) if err != nil { return adh.error(err, scope) } return nil } // ResendReplicationTasks requests replication task from remote 
cluster func (adh *adminHandlerImpl) ResendReplicationTasks( ctx context.Context, request *types.ResendReplicationTasksRequest, ) (err error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }() scope, sw := adh.startRequestProfile(ctx, metrics.AdminResendReplicationTasksScope) defer sw.Stop() if request == nil { return adh.error(validate.ErrRequestNotSet, scope) } resender := ndc.NewHistoryResender( adh.GetDomainCache(), adh.GetRemoteAdminClient(request.GetRemoteCluster()), func(ctx context.Context, request *types.ReplicateEventsV2Request) error { return adh.GetHistoryClient().ReplicateEventsV2(ctx, request) }, nil, nil, adh.GetLogger(), ) return resender.SendSingleWorkflowHistory( request.DomainID, request.GetWorkflowID(), request.GetRunID(), request.StartEventID, request.StartVersion, request.EndEventID, request.EndVersion, ) } func (adh *adminHandlerImpl) GetCrossClusterTasks( ctx context.Context, request *types.GetCrossClusterTasksRequest, ) (resp *types.GetCrossClusterTasksResponse, err error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }() scope, sw := adh.startRequestProfile(ctx, metrics.AdminGetCrossClusterTasksScope) defer sw.Stop() if request == nil { return nil, adh.error(validate.ErrRequestNotSet, scope) } if request.TargetCluster == "" { return nil, adh.error(validate.ErrClusterNameNotSet, scope) } resp, err = adh.GetHistoryRawClient().GetCrossClusterTasks(ctx, request) if err != nil { return nil, adh.error(err, scope) } return resp, nil } func (adh *adminHandlerImpl) RespondCrossClusterTasksCompleted( ctx context.Context, request *types.RespondCrossClusterTasksCompletedRequest, ) (resp *types.RespondCrossClusterTasksCompletedResponse, err error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &err) }() scope, sw := adh.startRequestProfile(ctx, metrics.AdminRespondCrossClusterTasksCompletedScope) defer sw.Stop() if request == nil { return nil, adh.error(validate.ErrRequestNotSet, scope) } if 
request.TargetCluster == "" { return nil, adh.error(validate.ErrClusterNameNotSet, scope) } resp, err = adh.GetHistoryClient().RespondCrossClusterTasksCompleted(ctx, request) if err != nil { return nil, adh.error(err, scope) } return resp, nil } func (adh *adminHandlerImpl) validateGetWorkflowExecutionRawHistoryV2Request( request *types.GetWorkflowExecutionRawHistoryV2Request, ) error { execution := request.Execution if len(execution.GetWorkflowID()) == 0 { return &types.BadRequestError{Message: "Invalid WorkflowID."} } // TODO currently, this API is only going to be used by re-send history events // to remote cluster if kafka is lossy again, in the future, this API can be used // by CLI and client, then empty runID (meaning the current workflow) should be allowed if _, err := uuid.Parse(execution.GetRunID()); err != nil { return &types.BadRequestError{Message: "Invalid RunID."} } pageSize := int(request.GetMaximumPageSize()) if pageSize <= 0 { return &types.BadRequestError{Message: "Invalid PageSize."} } if (request.StartEventID != nil && request.StartEventVersion == nil) || (request.StartEventID == nil && request.StartEventVersion != nil) { return &types.BadRequestError{Message: "Invalid start event id and start event version combination."} } if (request.EndEventID != nil && request.EndEventVersion == nil) || (request.EndEventID == nil && request.EndEventVersion != nil) { return &types.BadRequestError{Message: "Invalid end event id and end event version combination."} } return nil } func (adh *adminHandlerImpl) validateConfigForAdvanceVisibility() error { if adh.params.ESConfig == nil || adh.params.ESClient == nil { return errors.New("ES related config not found") } return nil } func (adh *adminHandlerImpl) setRequestDefaultValueAndGetTargetVersionHistory( request *types.GetWorkflowExecutionRawHistoryV2Request, versionHistories *persistence.VersionHistories, ) (*persistence.VersionHistory, error) { targetBranch, err := versionHistories.GetCurrentVersionHistory() 
if err != nil { return nil, err } firstItem, err := targetBranch.GetFirstItem() if err != nil { return nil, err } lastItem, err := targetBranch.GetLastItem() if err != nil { return nil, err } if request.StartEventID == nil || request.StartEventVersion == nil { // If start event is not set, get the events from the first event // As the API is exclusive-exclusive, use first event id - 1 here request.StartEventID = common.Int64Ptr(common.FirstEventID - 1) request.StartEventVersion = common.Int64Ptr(firstItem.Version) } if request.EndEventID == nil || request.EndEventVersion == nil { // If end event is not set, get the events until the end event // As the API is exclusive-exclusive, use end event id + 1 here request.EndEventID = common.Int64Ptr(lastItem.EventID + 1) request.EndEventVersion = common.Int64Ptr(lastItem.Version) } if request.GetStartEventID() < 0 { return nil, &types.BadRequestError{Message: "Invalid FirstEventID && NextEventID combination."} } // get branch based on the end event if end event is defined in the request if request.GetEndEventID() == lastItem.EventID+1 && request.GetEndEventVersion() == lastItem.Version { // this is a special case, target branch remains the same } else { endItem := persistence.NewVersionHistoryItem(request.GetEndEventID(), request.GetEndEventVersion()) _, targetBranch, err = versionHistories.FindFirstVersionHistoryByItem(endItem) if err != nil { return nil, err } } startItem := persistence.NewVersionHistoryItem(request.GetStartEventID(), request.GetStartEventVersion()) // If the request start event is defined. The start event may be on a different branch as current branch. // We need to find the LCA of the start event and the current branch. 
if request.GetStartEventID() == common.FirstEventID-1 && request.GetStartEventVersion() == firstItem.Version { // this is a special case, start event is on the same branch as target branch } else { if !targetBranch.ContainsItem(startItem) { _, startBranch, err := versionHistories.FindFirstVersionHistoryByItem(startItem) if err != nil { return nil, err } startItem, err = targetBranch.FindLCAItem(startBranch) if err != nil { return nil, err } request.StartEventID = common.Int64Ptr(startItem.EventID) request.StartEventVersion = common.Int64Ptr(startItem.Version) } } return targetBranch, nil } func (adh *adminHandlerImpl) generatePaginationToken( request *types.GetWorkflowExecutionRawHistoryV2Request, versionHistories *persistence.VersionHistories, ) *getWorkflowRawHistoryV2Token { execution := request.Execution return &getWorkflowRawHistoryV2Token{ DomainName: request.GetDomain(), WorkflowID: execution.GetWorkflowID(), RunID: execution.GetRunID(), StartEventID: request.GetStartEventID(), StartEventVersion: request.GetStartEventVersion(), EndEventID: request.GetEndEventID(), EndEventVersion: request.GetEndEventVersion(), VersionHistories: versionHistories.ToInternalType(), PersistenceToken: nil, // this is the initialized value } } func (adh *adminHandlerImpl) validatePaginationToken( request *types.GetWorkflowExecutionRawHistoryV2Request, token *getWorkflowRawHistoryV2Token, ) error { execution := request.Execution if request.GetDomain() != token.DomainName || execution.GetWorkflowID() != token.WorkflowID || execution.GetRunID() != token.RunID || request.GetStartEventID() != token.StartEventID || request.GetStartEventVersion() != token.StartEventVersion || request.GetEndEventID() != token.EndEventID || request.GetEndEventVersion() != token.EndEventVersion { return &types.BadRequestError{Message: "Invalid pagination token."} } return nil } // startRequestProfile initiates recording of request metrics func (adh *adminHandlerImpl) startRequestProfile(ctx context.Context, 
scope int) (metrics.Scope, metrics.Stopwatch) { metricsScope := adh.GetMetricsClient().Scope(scope).Tagged(metrics.DomainUnknownTag()).Tagged(metrics.GetContextTags(ctx)...) sw := metricsScope.StartTimer(metrics.CadenceLatency) metricsScope.IncCounter(metrics.CadenceRequests) return metricsScope, sw } func (adh *adminHandlerImpl) error(err error, scope metrics.Scope) error { switch err.(type) { case *types.InternalServiceError: adh.GetLogger().Error("Internal service error", tag.Error(err)) scope.IncCounter(metrics.CadenceFailures) return err case *types.BadRequestError: scope.IncCounter(metrics.CadenceErrBadRequestCounter) return err case *types.ServiceBusyError: scope.IncCounter(metrics.CadenceErrServiceBusyCounter) return err case *types.EntityNotExistsError: return err default: adh.GetLogger().Error("Uncategorized error", tag.Error(err)) scope.IncCounter(metrics.CadenceFailures) return &types.InternalServiceError{Message: err.Error()} } } func convertIndexedValueTypeToESDataType(valueType types.IndexedValueType) string { switch valueType { case types.IndexedValueTypeString: return "text" case types.IndexedValueTypeKeyword: return "keyword" case types.IndexedValueTypeInt: return "long" case types.IndexedValueTypeDouble: return "double" case types.IndexedValueTypeBool: return "boolean" case types.IndexedValueTypeDatetime: return "date" default: return "" } } func serializeRawHistoryToken(token *getWorkflowRawHistoryV2Token) ([]byte, error) { if token == nil { return nil, nil } bytes, err := json.Marshal(token) return bytes, err } func deserializeRawHistoryToken(bytes []byte) (*getWorkflowRawHistoryV2Token, error) { token := &getWorkflowRawHistoryV2Token{} err := json.Unmarshal(bytes, token) return token, err } func (adh *adminHandlerImpl) GetDynamicConfig(ctx context.Context, request *types.GetDynamicConfigRequest) (_ *types.GetDynamicConfigResponse, retError error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }() scope, sw := 
adh.startRequestProfile(ctx, metrics.AdminGetDynamicConfigScope) defer sw.Stop() if request == nil || request.ConfigName == "" { return nil, adh.error(validate.ErrRequestNotSet, scope) } keyVal, err := dc.GetKeyFromKeyName(request.ConfigName) if err != nil { return nil, adh.error(err, scope) } var value interface{} if request.Filters == nil { value, err = adh.params.DynamicConfig.GetValue(keyVal) if err != nil { return nil, adh.error(err, scope) } } else { convFilters, err := convertFilterListToMap(request.Filters) if err != nil { return nil, adh.error(err, scope) } value, err = adh.params.DynamicConfig.GetValueWithFilters(keyVal, convFilters) if err != nil { return nil, adh.error(err, scope) } } data, err := json.Marshal(value) if err != nil { return nil, adh.error(err, scope) } return &types.GetDynamicConfigResponse{ Value: &types.DataBlob{ EncodingType: types.EncodingTypeJSON.Ptr(), Data: data, }, }, nil } func (adh *adminHandlerImpl) UpdateDynamicConfig(ctx context.Context, request *types.UpdateDynamicConfigRequest) (retError error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }() scope, sw := adh.startRequestProfile(ctx, metrics.AdminUpdateDynamicConfigScope) defer sw.Stop() if request == nil || request.ConfigName == "" { return adh.error(validate.ErrRequestNotSet, scope) } keyVal, err := dc.GetKeyFromKeyName(request.ConfigName) if err != nil { return adh.error(err, scope) } return adh.params.DynamicConfig.UpdateValue(keyVal, request.ConfigValues) } func (adh *adminHandlerImpl) RestoreDynamicConfig(ctx context.Context, request *types.RestoreDynamicConfigRequest) (retError error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }() scope, sw := adh.startRequestProfile(ctx, metrics.AdminRestoreDynamicConfigScope) defer sw.Stop() if request == nil || request.ConfigName == "" { return adh.error(validate.ErrRequestNotSet, scope) } keyVal, err := dc.GetKeyFromKeyName(request.ConfigName) if err != nil { return 
adh.error(err, scope) } var filters map[dc.Filter]interface{} if request.Filters == nil { filters = nil } else { filters, err = convertFilterListToMap(request.Filters) if err != nil { return adh.error(validate.ErrInvalidFilters, scope) } } return adh.params.DynamicConfig.RestoreValue(keyVal, filters) } func (adh *adminHandlerImpl) ListDynamicConfig(ctx context.Context, request *types.ListDynamicConfigRequest) (_ *types.ListDynamicConfigResponse, retError error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }() scope, sw := adh.startRequestProfile(ctx, metrics.AdminListDynamicConfigScope) defer sw.Stop() if request == nil { return nil, adh.error(validate.ErrRequestNotSet, scope) } keyVal, err := dc.GetKeyFromKeyName(request.ConfigName) if err != nil || request.ConfigName == "" { entries, err2 := adh.params.DynamicConfig.ListValue(nil) if err2 != nil { return nil, adh.error(err2, scope) } return &types.ListDynamicConfigResponse{ Entries: entries, }, nil } entries, err2 := adh.params.DynamicConfig.ListValue(keyVal) if err2 != nil { err = adh.error(err2, scope) return nil, adh.error(err, scope) } return &types.ListDynamicConfigResponse{ Entries: entries, }, nil } func (adh *adminHandlerImpl) GetGlobalIsolationGroups(ctx context.Context, request *types.GetGlobalIsolationGroupsRequest) (_ *types.GetGlobalIsolationGroupsResponse, retError error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }() scope, sw := adh.startRequestProfile(ctx, metrics.GetGlobalIsolationGroups) defer sw.Stop() if request == nil { return nil, adh.error(validate.ErrRequestNotSet, scope) } resp, err := adh.isolationGroups.GetGlobalState(ctx) if err != nil { return nil, adh.error(err, scope) } return resp, nil } func (adh *adminHandlerImpl) UpdateGlobalIsolationGroups(ctx context.Context, request *types.UpdateGlobalIsolationGroupsRequest) (_ *types.UpdateGlobalIsolationGroupsResponse, retError error) { defer func() { log.CapturePanic(recover(), 
adh.GetLogger(), &retError) }() scope, sw := adh.startRequestProfile(ctx, metrics.UpdateGlobalIsolationGroups) defer sw.Stop() if request == nil { return nil, adh.error(validate.ErrRequestNotSet, scope) } err := adh.isolationGroups.UpdateGlobalState(ctx, *request) if err != nil { return nil, adh.error(err, scope) } return &types.UpdateGlobalIsolationGroupsResponse{}, nil } func (adh *adminHandlerImpl) GetDomainIsolationGroups(ctx context.Context, request *types.GetDomainIsolationGroupsRequest) (_ *types.GetDomainIsolationGroupsResponse, retError error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }() scope, sw := adh.startRequestProfile(ctx, metrics.GetDomainIsolationGroups) defer sw.Stop() if request == nil { return nil, adh.error(validate.ErrRequestNotSet, scope) } resp, err := adh.isolationGroups.GetDomainState(ctx, types.GetDomainIsolationGroupsRequest{Domain: request.Domain}) if err != nil { return nil, adh.error(err, scope) } return resp, nil } func (adh *adminHandlerImpl) UpdateDomainIsolationGroups(ctx context.Context, request *types.UpdateDomainIsolationGroupsRequest) (_ *types.UpdateDomainIsolationGroupsResponse, retError error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }() scope, sw := adh.startRequestProfile(ctx, metrics.UpdateDomainIsolationGroups) defer sw.Stop() if request == nil { return nil, adh.error(validate.ErrRequestNotSet, scope) } err := adh.isolationGroups.UpdateDomainState(ctx, *request) if err != nil { return nil, adh.error(err, scope) } return &types.UpdateDomainIsolationGroupsResponse{}, nil } func (adh *adminHandlerImpl) GetDomainAsyncWorkflowConfiguraton(ctx context.Context, request *types.GetDomainAsyncWorkflowConfiguratonRequest) (_ *types.GetDomainAsyncWorkflowConfiguratonResponse, retError error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }() scope, sw := adh.startRequestProfile(ctx, metrics.UpdateDomainAsyncWorkflowConfiguraton) defer 
sw.Stop() if request == nil { return nil, adh.error(validate.ErrRequestNotSet, scope) } resp, err := adh.asyncWFQueueConfigs.GetConfiguraton(ctx, request) if err != nil { return nil, adh.error(err, scope) } return resp, nil } func (adh *adminHandlerImpl) UpdateDomainAsyncWorkflowConfiguraton(ctx context.Context, request *types.UpdateDomainAsyncWorkflowConfiguratonRequest) (_ *types.UpdateDomainAsyncWorkflowConfiguratonResponse, retError error) { defer func() { log.CapturePanic(recover(), adh.GetLogger(), &retError) }() scope, sw := adh.startRequestProfile(ctx, metrics.UpdateDomainAsyncWorkflowConfiguraton) defer sw.Stop() if request == nil { return nil, adh.error(validate.ErrRequestNotSet, scope) } resp, err := adh.asyncWFQueueConfigs.UpdateConfiguration(ctx, request) if err != nil { return nil, adh.error(err, scope) } return resp, nil } func convertFromDataBlob(blob *types.DataBlob) (interface{}, error) { switch *blob.EncodingType { case types.EncodingTypeJSON: var v interface{} err := json.Unmarshal(blob.Data, &v) return v, err default: return nil, errors.New("unsupported blob encoding") } } func convertFilterListToMap(filters []*types.DynamicConfigFilter) (map[dc.Filter]interface{}, error) { newFilters := make(map[dc.Filter]interface{}) for _, filter := range filters { val, err := convertFromDataBlob(filter.Value) if err != nil { return nil, err } newFilters[dc.ParseFilter(filter.Name)] = val } return newFilters, nil }