client/history/client.go (975 lines of code) (raw):

// Copyright (c) 2017 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. 
package history

import (
	"context"
	"math/rand"
	"sync"
	"time"

	"go.uber.org/yarpc"
	"golang.org/x/sync/errgroup"

	"github.com/uber/cadence/common"
	"github.com/uber/cadence/common/future"
	"github.com/uber/cadence/common/log"
	"github.com/uber/cadence/common/log/tag"
	"github.com/uber/cadence/common/rpc"
	"github.com/uber/cadence/common/types"
)

var _ Client = (*clientImpl)(nil)

type (
	// clientImpl is a Client decorator: it resolves the peer that should
	// serve each request and forwards the call to the wrapped client with a
	// shard-key call option naming that peer.
	clientImpl struct {
		numberOfShards    int
		rpcMaxSizeInBytes int // This value is currently only used in the GetReplicationMessages API
		tokenSerializer   common.TaskTokenSerializer
		client            Client
		peerResolver      PeerResolver
		logger            log.Logger
	}

	// getReplicationMessagesWithSize pairs a per-peer replication response
	// with its reported size and the peer that produced it.
	getReplicationMessagesWithSize struct {
		response *types.GetReplicationMessagesResponse
		size     int
		peer     string
	}
)

// NewClient creates a history service client that wraps the given client and
// routes every call to the peer chosen by peerResolver.
//
// rpcMaxSizeInBytes caps the aggregated response built by
// GetReplicationMessages. Task tokens are decoded with the JSON task token
// serializer.
func NewClient(
	numberOfShards int,
	rpcMaxSizeInBytes int,
	client Client,
	peerResolver PeerResolver,
	logger log.Logger,
) Client {
	return &clientImpl{
		numberOfShards:    numberOfShards,
		rpcMaxSizeInBytes: rpcMaxSizeInBytes,
		tokenSerializer:   common.NewJSONTaskTokenSerializer(),
		client:            client,
		peerResolver:      peerResolver,
		logger:            logger,
	}
}

// shardKeyCallOptions returns a newly allocated option list holding opts
// followed by a shard-key option for peer.
//
// The copy is deliberate: appending straight onto the caller's variadic slice
// can write into its backing array when it has spare capacity, and several
// methods below build their options from concurrent goroutines.
func shardKeyCallOptions(opts []yarpc.CallOption, peer string) []yarpc.CallOption {
	out := make([]yarpc.CallOption, 0, len(opts)+1)
	out = append(out, opts...)
	return append(out, yarpc.WithShardKey(peer))
}

// StartWorkflowExecution routes the request to the peer resolved from the
// workflow ID of the start request, following shard ownership redirects.
func (c *clientImpl) StartWorkflowExecution(
	ctx context.Context,
	request *types.HistoryStartWorkflowExecutionRequest,
	opts ...yarpc.CallOption,
) (*types.StartWorkflowExecutionResponse, error) {
	peer, err := c.peerResolver.FromWorkflowID(request.StartRequest.WorkflowID)
	if err != nil {
		return nil, err
	}
	var response *types.StartWorkflowExecutionResponse
	op := func(ctx context.Context, peer string) error {
		var err error
		response, err = c.client.StartWorkflowExecution(ctx, request, shardKeyCallOptions(opts, peer)...)
		return err
	}
	if err = c.executeWithRedirect(ctx, peer, op); err != nil {
		return nil, err
	}
	return response, nil
}

// GetMutableState routes the request to the peer resolved from the execution's
// workflow ID, following shard ownership redirects.
func (c *clientImpl) GetMutableState(
	ctx context.Context,
	request *types.GetMutableStateRequest,
	opts ...yarpc.CallOption,
) (*types.GetMutableStateResponse, error) {
	peer, err := c.peerResolver.FromWorkflowID(request.Execution.WorkflowID)
	if err != nil {
		return nil, err
	}
	var response *types.GetMutableStateResponse
	op := func(ctx context.Context, peer string) error {
		var err error
		response, err = c.client.GetMutableState(ctx, request, shardKeyCallOptions(opts, peer)...)
		return err
	}
	if err = c.executeWithRedirect(ctx, peer, op); err != nil {
		return nil, err
	}
	return response, nil
}

// PollMutableState routes the request to the peer resolved from the
// execution's workflow ID, following shard ownership redirects.
func (c *clientImpl) PollMutableState(
	ctx context.Context,
	request *types.PollMutableStateRequest,
	opts ...yarpc.CallOption,
) (*types.PollMutableStateResponse, error) {
	peer, err := c.peerResolver.FromWorkflowID(request.Execution.WorkflowID)
	if err != nil {
		return nil, err
	}
	var response *types.PollMutableStateResponse
	op := func(ctx context.Context, peer string) error {
		var err error
		response, err = c.client.PollMutableState(ctx, request, shardKeyCallOptions(opts, peer)...)
		return err
	}
	if err = c.executeWithRedirect(ctx, peer, op); err != nil {
		return nil, err
	}
	return response, nil
}

