router/router.go (125 lines of code) (raw):
// Copyright (c) 2015 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package router
import (
"sync"
"github.com/uber/ringpop-go"
"github.com/uber/ringpop-go/events"
"github.com/uber/ringpop-go/swim"
"github.com/uber/tchannel-go"
"github.com/uber/tchannel-go/thrift"
)
type router struct {
ringpop ringpop.Interface
factory ClientFactory
channel *tchannel.Channel
rw sync.RWMutex
clientCache map[string]cacheEntry
}
// A Router creates instances of TChannel Thrift Clients via the help of the
// ClientFactory
type Router interface {
// GetClient provides the caller with a client for a given key. At the same
// time it will inform the caller if the client is a remote client or a
// local client via the isRemote return value.
GetClient(key string) (client interface{}, isRemote bool, err error)
// GetNClients provides the caller with an ordered slice of clients for a
// given key. Each result is a struct with a reference to the actual client
// and a bool indicating whether or not that client is a remote client or a
// local client.
GetNClients(key string, n int) (clients []ClientResult, err error)
}
// A ClientFactory is able to provide an implementation of a TChan[Service]
// interface that can dispatch calls to the actual implementation. This could be
// both a local or a remote implementation of the interface based on the dest
// provided
type ClientFactory interface {
GetLocalClient() interface{}
MakeRemoteClient(client thrift.TChanClient) interface{}
}
type cacheEntry struct {
client interface{}
isRemote bool
}
// New creates an instance that validates the Router interface. A Router
// will be used to get implementations of service interfaces that implement a
// distributed microservice.
func New(rp ringpop.Interface, f ClientFactory, ch *tchannel.Channel) Router {
r := &router{
ringpop: rp,
factory: f,
channel: ch,
clientCache: make(map[string]cacheEntry),
}
rp.AddListener(r)
return r
}
func (r *router) HandleEvent(event events.Event) {
switch event := event.(type) {
case swim.MemberlistChangesReceivedEvent:
for _, change := range event.Changes {
r.handleChange(change)
}
}
}
func (r *router) handleChange(change swim.Change) {
switch change.Status {
case swim.Faulty, swim.Leave:
r.removeClient(change.Address)
}
}
// Get the client for a certain destination from our internal cache, or
// delegates the creation to the ClientFactory.
func (r *router) GetClient(key string) (client interface{}, isRemote bool, err error) {
dest, err := r.ringpop.Lookup(key)
if err != nil {
return nil, false, err
}
return r.getClientByHost(dest)
}
// ClientResult is a struct that contains a reference to the actual callable
// client and a bool indicating whether or not that client is local or remote.
type ClientResult struct {
HostPort string
Client interface{}
IsRemote bool
}
func (r *router) GetNClients(key string, n int) ([]ClientResult, error) {
dests, err := r.ringpop.LookupN(key, n)
if err != nil {
return nil, err
}
clients := make([]ClientResult, n, n)
for i, dest := range dests {
client, isRemote, err := r.getClientByHost(dest)
if err != nil {
return nil, err
}
clients[i] = ClientResult{dest, client, isRemote}
}
return clients, nil
}
func (r *router) getClientByHost(dest string) (client interface{}, isRemote bool, err error) {
r.rw.RLock()
cachedEntry, ok := r.clientCache[dest]
r.rw.RUnlock()
if ok {
client = cachedEntry.client
isRemote = cachedEntry.isRemote
return client, isRemote, nil
}
// no match so far, get a complete lock for creation
r.rw.Lock()
defer r.rw.Unlock()
// double check it is not created between read and complete lock
cachedEntry, ok = r.clientCache[dest]
if ok {
client = cachedEntry.client
isRemote = cachedEntry.isRemote
return client, isRemote, nil
}
me, err := r.ringpop.WhoAmI()
if err != nil {
return nil, false, err
}
// use the ClientFactory to get the client
if dest == me {
isRemote = false
client = r.factory.GetLocalClient()
} else {
isRemote = true
thriftClient := thrift.NewClient(
r.channel,
r.channel.ServiceName(),
&thrift.ClientOptions{
HostPort: dest,
},
)
client = r.factory.MakeRemoteClient(thriftClient)
}
// cache the client
r.clientCache[dest] = cacheEntry{
client: client,
isRemote: isRemote,
}
return client, isRemote, nil
}
func (r *router) removeClient(hostport string) {
r.rw.Lock()
delete(r.clientCache, hostport)
r.rw.Unlock()
}