hyperbahn/client.go (134 lines of code) (raw):

// Copyright (c) 2015 Uber Technologies, Inc. // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package hyperbahn import ( "encoding/json" "fmt" "net" "os" "time" "github.com/uber/tchannel-go" htypes "github.com/uber/tchannel-go/hyperbahn/gen-go/hyperbahn" tjson "github.com/uber/tchannel-go/json" tthrift "github.com/uber/tchannel-go/thrift" ) // Client manages Hyperbahn connections and registrations. type Client struct { tchan *tchannel.Channel services []string opts ClientOptions quit chan struct{} jsonClient *tjson.Client hyperbahnClient htypes.TChanHyperbahn } // FailStrategy is the strategy to use when registration fails maxRegistrationFailures // times consecutively in the background. This is not used if the initial registration fails. type FailStrategy int const ( // FailStrategyFatal will call Fatalf on the channel's logger after triggering handler.OnError. // This is the default strategy. FailStrategyFatal FailStrategy = iota // FailStrategyIgnore will only call handler.OnError, even after many // errors, and will continue to retry forever. FailStrategyIgnore ) const hyperbahnServiceName = "hyperbahn" // UnmarshalText implements encoding/text.Unmarshaler. // This allows FailStrategy to be specified as a string in many // file formats (e.g. JSON, YAML, TOML). func (f *FailStrategy) UnmarshalText(text []byte) error { switch strategy := string(text); strategy { case "", "fatal": *f = FailStrategyFatal case "ignore": *f = FailStrategyIgnore default: return fmt.Errorf("not a valid fail strategy: %q", strategy) } return nil } // ClientOptions are used to configure this Hyperbahn client. type ClientOptions struct { // Timeout defaults to 3 seconds if it is not set. Timeout time.Duration // TimeoutPerAttempt defaults to 1 second if it is not set. TimeoutPerAttempt time.Duration Handler Handler FailStrategy FailStrategy // The following are variables for stubbing in unit tests. // They are not part of the stable API and may change. TimeSleep func(d time.Duration) } // NewClient creates a new Hyperbahn client using the given channel. // config is the environment-specific configuration for Hyperbahn such as the list of initial nodes. // opts are optional, and are used to customize the client. func NewClient(ch *tchannel.Channel, config Configuration, opts *ClientOptions) (*Client, error) { client := &Client{tchan: ch, quit: make(chan struct{})} if opts != nil { client.opts = *opts } if client.opts.Timeout == 0 { client.opts.Timeout = 3 * time.Second } if client.opts.TimeoutPerAttempt == 0 { client.opts.TimeoutPerAttempt = time.Second } if client.opts.Handler == nil { client.opts.Handler = nullHandler{} } if client.opts.TimeSleep == nil { client.opts.TimeSleep = func(d time.Duration) { select { case <-time.After(d): return case <-client.quit: return } } } if err := parseConfig(&config); err != nil { return nil, err } // Add the given initial nodes as peers. for _, node := range config.InitialNodes { addPeer(ch, node) } client.jsonClient = tjson.NewClient(ch, hyperbahnServiceName, nil) thriftClient := tthrift.NewClient(ch, hyperbahnServiceName, nil) client.hyperbahnClient = htypes.NewTChanHyperbahnClient(thriftClient) return client, nil } // parseConfig parses the configuration options (e.g. InitialNodesFile) func parseConfig(config *Configuration) error { if config.InitialNodesFile != "" { f, err := os.Open(config.InitialNodesFile) if err != nil { return err } defer f.Close() decoder := json.NewDecoder(f) if err := decoder.Decode(&config.InitialNodes); err != nil { return err } } if len(config.InitialNodes) == 0 { return fmt.Errorf("hyperbahn Client requires at least one initial node") } for _, node := range config.InitialNodes { if _, _, err := net.SplitHostPort(node); err != nil { return fmt.Errorf("hyperbahn Client got invalid node %v: %v", node, err) } } return nil } // addPeer adds a peer to the Hyperbahn subchannel. // TODO(prashant): Start connections to the peers in the background. func addPeer(ch *tchannel.Channel, hostPort string) { peers := ch.GetSubChannel(hyperbahnServiceName).Peers() peers.Add(hostPort) } func (c *Client) getServiceNames(otherServices []tchannel.Registrar) { c.services = make([]string, 0, len(otherServices)+1) c.services = append(c.services, c.tchan.PeerInfo().ServiceName) for _, s := range otherServices { c.services = append(c.services, s.ServiceName()) } } // Advertise advertises the service with Hyperbahn, and returns any errors on initial advertisement. // Advertise can register multiple services hosted on the same endpoint. // If the advertisement succeeds, a goroutine is started to re-advertise periodically. func (c *Client) Advertise(otherServices ...tchannel.Registrar) error { c.getServiceNames(otherServices) if err := c.initialAdvertise(); err != nil { return err } c.opts.Handler.On(Advertised) go c.advertiseLoop() return nil } // IsClosed returns whether this Client is closed. func (c *Client) IsClosed() bool { select { case <-c.quit: return true default: return false } } // Close closes the Hyperbahn client, which stops any background re-advertisements. func (c *Client) Close() { if !c.IsClosed() { close(c.quit) } }