// DescribeHistoryHost picks the target peer from, in order of precedence, the
// request's shard ID, its execution's workflow ID, or its host address, then
// forwards the request, following shard ownership redirects.
func (c *clientImpl) DescribeHistoryHost(
	ctx context.Context,
	request *types.DescribeHistoryHostRequest,
	opts ...yarpc.CallOption,
) (*types.DescribeHistoryHostResponse, error) {
	var (
		peer string
		err  error
	)
	switch {
	case request.ShardIDForHost != nil:
		peer, err = c.peerResolver.FromShardID(int(request.GetShardIDForHost()))
	case request.ExecutionForHost != nil:
		peer, err = c.peerResolver.FromWorkflowID(request.ExecutionForHost.GetWorkflowID())
	default:
		peer, err = c.peerResolver.FromHostAddress(request.GetHostAddress())
	}
	if err != nil {
		return nil, err
	}

	var response *types.DescribeHistoryHostResponse
	op := func(ctx context.Context, peer string) error {
		var err error
		response, err = c.client.DescribeHistoryHost(ctx, request, shardKeyCallOptions(opts, peer)...)
		return err
	}
	if err = c.executeWithRedirect(ctx, peer, op); err != nil {
		return nil, err
	}
	return response, nil
}

// RemoveTask routes the request to the peer resolved from the request's shard
// ID, following shard ownership redirects.
func (c *clientImpl) RemoveTask(
	ctx context.Context,
	request *types.RemoveTaskRequest,
	opts ...yarpc.CallOption,
) error {
	peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.RemoveTask(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// CloseShard routes the request to the peer resolved from the request's shard
// ID, following shard ownership redirects.
func (c *clientImpl) CloseShard(
	ctx context.Context,
	request *types.CloseShardRequest,
	opts ...yarpc.CallOption,
) error {
	peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.CloseShard(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// ResetQueue routes the request to the peer resolved from the request's shard
// ID, following shard ownership redirects.
func (c *clientImpl) ResetQueue(
	ctx context.Context,
	request *types.ResetQueueRequest,
	opts ...yarpc.CallOption,
) error {
	peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.ResetQueue(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// DescribeQueue routes the request to the peer resolved from the request's
// shard ID, following shard ownership redirects.
func (c *clientImpl) DescribeQueue(
	ctx context.Context,
	request *types.DescribeQueueRequest,
	opts ...yarpc.CallOption,
) (*types.DescribeQueueResponse, error) {
	peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
	if err != nil {
		return nil, err
	}
	var response *types.DescribeQueueResponse
	op := func(ctx context.Context, peer string) error {
		var err error
		response, err = c.client.DescribeQueue(ctx, request, shardKeyCallOptions(opts, peer)...)
		return err
	}
	if err = c.executeWithRedirect(ctx, peer, op); err != nil {
		return nil, err
	}
	return response, nil
}

// DescribeMutableState routes the request to the peer resolved from the
// execution's workflow ID, following shard ownership redirects.
func (c *clientImpl) DescribeMutableState(
	ctx context.Context,
	request *types.DescribeMutableStateRequest,
	opts ...yarpc.CallOption,
) (*types.DescribeMutableStateResponse, error) {
	peer, err := c.peerResolver.FromWorkflowID(request.Execution.WorkflowID)
	if err != nil {
		return nil, err
	}
	var response *types.DescribeMutableStateResponse
	op := func(ctx context.Context, peer string) error {
		var err error
		response, err = c.client.DescribeMutableState(ctx, request, shardKeyCallOptions(opts, peer)...)
		return err
	}
	if err = c.executeWithRedirect(ctx, peer, op); err != nil {
		return nil, err
	}
	return response, nil
}

// ResetStickyTaskList routes the request to the peer resolved from the
// execution's workflow ID, following shard ownership redirects.
func (c *clientImpl) ResetStickyTaskList(
	ctx context.Context,
	request *types.HistoryResetStickyTaskListRequest,
	opts ...yarpc.CallOption,
) (*types.HistoryResetStickyTaskListResponse, error) {
	peer, err := c.peerResolver.FromWorkflowID(request.Execution.WorkflowID)
	if err != nil {
		return nil, err
	}
	var response *types.HistoryResetStickyTaskListResponse
	op := func(ctx context.Context, peer string) error {
		var err error
		response, err = c.client.ResetStickyTaskList(ctx, request, shardKeyCallOptions(opts, peer)...)
		return err
	}
	if err = c.executeWithRedirect(ctx, peer, op); err != nil {
		return nil, err
	}
	return response, nil
}

// DescribeWorkflowExecution routes the request to the peer resolved from the
// workflow ID of the wrapped describe request, following shard ownership
// redirects.
func (c *clientImpl) DescribeWorkflowExecution(
	ctx context.Context,
	request *types.HistoryDescribeWorkflowExecutionRequest,
	opts ...yarpc.CallOption,
) (*types.DescribeWorkflowExecutionResponse, error) {
	peer, err := c.peerResolver.FromWorkflowID(request.Request.Execution.WorkflowID)
	if err != nil {
		return nil, err
	}
	var response *types.DescribeWorkflowExecutionResponse
	op := func(ctx context.Context, peer string) error {
		var err error
		response, err = c.client.DescribeWorkflowExecution(ctx, request, shardKeyCallOptions(opts, peer)...)
		return err
	}
	if err = c.executeWithRedirect(ctx, peer, op); err != nil {
		return nil, err
	}
	return response, nil
}

// RecordDecisionTaskStarted routes the request to the peer resolved from the
// workflow execution's workflow ID, following shard ownership redirects.
func (c *clientImpl) RecordDecisionTaskStarted(
	ctx context.Context,
	request *types.RecordDecisionTaskStartedRequest,
	opts ...yarpc.CallOption,
) (*types.RecordDecisionTaskStartedResponse, error) {
	peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.WorkflowID)
	if err != nil {
		return nil, err
	}
	var response *types.RecordDecisionTaskStartedResponse
	op := func(ctx context.Context, peer string) error {
		var err error
		response, err = c.client.RecordDecisionTaskStarted(ctx, request, shardKeyCallOptions(opts, peer)...)
		return err
	}
	if err = c.executeWithRedirect(ctx, peer, op); err != nil {
		return nil, err
	}
	return response, nil
}

// RecordActivityTaskStarted routes the request to the peer resolved from the
// workflow execution's workflow ID, following shard ownership redirects.
func (c *clientImpl) RecordActivityTaskStarted(
	ctx context.Context,
	request *types.RecordActivityTaskStartedRequest,
	opts ...yarpc.CallOption,
) (*types.RecordActivityTaskStartedResponse, error) {
	peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.WorkflowID)
	if err != nil {
		return nil, err
	}
	var response *types.RecordActivityTaskStartedResponse
	op := func(ctx context.Context, peer string) error {
		var err error
		response, err = c.client.RecordActivityTaskStarted(ctx, request, shardKeyCallOptions(opts, peer)...)
		return err
	}
	if err = c.executeWithRedirect(ctx, peer, op); err != nil {
		return nil, err
	}
	return response, nil
}

// RespondDecisionTaskCompleted deserializes the task token carried by the
// complete request, routes the call to the peer resolved from the token's
// workflow ID, and follows shard ownership redirects. A nil response is
// returned whenever the error is non-nil.
func (c *clientImpl) RespondDecisionTaskCompleted(
	ctx context.Context,
	request *types.HistoryRespondDecisionTaskCompletedRequest,
	opts ...yarpc.CallOption,
) (*types.HistoryRespondDecisionTaskCompletedResponse, error) {
	taskToken, err := c.tokenSerializer.Deserialize(request.CompleteRequest.TaskToken)
	if err != nil {
		return nil, err
	}
	peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID)
	if err != nil {
		return nil, err
	}
	var response *types.HistoryRespondDecisionTaskCompletedResponse
	op := func(ctx context.Context, peer string) error {
		var err error
		response, err = c.client.RespondDecisionTaskCompleted(ctx, request, shardKeyCallOptions(opts, peer)...)
		return err
	}
	if err = c.executeWithRedirect(ctx, peer, op); err != nil {
		return nil, err
	}
	return response, nil
}

// RespondDecisionTaskFailed deserializes the task token carried by the failed
// request and routes the call to the peer resolved from the token's workflow
// ID, following shard ownership redirects.
func (c *clientImpl) RespondDecisionTaskFailed(
	ctx context.Context,
	request *types.HistoryRespondDecisionTaskFailedRequest,
	opts ...yarpc.CallOption,
) error {
	taskToken, err := c.tokenSerializer.Deserialize(request.FailedRequest.TaskToken)
	if err != nil {
		return err
	}
	peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID)
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.RespondDecisionTaskFailed(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// RespondActivityTaskCompleted deserializes the task token carried by the
// complete request and routes the call to the peer resolved from the token's
// workflow ID, following shard ownership redirects.
func (c *clientImpl) RespondActivityTaskCompleted(
	ctx context.Context,
	request *types.HistoryRespondActivityTaskCompletedRequest,
	opts ...yarpc.CallOption,
) error {
	taskToken, err := c.tokenSerializer.Deserialize(request.CompleteRequest.TaskToken)
	if err != nil {
		return err
	}
	peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID)
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.RespondActivityTaskCompleted(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// RespondActivityTaskFailed deserializes the task token carried by the failed
// request and routes the call to the peer resolved from the token's workflow
// ID, following shard ownership redirects.
func (c *clientImpl) RespondActivityTaskFailed(
	ctx context.Context,
	request *types.HistoryRespondActivityTaskFailedRequest,
	opts ...yarpc.CallOption,
) error {
	taskToken, err := c.tokenSerializer.Deserialize(request.FailedRequest.TaskToken)
	if err != nil {
		return err
	}
	peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID)
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.RespondActivityTaskFailed(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// RespondActivityTaskCanceled deserializes the task token carried by the
// cancel request and routes the call to the peer resolved from the token's
// workflow ID, following shard ownership redirects.
func (c *clientImpl) RespondActivityTaskCanceled(
	ctx context.Context,
	request *types.HistoryRespondActivityTaskCanceledRequest,
	opts ...yarpc.CallOption,
) error {
	taskToken, err := c.tokenSerializer.Deserialize(request.CancelRequest.TaskToken)
	if err != nil {
		return err
	}
	peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID)
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.RespondActivityTaskCanceled(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// RecordActivityTaskHeartbeat deserializes the task token carried by the
// heartbeat request and routes the call to the peer resolved from the token's
// workflow ID, following shard ownership redirects.
func (c *clientImpl) RecordActivityTaskHeartbeat(
	ctx context.Context,
	request *types.HistoryRecordActivityTaskHeartbeatRequest,
	opts ...yarpc.CallOption,
) (*types.RecordActivityTaskHeartbeatResponse, error) {
	taskToken, err := c.tokenSerializer.Deserialize(request.HeartbeatRequest.TaskToken)
	if err != nil {
		return nil, err
	}
	peer, err := c.peerResolver.FromWorkflowID(taskToken.WorkflowID)
	if err != nil {
		return nil, err
	}
	var response *types.RecordActivityTaskHeartbeatResponse
	op := func(ctx context.Context, peer string) error {
		var err error
		response, err = c.client.RecordActivityTaskHeartbeat(ctx, request, shardKeyCallOptions(opts, peer)...)
		return err
	}
	if err = c.executeWithRedirect(ctx, peer, op); err != nil {
		return nil, err
	}
	return response, nil
}

// RequestCancelWorkflowExecution routes the request to the peer resolved from
// the cancel request's workflow ID, following shard ownership redirects.
func (c *clientImpl) RequestCancelWorkflowExecution(
	ctx context.Context,
	request *types.HistoryRequestCancelWorkflowExecutionRequest,
	opts ...yarpc.CallOption,
) error {
	peer, err := c.peerResolver.FromWorkflowID(request.CancelRequest.WorkflowExecution.WorkflowID)
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.RequestCancelWorkflowExecution(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// SignalWorkflowExecution routes the request to the peer resolved from the
// signal request's workflow ID, following shard ownership redirects.
func (c *clientImpl) SignalWorkflowExecution(
	ctx context.Context,
	request *types.HistorySignalWorkflowExecutionRequest,
	opts ...yarpc.CallOption,
) error {
	peer, err := c.peerResolver.FromWorkflowID(request.SignalRequest.WorkflowExecution.WorkflowID)
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.SignalWorkflowExecution(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// SignalWithStartWorkflowExecution routes the request to the peer resolved
// from the signal-with-start request's workflow ID, following shard ownership
// redirects.
func (c *clientImpl) SignalWithStartWorkflowExecution(
	ctx context.Context,
	request *types.HistorySignalWithStartWorkflowExecutionRequest,
	opts ...yarpc.CallOption,
) (*types.StartWorkflowExecutionResponse, error) {
	peer, err := c.peerResolver.FromWorkflowID(request.SignalWithStartRequest.WorkflowID)
	if err != nil {
		return nil, err
	}
	var response *types.StartWorkflowExecutionResponse
	op := func(ctx context.Context, peer string) error {
		var err error
		response, err = c.client.SignalWithStartWorkflowExecution(ctx, request, shardKeyCallOptions(opts, peer)...)
		return err
	}
	if err = c.executeWithRedirect(ctx, peer, op); err != nil {
		return nil, err
	}
	return response, nil
}

// RemoveSignalMutableState routes the request to the peer resolved from the
// workflow execution's workflow ID, following shard ownership redirects.
// Caller-supplied call options are forwarded, as in every other method.
func (c *clientImpl) RemoveSignalMutableState(
	ctx context.Context,
	request *types.RemoveSignalMutableStateRequest,
	opts ...yarpc.CallOption,
) error {
	peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.WorkflowID)
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.RemoveSignalMutableState(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// TerminateWorkflowExecution routes the request to the peer resolved from the
// terminate request's workflow ID, following shard ownership redirects.
func (c *clientImpl) TerminateWorkflowExecution(
	ctx context.Context,
	request *types.HistoryTerminateWorkflowExecutionRequest,
	opts ...yarpc.CallOption,
) error {
	peer, err := c.peerResolver.FromWorkflowID(request.TerminateRequest.WorkflowExecution.WorkflowID)
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.TerminateWorkflowExecution(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// ResetWorkflowExecution routes the request to the peer resolved from the
// reset request's workflow ID, following shard ownership redirects.
func (c *clientImpl) ResetWorkflowExecution(
	ctx context.Context,
	request *types.HistoryResetWorkflowExecutionRequest,
	opts ...yarpc.CallOption,
) (*types.ResetWorkflowExecutionResponse, error) {
	peer, err := c.peerResolver.FromWorkflowID(request.ResetRequest.WorkflowExecution.WorkflowID)
	if err != nil {
		return nil, err
	}
	var response *types.ResetWorkflowExecutionResponse
	op := func(ctx context.Context, peer string) error {
		var err error
		response, err = c.client.ResetWorkflowExecution(ctx, request, shardKeyCallOptions(opts, peer)...)
		return err
	}
	if err = c.executeWithRedirect(ctx, peer, op); err != nil {
		return nil, err
	}
	return response, nil
}

// ScheduleDecisionTask routes the request to the peer resolved from the
// workflow execution's workflow ID, following shard ownership redirects.
func (c *clientImpl) ScheduleDecisionTask(
	ctx context.Context,
	request *types.ScheduleDecisionTaskRequest,
	opts ...yarpc.CallOption,
) error {
	peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.WorkflowID)
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.ScheduleDecisionTask(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// RecordChildExecutionCompleted routes the request to the peer resolved from
// the workflow execution's workflow ID, following shard ownership redirects.
func (c *clientImpl) RecordChildExecutionCompleted(
	ctx context.Context,
	request *types.RecordChildExecutionCompletedRequest,
	opts ...yarpc.CallOption,
) error {
	peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.WorkflowID)
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.RecordChildExecutionCompleted(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// ReplicateEventsV2 routes the request to the peer resolved from the workflow
// execution's workflow ID, following shard ownership redirects.
func (c *clientImpl) ReplicateEventsV2(
	ctx context.Context,
	request *types.ReplicateEventsV2Request,
	opts ...yarpc.CallOption,
) error {
	peer, err := c.peerResolver.FromWorkflowID(request.WorkflowExecution.GetWorkflowID())
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.ReplicateEventsV2(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// SyncShardStatus routes the request to the peer resolved from the request's
// shard ID, following shard ownership redirects.
func (c *clientImpl) SyncShardStatus(
	ctx context.Context,
	request *types.SyncShardStatusRequest,
	opts ...yarpc.CallOption,
) error {
	// There is no workflow ID here; the request carries the shard ID
	// directly, which is even better for routing.
	peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.SyncShardStatus(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// SyncActivity routes the request to the peer resolved from the request's
// workflow ID, following shard ownership redirects.
func (c *clientImpl) SyncActivity(
	ctx context.Context,
	request *types.SyncActivityRequest,
	opts ...yarpc.CallOption,
) error {
	peer, err := c.peerResolver.FromWorkflowID(request.GetWorkflowID())
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.SyncActivity(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// QueryWorkflow routes the request to the peer resolved from the queried
// execution's workflow ID, following shard ownership redirects.
func (c *clientImpl) QueryWorkflow(
	ctx context.Context,
	request *types.HistoryQueryWorkflowRequest,
	opts ...yarpc.CallOption,
) (*types.HistoryQueryWorkflowResponse, error) {
	peer, err := c.peerResolver.FromWorkflowID(request.GetRequest().GetExecution().GetWorkflowID())
	if err != nil {
		return nil, err
	}
	var response *types.HistoryQueryWorkflowResponse
	op := func(ctx context.Context, peer string) error {
		var err error
		response, err = c.client.QueryWorkflow(ctx, request, shardKeyCallOptions(opts, peer)...)
		return err
	}
	if err = c.executeWithRedirect(ctx, peer, op); err != nil {
		return nil, err
	}
	return response, nil
}

// GetReplicationMessages groups the request's tokens by the peer that owns
// each token's shard, queries all peers concurrently, and merges the per-peer
// responses into one response whose total size stays below rpcMaxSizeInBytes.
//
// A peer failure is logged and that peer's shards are left out of the result,
// except for a ServiceBusyError, which fails the whole call. Peer responses
// that would reach the size limit are skipped (and logged); responses are
// considered in random order so that no peer is skipped systematically.
func (c *clientImpl) GetReplicationMessages(
	ctx context.Context,
	request *types.GetReplicationMessagesRequest,
	opts ...yarpc.CallOption,
) (*types.GetReplicationMessagesResponse, error) {
	requestsByPeer := make(map[string]*types.GetReplicationMessagesRequest)
	for _, token := range request.Tokens {
		peer, err := c.peerResolver.FromShardID(int(token.GetShardID()))
		if err != nil {
			return nil, err
		}
		req, ok := requestsByPeer[peer]
		if !ok {
			req = &types.GetReplicationMessagesRequest{
				ClusterName: request.ClusterName,
			}
			requestsByPeer[peer] = req
		}
		req.Tokens = append(req.Tokens, token)
	}

	g := &errgroup.Group{}
	var responseMutex sync.Mutex
	peerResponses := make([]*getReplicationMessagesWithSize, 0, len(requestsByPeer))

	for peer, req := range requestsByPeer {
		peer, req := peer, req
		g.Go(func() (e error) {
			defer func() { log.CapturePanic(recover(), c.logger, &e) }()

			// Child context created with the same 0.05 factor as in
			// GetCrossClusterTasks, where it keeps 5% of the timeout in
			// reserve so a (partial) result can still be returned.
			requestContext, cancel := common.CreateChildContext(ctx, 0.05)
			defer cancel()

			requestContext, responseInfo := rpc.ContextWithResponseInfo(requestContext)
			// The option slice is built per goroutine; see shardKeyCallOptions.
			resp, err := c.client.GetReplicationMessages(requestContext, req, shardKeyCallOptions(opts, peer)...)
			if err != nil {
				c.logger.Warn("Failed to get replication tasks from client",
					tag.Error(err),
					tag.ShardReplicationToken(req),
				)
				// Returns service busy error to notify replication
				if _, ok := err.(*types.ServiceBusyError); ok {
					return err
				}
				return nil
			}

			responseMutex.Lock()
			peerResponses = append(peerResponses, &getReplicationMessagesWithSize{
				response: resp,
				size:     responseInfo.Size,
				peer:     peer,
			})
			responseMutex.Unlock()
			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}

	// Peers with the largest responses can be the slowest to return data.
	// They end up at the end of the slice and may not fit in the response
	// message. Skipped peers then grow their responses even more, so next
	// time they are even slower and end up at the end again, which can
	// starve them. Shuffle the responses to prevent this: every peer gets an
	// equal chance of being picked first.
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	r.Shuffle(len(peerResponses), func(i, j int) {
		peerResponses[i], peerResponses[j] = peerResponses[j], peerResponses[i]
	})

	response := &types.GetReplicationMessagesResponse{MessagesByShard: make(map[int32]*types.ReplicationMessages)}
	responseTotalSize := 0
	rpcMaxResponseSize := c.rpcMaxSizeInBytes
	for _, resp := range peerResponses {
		if (responseTotalSize + resp.size) >= rpcMaxResponseSize {
			// Log shards that did not fit for debugging purposes
			for shardID := range resp.response.GetMessagesByShard() {
				c.logger.Warn("Replication messages did not fit in the response",
					tag.ShardID(int(shardID)),
					tag.Address(resp.peer),
					tag.ResponseSize(resp.size),
					tag.ResponseTotalSize(responseTotalSize),
					tag.ResponseMaxSize(rpcMaxResponseSize),
				)
			}

			// Return a partial response when the supported max size would be
			// exceeded, but keep going: the next peer's response may still fit.
			continue
		}

		responseTotalSize += resp.size
		for shardID, tasks := range resp.response.GetMessagesByShard() {
			response.MessagesByShard[shardID] = tasks
		}
	}
	return response, nil
}

// GetDLQReplicationMessages forwards the request to the peer resolved from the
// workflow ID of the first task info. No redirect handling is applied.
//
// A request without task infos has nothing to route on and yields an empty
// response instead of an index-out-of-range panic.
func (c *clientImpl) GetDLQReplicationMessages(
	ctx context.Context,
	request *types.GetDLQReplicationMessagesRequest,
	opts ...yarpc.CallOption,
) (*types.GetDLQReplicationMessagesResponse, error) {
	taskInfos := request.GetTaskInfos()
	if len(taskInfos) == 0 {
		return &types.GetDLQReplicationMessagesResponse{}, nil
	}

	// All workflow IDs are in the same shard per request
	workflowID := taskInfos[0].GetWorkflowID()
	peer, err := c.peerResolver.FromWorkflowID(workflowID)
	if err != nil {
		return nil, err
	}
	return c.client.GetDLQReplicationMessages(ctx, request, shardKeyCallOptions(opts, peer)...)
}

// ReapplyEvents routes the request to the peer resolved from the workflow ID
// of the wrapped reapply request, following shard ownership redirects.
func (c *clientImpl) ReapplyEvents(
	ctx context.Context,
	request *types.HistoryReapplyEventsRequest,
	opts ...yarpc.CallOption,
) error {
	peer, err := c.peerResolver.FromWorkflowID(request.GetRequest().GetWorkflowExecution().GetWorkflowID())
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.ReapplyEvents(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// CountDLQMessages sends the request to every peer returned by the resolver,
// concurrently, and merges the entries of all successful responses.
//
// If any peer fails, the first error is returned together with the entries
// gathered from the peers that succeeded.
func (c *clientImpl) CountDLQMessages(
	ctx context.Context,
	request *types.CountDLQMessagesRequest,
	opts ...yarpc.CallOption,
) (*types.HistoryCountDLQMessagesResponse, error) {
	peers, err := c.peerResolver.GetAllPeers()
	if err != nil {
		return nil, err
	}

	var mu sync.Mutex
	responses := make([]*types.HistoryCountDLQMessagesResponse, 0, len(peers))

	g := &errgroup.Group{}
	for _, peer := range peers {
		peer := peer
		g.Go(func() (e error) {
			defer func() { log.CapturePanic(recover(), c.logger, &e) }()

			// The option slice is built per goroutine; see shardKeyCallOptions.
			response, err := c.client.CountDLQMessages(ctx, request, shardKeyCallOptions(opts, peer)...)
			if err != nil {
				return err
			}
			mu.Lock()
			responses = append(responses, response)
			mu.Unlock()
			return nil
		})
	}
	err = g.Wait()

	entries := map[types.HistoryDLQCountKey]int64{}
	for _, response := range responses {
		for key, count := range response.Entries {
			entries[key] = count
		}
	}
	return &types.HistoryCountDLQMessagesResponse{Entries: entries}, err
}

// ReadDLQMessages forwards the request to the peer resolved from the request's
// shard ID. No redirect handling is applied.
func (c *clientImpl) ReadDLQMessages(
	ctx context.Context,
	request *types.ReadDLQMessagesRequest,
	opts ...yarpc.CallOption,
) (*types.ReadDLQMessagesResponse, error) {
	peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
	if err != nil {
		return nil, err
	}
	return c.client.ReadDLQMessages(ctx, request, shardKeyCallOptions(opts, peer)...)
}

// PurgeDLQMessages forwards the request to the peer resolved from the
// request's shard ID. No redirect handling is applied.
func (c *clientImpl) PurgeDLQMessages(
	ctx context.Context,
	request *types.PurgeDLQMessagesRequest,
	opts ...yarpc.CallOption,
) error {
	peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
	if err != nil {
		return err
	}
	return c.client.PurgeDLQMessages(ctx, request, shardKeyCallOptions(opts, peer)...)
}

// MergeDLQMessages forwards the request to the peer resolved from the
// request's shard ID. No redirect handling is applied.
func (c *clientImpl) MergeDLQMessages(
	ctx context.Context,
	request *types.MergeDLQMessagesRequest,
	opts ...yarpc.CallOption,
) (*types.MergeDLQMessagesResponse, error) {
	peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
	if err != nil {
		return nil, err
	}
	return c.client.MergeDLQMessages(ctx, request, shardKeyCallOptions(opts, peer)...)
}

// RefreshWorkflowTasks routes the request to the peer resolved from the
// workflow ID of the wrapped refresh request, following shard ownership
// redirects.
func (c *clientImpl) RefreshWorkflowTasks(
	ctx context.Context,
	request *types.HistoryRefreshWorkflowTasksRequest,
	opts ...yarpc.CallOption,
) error {
	peer, err := c.peerResolver.FromWorkflowID(request.GetRequest().GetExecution().GetWorkflowID())
	if err != nil {
		return err
	}
	op := func(ctx context.Context, peer string) error {
		return c.client.RefreshWorkflowTasks(ctx, request, shardKeyCallOptions(opts, peer)...)
	}
	return c.executeWithRedirect(ctx, peer, op)
}

// NotifyFailoverMarkers groups the failover marker tokens by the peer resolved
// from each marker's domain ID and notifies all peers concurrently. The first
// error reported by any peer is returned.
func (c *clientImpl) NotifyFailoverMarkers(
	ctx context.Context,
	request *types.NotifyFailoverMarkersRequest,
	opts ...yarpc.CallOption,
) error {
	requestsByPeer := make(map[string]*types.NotifyFailoverMarkersRequest)
	for _, token := range request.GetFailoverMarkerTokens() {
		marker := token.GetFailoverMarker()
		peer, err := c.peerResolver.FromDomainID(marker.GetDomainID())
		if err != nil {
			return err
		}
		req, ok := requestsByPeer[peer]
		if !ok {
			req = &types.NotifyFailoverMarkersRequest{
				FailoverMarkerTokens: []*types.FailoverMarkerToken{},
			}
			requestsByPeer[peer] = req
		}
		req.FailoverMarkerTokens = append(req.FailoverMarkerTokens, token)
	}

	g := &errgroup.Group{}
	for peer, req := range requestsByPeer {
		peer, req := peer, req
		g.Go(func() (e error) {
			defer func() { log.CapturePanic(recover(), c.logger, &e) }()

			// The option slice is built per goroutine; see shardKeyCallOptions.
			return c.client.NotifyFailoverMarkers(ctx, req, shardKeyCallOptions(opts, peer)...)
		})
	}
	return g.Wait()
}

// GetCrossClusterTasks groups the requested shard IDs by owning peer, queries
// all peers concurrently, and merges the results.
//
// The call itself never returns an error once the peers are resolved: when a
// peer fails, each of its shards is recorded in FailedCauseByShard with the
// cause derived from that error.
func (c *clientImpl) GetCrossClusterTasks(
	ctx context.Context,
	request *types.GetCrossClusterTasksRequest,
	opts ...yarpc.CallOption,
) (*types.GetCrossClusterTasksResponse, error) {
	requestByPeer := make(map[string]*types.GetCrossClusterTasksRequest)
	for _, shardID := range request.GetShardIDs() {
		peer, err := c.peerResolver.FromShardID(int(shardID))
		if err != nil {
			return nil, err
		}
		req, ok := requestByPeer[peer]
		if !ok {
			req = &types.GetCrossClusterTasksRequest{
				TargetCluster: request.TargetCluster,
			}
			requestByPeer[peer] = req
		}
		req.ShardIDs = append(req.ShardIDs, shardID)
	}

	// preserve 5% timeout to return partial of the result if context is timing out
	ctx, cancel := common.CreateChildContext(ctx, 0.05)
	defer cancel()

	futureByPeer := make(map[string]future.Future, len(requestByPeer))
	for peer, req := range requestByPeer {
		fut, settable := future.NewFuture()
		go func(ctx context.Context, peer string, req *types.GetCrossClusterTasksRequest) {
			// Forward the caller's options too; the slice is built per
			// goroutine (see shardKeyCallOptions).
			settable.Set(c.client.GetCrossClusterTasks(ctx, req, shardKeyCallOptions(opts, peer)...))
		}(ctx, peer, req)
		futureByPeer[peer] = fut
	}

	response := &types.GetCrossClusterTasksResponse{
		TasksByShard:       make(map[int32][]*types.CrossClusterTaskRequest),
		FailedCauseByShard: make(map[int32]types.GetTaskFailedCause),
	}
	for peer, fut := range futureByPeer {
		var resp *types.GetCrossClusterTasksResponse
		if futureErr := fut.Get(ctx, &resp); futureErr != nil {
			c.logger.Error("Failed to get cross cluster tasks", tag.Error(futureErr))
			for _, failedShardID := range requestByPeer[peer].ShardIDs {
				response.FailedCauseByShard[failedShardID] = common.ConvertErrToGetTaskFailedCause(futureErr)
			}
			continue
		}
		for shardID, tasks := range resp.TasksByShard {
			response.TasksByShard[shardID] = tasks
		}
		for shardID, failedCause := range resp.FailedCauseByShard {
			response.FailedCauseByShard[shardID] = failedCause
		}
	}
	// not using a waitGroup for created goroutines as once all futures are
	// unblocked, those goroutines will eventually be completed
	return response, nil
}

// RespondCrossClusterTasksCompleted routes the request to the peer resolved
// from the request's shard ID, following shard ownership redirects.
func (c *clientImpl) RespondCrossClusterTasksCompleted(
	ctx context.Context,
	request *types.RespondCrossClusterTasksCompletedRequest,
	opts ...yarpc.CallOption,
) (*types.RespondCrossClusterTasksCompletedResponse, error) {
	peer, err := c.peerResolver.FromShardID(int(request.GetShardID()))
	if err != nil {
		return nil, err
	}
	var response *types.RespondCrossClusterTasksCompletedResponse
	op := func(ctx context.Context, peer string) error {
		var err error
		response, err = c.client.RespondCrossClusterTasksCompleted(ctx, request, shardKeyCallOptions(opts, peer)...)
		return err
	}
	if err = c.executeWithRedirect(ctx, peer, op); err != nil {
		return nil, err
	}
	return response, nil
}

// GetFailoverInfo forwards the request to the peer resolved from the request's
// domain ID. No redirect handling is applied.
func (c *clientImpl) GetFailoverInfo(
	ctx context.Context,
	request *types.GetFailoverInfoRequest,
	opts ...yarpc.CallOption,
) (*types.GetFailoverInfoResponse, error) {
	peer, err := c.peerResolver.FromDomainID(request.GetDomainID())
	if err != nil {
		return nil, err
	}
	return c.client.GetFailoverInfo(ctx, request, shardKeyCallOptions(opts, peer)...)
}

// executeWithRedirect runs op against peer. Whenever op fails with a
// ShardOwnershipLostError, the owner named in that error is resolved to a new
// peer and op is retried there.
//
// The loop ends when op succeeds, op fails with any other error, the new owner
// cannot be resolved, or the context is no longer valid; the corresponding
// error (or nil) is returned. A nil ctx is replaced by context.Background().
func (c *clientImpl) executeWithRedirect(
	ctx context.Context,
	peer string,
	op func(ctx context.Context, peer string) error,
) error {
	if ctx == nil {
		ctx = context.Background()
	}
	for {
		// Checked before every attempt, so a chain of redirects cannot
		// outlive the context.
		if err := common.IsValidContext(ctx); err != nil {
			return err
		}

		err := op(ctx, peer)
		lost, ok := err.(*types.ShardOwnershipLostError)
		if !ok {
			// Success (nil) or a non-redirect failure: hand it back as is.
			return err
		}

		// TODO: consider emitting a metric for number of redirects
		peer, err = c.peerResolver.FromHostAddress(lost.GetOwner())
		if err != nil {
			return err
		}
	}
}