ringpop.go (530 lines of code) (raw):

// Copyright (c) 2015 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. // Package ringpop is a library that maintains a consistent hash ring atop a // gossip-based membership protocol. It can be used by applications to // arbitrarily shard data in a scalable and fault-tolerant manner. package ringpop import ( "bytes" "errors" "fmt" "sync" "time" "github.com/uber/ringpop-go/events" "github.com/uber/ringpop-go/forward" "github.com/uber/ringpop-go/hashring" "github.com/uber/ringpop-go/logging" "github.com/uber/ringpop-go/membership" "github.com/uber/ringpop-go/shared" "github.com/uber/ringpop-go/swim" "github.com/benbjohnson/clock" "github.com/dgryski/go-farm" log "github.com/uber-common/bark" "github.com/uber/tchannel-go" athrift "github.com/uber/tchannel-go/thirdparty/github.com/apache/thrift/lib/go/thrift" ) // Interface specifies the public facing methods a user of ringpop is able to // use. 
type Interface interface { Destroy() App() string WhoAmI() (string, error) Uptime() (time.Duration, error) Bootstrap(opts *swim.BootstrapOptions) ([]string, error) Checksum() (uint32, error) Lookup(key string) (string, error) LookupN(key string, n int) ([]string, error) GetReachableMembers(predicates ...swim.MemberPredicate) ([]string, error) CountReachableMembers(predicates ...swim.MemberPredicate) (int, error) HandleOrForward(key string, request []byte, response *[]byte, service, endpoint string, format tchannel.Format, opts *forward.Options) (bool, error) Forward(dest string, keys []string, request []byte, service, endpoint string, format tchannel.Format, opts *forward.Options) ([]byte, error) Labels() (*swim.NodeLabels, error) // events.EventRegistar // mockery has troubles generating a working mock when the interface is // embedded therefore the definitions are copied here. AddListener(events.EventListener) bool RemoveListener(events.EventListener) bool // DEPRECATED, use AddListener (!) kept around for backwards compatibility // but will start logging warnings RegisterListener(events.EventListener) // swim.SelfEvict // mockery has troubles generating a working mock when the interface is // embedded therefore the definitions are copied here. RegisterSelfEvictHook(hooks swim.SelfEvictHook) error SelfEvict() error } // Ringpop is a consistent hashring that uses a gossip protocol to disseminate // changes around the ring. 
type Ringpop struct { // make ringpop an event Emitter events.AsyncEventEmitter config *configuration configHashRing *hashring.Configuration addressResolver AddressResolver state state stateMutex sync.RWMutex clock clock.Clock channel shared.TChannel subChannel shared.SubChannel node swim.NodeInterface ring *hashring.HashRing forwarder *forward.Forwarder statter log.StatsReporter stats struct { hostport string prefix string keys map[string]string hooks []string sync.RWMutex } logger log.Logger tickers chan *clock.Ticker startTime time.Time } // state represents the internal state of a Ringpop instance. type state uint const ( // created means the Ringpop instance has been created but the swim node, // stats and hashring haven't been set up. The listen address has not been // resolved yet either. created state = iota // initialized means the listen address has been resolved and the swim // node, stats and hashring have been instantiated onto the Ringpop // instance. initialized // ready means Bootstrap has been called, the ring has successfully // bootstrapped and is now ready to receive requests. ready // destroyed means the Ringpop instance has been shut down, is no longer // ready for requests and cannot be revived. destroyed ) // New returns a new Ringpop instance. func New(app string, opts ...Option) (*Ringpop, error) { var err error ringpop := &Ringpop{ config: &configuration{ App: app, InitialLabels: make(swim.LabelMap), }, logger: logging.Logger("ringpop"), } err = applyOptions(ringpop, defaultOptions) if err != nil { panic(fmt.Errorf("Error applying default Ringpop options: %v", err)) } err = applyOptions(ringpop, opts) if err != nil { return nil, err } errs := checkOptions(ringpop) if len(errs) != 0 { return nil, fmt.Errorf("%v", errs) } ringpop.setState(created) return ringpop, nil } // init configures a Ringpop instance and makes it ready to do comms. 
func (rp *Ringpop) init() error { if rp.channel == nil { return errors.New("Missing channel") } address, err := rp.address() if err != nil { return err } // early initialization of statter before registering listeners that might // fire and try to stat rp.stats.hostport = genStatsHostport(address) rp.stats.prefix = fmt.Sprintf("ringpop.%s", rp.stats.hostport) rp.stats.keys = make(map[string]string) rp.subChannel = rp.channel.GetSubChannel("ringpop", tchannel.Isolated) rp.registerHandlers() if rp.node == nil { rp.node = swim.NewNode(rp.config.App, address, rp.subChannel, &swim.Options{ StateTimeouts: rp.config.StateTimeouts, Clock: rp.clock, LabelLimits: rp.config.LabelLimits, InitialLabels: rp.config.InitialLabels, SelfEvict: rp.config.SelfEvict, RequiresAppInPing: rp.config.RequiresAppInPing, }) } rp.node.AddListener(rp) rp.ring = hashring.New(farm.Fingerprint32, rp.configHashRing.ReplicaPoints) rp.ring.AddListener(rp) // add all members present in the membership of the node on startup. for _, member := range rp.node.GetReachableMembers() { rp.ring.AddMembers(member) } rp.forwarder = forward.NewForwarder(rp, rp.subChannel) rp.forwarder.AddListener(rp) rp.startTimers() rp.setState(initialized) return nil } // Starts periodic timers in a single goroutine. Can be turned back off via // stopTimers. 
func (rp *Ringpop) startTimers() { if rp.tickers != nil { return } rp.tickers = make(chan *clock.Ticker, 32) // 32 == max number of tickers if rp.config.MembershipChecksumStatPeriod != StatPeriodNever { ticker := rp.clock.Ticker(rp.config.MembershipChecksumStatPeriod) rp.tickers <- ticker go func() { for range ticker.C { rp.statter.UpdateGauge( rp.getStatKey("membership.checksum-periodic"), nil, int64(rp.node.GetChecksum()), ) } }() } if rp.config.RingChecksumStatPeriod != StatPeriodNever { ticker := rp.clock.Ticker(rp.config.RingChecksumStatPeriod) rp.tickers <- ticker go func() { for range ticker.C { rp.statter.UpdateGauge( rp.getStatKey("ring.checksum-periodic"), nil, int64(rp.ring.Checksum()), ) // emit all named checksums as well for name, checksum := range rp.ring.Checksums() { rp.statter.UpdateGauge( rp.getStatKey("ring.checksums-periodic."+name), nil, int64(checksum), ) } } }() } } func (rp *Ringpop) stopTimers() { if rp.tickers != nil { close(rp.tickers) for ticker := range rp.tickers { ticker.Stop() } rp.tickers = nil } } // address returns a host:port string of the address that Ringpop should // use as its address. func (rp *Ringpop) address() (string, error) { return rp.addressResolver() } // r.channelAddressResolver resolves the hostport from the current // TChannel object on the Ringpop instance. func (rp *Ringpop) channelAddressResolver() (string, error) { peerInfo := rp.channel.PeerInfo() // Check that TChannel is listening on a real hostport. By default, // TChannel listens on an ephemeral host/port. The real port is then // assigned by the OS when ListenAndServe is called. If the hostport is // ephemeral, it means TChannel is not yet listening and the hostport // cannot be resolved. if peerInfo.IsEphemeralHostPort() { return "", ErrEphemeralAddress } return peerInfo.HostPort, nil } // Destroy stops all communication. Note that this does not close the TChannel // instance that was passed to Ringpop in the constructor. 
Once an instance is // destroyed, it cannot be restarted. func (rp *Ringpop) Destroy() { if rp.node != nil { rp.node.Destroy() } rp.stopTimers() rp.setState(destroyed) } // destroyed returns func (rp *Ringpop) destroyed() bool { return rp.getState() == destroyed } // App returns the name of the application this Ringpop instance belongs to. // The application name is set in the constructor when the Ringpop instance is // created. func (rp *Ringpop) App() string { return rp.config.App } // WhoAmI returns the address of the current/local Ringpop node. It returns an // error if Ringpop is not yet initialized/bootstrapped. func (rp *Ringpop) WhoAmI() (string, error) { if !rp.Ready() { return "", ErrNotBootstrapped } return rp.address() } // Uptime returns the amount of time that this Ringpop instance has been // bootstrapped for. func (rp *Ringpop) Uptime() (time.Duration, error) { if !rp.Ready() { return 0, ErrNotBootstrapped } return time.Now().Sub(rp.startTime), nil } // RegisterListener is DEPRECATED, use AddListener. This function is kept around // for the time being to make sure that ringpop is a drop in replacement for // now. It should not be used by new projects, to accomplish this it will log a // warning message that the developer can understand. A release in the future // will remove this function completely which will cause a breaking change to // the ringpop public interface. func (rp *Ringpop) RegisterListener(l events.EventListener) { rp.logger.Warn("RegisterListener is deprecated, use AddListener") rp.AddListener(l) } // getState gets the state of the current Ringpop instance. func (rp *Ringpop) getState() state { rp.stateMutex.RLock() r := rp.state rp.stateMutex.RUnlock() return r } // setState sets the state of the current Ringpop instance. 
It will emit an appropriate // event when the state will actually change func (rp *Ringpop) setState(s state) { rp.stateMutex.Lock() oldState := rp.state rp.state = s rp.stateMutex.Unlock() // test if the state has changed with this call to setState if oldState != s { switch s { case ready: rp.EmitEvent(events.Ready{}) case destroyed: rp.EmitEvent(events.Destroyed{}) } } } // RegisterSelfEvictHook registers the eviction hooks that need to be executed // before and after self eviction from the membership. An error will be returned // if ringpop was unable to register the hooks. This could happen in the following // cases: // - Ringpop has not been bootstrapped yet // - SelfEvict has already been called // - The hook is already registered func (rp *Ringpop) RegisterSelfEvictHook(hooks swim.SelfEvictHook) error { if !rp.Ready() { return ErrNotBootstrapped } return rp.node.RegisterSelfEvictHook(hooks) } // SelfEvict should be called before shutting down the application. When calling // this function ringpop will gracefully evict itself from the network. Utilities // that hook into ringpop will have the opportunity to hook into this system to // gracefully handle the shutdown of ringpop. func (rp *Ringpop) SelfEvict() error { if !rp.Ready() { return ErrNotBootstrapped } return rp.node.SelfEvict() } //= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = // // Bootstrap // //= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = // Bootstrap starts communication for this Ringpop instance. // // When Bootstrap is called, this Ringpop instance will attempt to contact // other instances from the DiscoverProvider. // // If no seed hosts are provided, a single-node cluster will be created. 
func (rp *Ringpop) Bootstrap(bootstrapOpts *swim.BootstrapOptions) ([]string, error) { if rp.getState() < initialized { err := rp.init() if err != nil { return nil, err } } // We shouldn't try to bootstrap if the channel is not listening if rp.channel.State() != tchannel.ChannelListening { rp.logger.WithField("channelState", rp.channel.State()).Error(ErrChannelNotListening.Error()) return nil, ErrChannelNotListening } joined, err := rp.node.Bootstrap(bootstrapOpts) if err != nil { rp.logger.WithField("error", err).Info("bootstrap failed") rp.setState(initialized) return nil, err } rp.setState(ready) rp.logger.WithField("joined", joined).Info("bootstrap complete") return joined, nil } // Ready returns whether or not ringpop is bootstrapped and ready to receive // requests. func (rp *Ringpop) Ready() bool { if rp.getState() != ready { return false } return rp.node.Ready() } //= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = // // SWIM Events // //= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = // HandleEvent is used to satisfy the swim.EventListener interface. No touchy. func (rp *Ringpop) HandleEvent(event events.Event) { rp.EmitEvent(event) switch event := event.(type) { case swim.MemberlistChangesReceivedEvent: for _, change := range event.Changes { status := change.Status if len(status) == 0 { status = "unknown" } rp.statter.IncCounter(rp.getStatKey("membership-update."+status), nil, 1) } case membership.ChangeEvent: rp.ring.ProcessMembershipChanges(event.Changes) case swim.MemberlistChangesAppliedEvent: rp.statter.UpdateGauge(rp.getStatKey("changes.apply"), nil, int64(len(event.Changes))) for _, change := range event.Changes { status := change.Status if len(status) == 0 { status = "unknown" } rp.statter.IncCounter(rp.getStatKey("membership-set."+status), nil, 1) } // During bootstrapping ringpop is not ready causing errors from // CountReachableMembers. 
When this is logged it would confuse people // thinking that there is a problem with ringpop bootstrapping if rp.Ready() { mc, err := rp.CountReachableMembers() if err != nil { rp.logger.Errorf("unable to count members of the ring for statting: %q", err) } else { rp.statter.UpdateGauge(rp.getStatKey("num-members"), nil, int64(mc)) } } rp.statter.IncCounter(rp.getStatKey("updates"), nil, int64(len(event.Changes))) case swim.FullSyncEvent: rp.statter.IncCounter(rp.getStatKey("full-sync"), nil, 1) case swim.StartReverseFullSyncEvent: rp.statter.IncCounter(rp.getStatKey("full-sync.reverse"), nil, 1) case swim.OmitReverseFullSyncEvent: rp.statter.IncCounter(rp.getStatKey("full-sync.reverse-omitted"), nil, 1) case swim.RedundantReverseFullSyncEvent: rp.statter.IncCounter(rp.getStatKey("full-sync.redundant-reverse"), nil, 1) case swim.MaxPAdjustedEvent: rp.statter.UpdateGauge(rp.getStatKey("max-piggyback"), nil, int64(event.NewPCount)) case swim.JoinReceiveEvent: rp.statter.IncCounter(rp.getStatKey("join.recv"), nil, 1) case swim.JoinCompleteEvent: rp.statter.IncCounter(rp.getStatKey("join.complete"), nil, 1) rp.statter.IncCounter(rp.getStatKey("join.succeeded"), nil, 1) rp.statter.RecordTimer(rp.getStatKey("join"), nil, event.Duration) case swim.AddJoinListEvent: rp.statter.RecordTimer(rp.getStatKey("join.add-join-list"), nil, event.Duration) case swim.PingSendEvent: rp.statter.IncCounter(rp.getStatKey("ping.send"), nil, 1) case swim.PingSendCompleteEvent: rp.statter.RecordTimer(rp.getStatKey("ping"), nil, event.Duration) case swim.PingReceiveEvent: rp.statter.IncCounter(rp.getStatKey("ping.recv"), nil, 1) case swim.PingRequestsSendEvent: rp.statter.IncCounter(rp.getStatKey("ping-req.send"), nil, 1) rp.statter.IncCounter(rp.getStatKey("ping-req.other-members"), nil, int64(len(event.Peers))) case swim.PingRequestsSendCompleteEvent: rp.statter.RecordTimer(rp.getStatKey("ping-req"), nil, event.Duration) case swim.PingRequestReceiveEvent: 
rp.statter.IncCounter(rp.getStatKey("ping-req.recv"), nil, 1) case swim.PingRequestPingEvent: rp.statter.RecordTimer(rp.getStatKey("ping-req-ping"), nil, event.Duration) case swim.ProtocolDelayComputeEvent: rp.statter.RecordTimer(rp.getStatKey("protocol.delay"), nil, event.Duration) case swim.ProtocolFrequencyEvent: rp.statter.RecordTimer(rp.getStatKey("protocol.frequency"), nil, event.Duration) case swim.ChecksumComputeEvent: rp.statter.RecordTimer(rp.getStatKey("compute-checksum"), nil, event.Duration) rp.statter.UpdateGauge(rp.getStatKey("checksum"), nil, int64(event.Checksum)) rp.statter.IncCounter(rp.getStatKey("membership.checksum-computed"), nil, 1) case swim.ChangesCalculatedEvent: rp.statter.UpdateGauge(rp.getStatKey("changes.disseminate"), nil, int64(len(event.Changes))) case swim.ChangeFilteredEvent: rp.statter.IncCounter(rp.getStatKey("filtered-change"), nil, 1) case swim.JoinFailedEvent: rp.statter.IncCounter(rp.getStatKey("join.failed."+string(event.Reason)), nil, 1) case swim.JoinTriesUpdateEvent: rp.statter.UpdateGauge(rp.getStatKey("join.retries"), nil, int64(event.Retries)) case swim.MakeNodeStatusEvent: rp.statter.IncCounter(rp.getStatKey("make-"+event.Status), nil, 1) case swim.RequestBeforeReadyEvent: rp.statter.IncCounter(rp.getStatKey("not-ready."+string(event.Endpoint)), nil, 1) case swim.DiscoHealEvent: rp.statter.IncCounter(rp.getStatKey("heal.triggered"), nil, 1) case swim.AttemptHealEvent: rp.statter.IncCounter(rp.getStatKey("heal.attempt"), nil, 1) case swim.RefuteUpdateEvent: rp.statter.IncCounter(rp.getStatKey("refuted-update"), nil, 1) case swim.SelfEvictedEvent: rp.statter.RecordTimer(rp.getStatKey("self-eviction"), nil, event.Duration) case events.RingChecksumEvent: rp.statter.IncCounter(rp.getStatKey("ring.checksum-computed"), nil, 1) rp.statter.UpdateGauge(rp.getStatKey("ring.checksum"), nil, int64((event.NewChecksum))) for key, value := range event.NewChecksums { rp.statter.UpdateGauge(rp.getStatKey("ring.checksums."+key), nil, 
int64(value)) } case events.RingChangedEvent: added := int64(len(event.ServersAdded)) removed := int64(len(event.ServersRemoved)) rp.statter.IncCounter(rp.getStatKey("ring.server-added"), nil, added) rp.statter.IncCounter(rp.getStatKey("ring.server-removed"), nil, removed) rp.statter.IncCounter(rp.getStatKey("ring.changed"), nil, 1) case forward.RequestForwardedEvent: rp.statter.IncCounter(rp.getStatKey("requestProxy.egress"), nil, 1) case forward.InflightRequestsChangedEvent: rp.statter.UpdateGauge(rp.getStatKey("requestProxy.inflight"), nil, event.Inflight) case forward.InflightRequestsMiscountEvent: rp.statter.IncCounter(rp.getStatKey("requestProxy.miscount."+string(event.Operation)), nil, 1) case forward.FailedEvent: rp.statter.IncCounter(rp.getStatKey("requestProxy.send.error"), nil, 1) case forward.SuccessEvent: rp.statter.IncCounter(rp.getStatKey("requestProxy.send.success"), nil, 1) case forward.MaxRetriesEvent: rp.statter.IncCounter(rp.getStatKey("requestProxy.retry.failed"), nil, 1) case forward.RetryAttemptEvent: rp.statter.IncCounter(rp.getStatKey("requestProxy.retry.attempted"), nil, 1) case forward.RetryAbortEvent: rp.statter.IncCounter(rp.getStatKey("requestProxy.retry.aborted"), nil, 1) case forward.RerouteEvent: me, _ := rp.WhoAmI() if event.NewDestination == me { rp.statter.IncCounter(rp.getStatKey("requestProxy.retry.reroute.local"), nil, 1) } else { rp.statter.IncCounter(rp.getStatKey("requestProxy.retry.reroute.remote"), nil, 1) } case forward.RetrySuccessEvent: rp.statter.IncCounter(rp.getStatKey("requestProxy.retry.succeeded"), nil, 1) } } //= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = // // Ring // //= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = // Checksum returns the current checksum of this Ringpop instance's hashring. 
func (rp *Ringpop) Checksum() (uint32, error) { if !rp.Ready() { return 0, ErrNotBootstrapped } return rp.ring.Checksum(), nil } // Lookup returns the address of the server in the ring that is responsible // for the specified key. It returns an error if the Ringpop instance is not // yet initialized/bootstrapped. func (rp *Ringpop) Lookup(key string) (string, error) { if !rp.Ready() { return "", ErrNotBootstrapped } startTime := time.Now() dest, success := rp.ring.Lookup(key) duration := time.Now().Sub(startTime) rp.statter.RecordTimer(rp.getStatKey("lookup"), nil, duration) rp.EmitEvent(events.LookupEvent{ Key: key, Duration: duration, }) if !success { err := errors.New("could not find destination for key") rp.logger.WithField("key", key).Warn(err) return "", err } return dest, nil } // LookupN returns the addresses of all the servers in the ring that are // responsible for the specified key. It returns an error if the Ringpop // instance is not yet initialized/bootstrapped. func (rp *Ringpop) LookupN(key string, n int) ([]string, error) { if !rp.Ready() { return nil, ErrNotBootstrapped } startTime := time.Now() destinations := rp.ring.LookupN(key, n) duration := time.Now().Sub(startTime) rp.statter.RecordTimer(rp.getStatKey(fmt.Sprintf("lookupn.%d", n)), nil, duration) rp.EmitEvent(events.LookupNEvent{ Key: key, N: n, Duration: duration, }) if len(destinations) == 0 { err := errors.New("could not find destinations for key") rp.logger.WithField("key", key).Warn(err) return destinations, err } return destinations, nil } func (rp *Ringpop) ringEvent(e interface{}) { rp.HandleEvent(e) } // GetReachableMembers returns a slice of members currently in this instance's // active membership list that match all provided predicates. func (rp *Ringpop) GetReachableMembers(predicates ...swim.MemberPredicate) ([]string, error) { if !rp.Ready() { return nil, ErrNotBootstrapped } members := rp.node.GetReachableMembers(predicates...) 
addresses := make([]string, 0, len(members)) for _, member := range members { addresses = append(addresses, member.Address) } return addresses, nil } // CountReachableMembers returns the number of members currently in this // instance's active membership list that match all provided predicates. func (rp *Ringpop) CountReachableMembers(predicates ...swim.MemberPredicate) (int, error) { if !rp.Ready() { return 0, ErrNotBootstrapped } return rp.node.CountReachableMembers(predicates...), nil } //= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = // // Stats // //= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = func (rp *Ringpop) getStatKey(key string) string { rp.stats.Lock() rpKey, ok := rp.stats.keys[key] if !ok { rpKey = fmt.Sprintf("%s.%s", rp.stats.prefix, key) rp.stats.keys[key] = rpKey } rp.stats.Unlock() return rpKey } //= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = // // Forwarding // //= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = // HandleOrForward returns true if the request should be handled locally, or false // if it should be forwarded to a different node. If false is returned, forwarding // is taken care of internally by the method, and, if no error has occured, the // response is written in the provided response field. func (rp *Ringpop) HandleOrForward(key string, request []byte, response *[]byte, service, endpoint string, format tchannel.Format, opts *forward.Options) (bool, error) { if !rp.Ready() { return false, ErrNotBootstrapped } dest, err := rp.Lookup(key) if err != nil { return false, err } address, err := rp.WhoAmI() if err != nil { return false, err } if dest == address { return true, nil } res, err := rp.Forward(dest, []string{key}, request, service, endpoint, format, opts) *response = res return false, err } // Forward forwards the request to given destination host and returns the response. 
// It returns ErrNotBootstrapped when the forwarder has not been set up yet,
// i.e. when Forward is called before the instance has been initialized.
func (rp *Ringpop) Forward(dest string, keys []string, request []byte, service, endpoint string, format tchannel.Format, opts *forward.Options) ([]byte, error) {
	// The forwarder is only created during init; report an error instead of
	// calling through a nil forwarder.
	if rp.forwarder == nil {
		return nil, ErrNotBootstrapped
	}
	return rp.forwarder.ForwardRequest(request, dest, service, endpoint, keys, format, opts)
}

// Labels provides access to a mutator of ringpop Labels that will be shared on
// the membership. Changes made on the mutator are synchronized across the
// cluster for other members to make local decisions on. It returns
// ErrNotBootstrapped when the instance is not ready.
func (rp *Ringpop) Labels() (*swim.NodeLabels, error) {
	if !rp.Ready() {
		return nil, ErrNotBootstrapped
	}
	return rp.node.Labels(), nil
}

// SerializeThrift takes a thrift struct and returns the serialized bytes
// of that struct using the thrift binary protocol. This is a temporary
// measure before frames can be forwarded directly past the endpoint to the
// proper destination.
func SerializeThrift(s athrift.TStruct) ([]byte, error) {
	buffer := new(bytes.Buffer)

	transport := athrift.NewStreamTransportW(buffer)
	if err := s.Write(athrift.NewTBinaryProtocolTransport(transport)); err != nil {
		return nil, err
	}

	// Flush so everything written through the transport is in the buffer
	// before its bytes are handed out.
	if err := transport.Flush(); err != nil {
		return nil, err
	}

	return buffer.Bytes(), nil
}

// DeserializeThrift takes a byte slice and attempts to write it into the
// given thrift struct using the thrift binary protocol. This is a temporary
// measure before frames can be forwarded directly past the endpoint to the
// proper destination.
func DeserializeThrift(b []byte, s athrift.TStruct) error {
	reader := bytes.NewReader(b)
	transport := athrift.NewStreamTransportR(reader)
	return s.Read(athrift.NewTBinaryProtocolTransport(transport))
}