testutils/mockhyperbahn/hyperbahn.go (104 lines of code) (raw):
// Copyright (c) 2015 Uber Technologies, Inc.
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package mockhyperbahn
import (
"errors"
"fmt"
"sync"
"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/hyperbahn"
hthrift "github.com/uber/tchannel-go/hyperbahn/gen-go/hyperbahn"
"github.com/uber/tchannel-go/json"
"github.com/uber/tchannel-go/relay/relaytest"
"github.com/uber/tchannel-go/thrift"
)
// Mock is up a mock Hyperbahn server for tests.
type Mock struct {
sync.RWMutex
ch *tchannel.Channel
respCh chan int
advertised []string
discoverResults map[string][]string
}
// New returns a mock Hyperbahn server that can be used for testing.
func New() (*Mock, error) {
stubHost := relaytest.NewStubRelayHost()
ch, err := tchannel.NewChannel("hyperbahn", &tchannel.ChannelOptions{
RelayHost: stubHost,
RelayLocalHandlers: []string{"hyperbahn"},
})
if err != nil {
return nil, err
}
mh := &Mock{
ch: ch,
respCh: make(chan int),
discoverResults: make(map[string][]string),
}
if err := json.Register(ch, json.Handlers{"ad": mh.adHandler}, nil); err != nil {
return nil, err
}
thriftServer := thrift.NewServer(ch)
thriftServer.Register(hthrift.NewTChanHyperbahnServer(mh))
return mh, ch.ListenAndServe("127.0.0.1:0")
}
// SetDiscoverResult sets the given hostPorts as results for the Discover call.
func (h *Mock) SetDiscoverResult(serviceName string, hostPorts []string) {
h.Lock()
defer h.Unlock()
h.discoverResults[serviceName] = hostPorts
}
// Discover returns the IPs for a discovery query if some were set using SetDiscoverResult.
// Otherwise, it returns an error.
func (h *Mock) Discover(ctx thrift.Context, query *hthrift.DiscoveryQuery) (*hthrift.DiscoveryResult_, error) {
h.RLock()
defer h.RUnlock()
hostPorts, ok := h.discoverResults[query.ServiceName]
if !ok {
return nil, fmt.Errorf("no discovery results set for %v", query.ServiceName)
}
peers, err := toServicePeers(hostPorts)
if err != nil {
return nil, fmt.Errorf("invalid discover result set: %v", err)
}
return &hthrift.DiscoveryResult_{
Peers: peers,
}, nil
}
// Configuration returns a hyperbahn.Configuration object used to configure a
// hyperbahn.Client to talk to this mock server.
func (h *Mock) Configuration() hyperbahn.Configuration {
return hyperbahn.Configuration{
InitialNodes: []string{h.ch.PeerInfo().HostPort},
}
}
// Channel returns the underlying tchannel that implements relaying.
func (h *Mock) Channel() *tchannel.Channel {
return h.ch
}
func (h *Mock) adHandler(ctx json.Context, req *hyperbahn.AdRequest) (*hyperbahn.AdResponse, error) {
callerHostPort := tchannel.CurrentCall(ctx).RemotePeer().HostPort
h.Lock()
for _, s := range req.Services {
h.advertised = append(h.advertised, s.Name)
sc := h.ch.GetSubChannel(s.Name, tchannel.Isolated)
sc.Peers().Add(callerHostPort)
}
h.Unlock()
select {
case n := <-h.respCh:
if n == 0 {
return nil, errors.New("error")
}
return &hyperbahn.AdResponse{ConnectionCount: n}, nil
default:
// Return a default response
return &hyperbahn.AdResponse{ConnectionCount: 3}, nil
}
}
// GetAdvertised returns the list of services registered.
func (h *Mock) GetAdvertised() []string {
h.RLock()
defer h.RUnlock()
return h.advertised
}
// Close stops the mock Hyperbahn server.
func (h *Mock) Close() {
h.ch.Close()
}
// QueueError queues an error to be returned on the next advertise call.
func (h *Mock) QueueError() {
h.respCh <- 0
}
// QueueResponse queues a response from Hyperbahn.
// numConnections must be greater than 0.
func (h *Mock) QueueResponse(numConnections int) {
if numConnections <= 0 {
panic("QueueResponse must have numConnections > 0")
}
h.respCh <- numConnections
}