benchmark/internal_client.go (130 lines of code) (raw):

// Copyright (c) 2015 Uber Technologies, Inc. // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package benchmark import ( "bytes" "fmt" "os" "time" "github.com/uber/tchannel-go" "github.com/uber/tchannel-go/raw" "github.com/uber/tchannel-go/thrift" gen "github.com/uber/tchannel-go/thrift/gen-go/test" ) // internalClient represents a benchmark client. type internalClient struct { ch *tchannel.Channel sc *tchannel.SubChannel tClient gen.TChanSecondService argStr string argBytes []byte checkResult bool opts *options } // NewClient returns a new Client that can make calls to a benchmark server. func NewClient(hosts []string, optFns ...Option) Client { opts := getOptions(optFns) if opts.external { return newExternalClient(hosts, opts) } if opts.numClients > 1 { return newInternalMultiClient(hosts, opts) } return newClient(hosts, opts) } func newClient(hosts []string, opts *options) inProcClient { if opts.external || opts.numClients > 1 { panic("newClient got options that should be handled by NewClient") } if opts.noLibrary { return newInternalTCPClient(hosts, opts) } return newInternalClient(hosts, opts) } func newInternalClient(hosts []string, opts *options) inProcClient { ch, err := tchannel.NewChannel(opts.svcName, &tchannel.ChannelOptions{ Logger: tchannel.NewLevelLogger(tchannel.NewLogger(os.Stderr), tchannel.LogLevelWarn), }) if err != nil { panic("failed to create channel: " + err.Error()) } for _, host := range hosts { ch.Peers().Add(host) } thriftClient := thrift.NewClient(ch, opts.svcName, nil) client := gen.NewTChanSecondServiceClient(thriftClient) return &internalClient{ ch: ch, sc: ch.GetSubChannel(opts.svcName), tClient: client, argBytes: getRequestBytes(opts.reqSize), argStr: getRequestString(opts.reqSize), opts: opts, } } func (c *internalClient) Warmup() error { for _, peer := range c.ch.Peers().Copy() { ctx, cancel := tchannel.NewContext(c.opts.timeout) _, err := peer.GetConnection(ctx) cancel() if err != nil { return err } } return nil } func (c *internalClient) makeCalls(latencies []time.Duration, f func() (time.Duration, error)) error { for i := range latencies { var err error latencies[i], err = f() if err != nil { return err } } return nil } func (c *internalClient) RawCallBuffer(latencies []time.Duration) error { return c.makeCalls(latencies, func() (time.Duration, error) { ctx, cancel := tchannel.NewContext(c.opts.timeout) defer cancel() started := time.Now() rArg2, rArg3, _, err := raw.CallSC(ctx, c.sc, "echo", c.argBytes, c.argBytes) duration := time.Since(started) if err != nil { return 0, err } if c.checkResult { if !bytes.Equal(rArg2, c.argBytes) || !bytes.Equal(rArg3, c.argBytes) { fmt.Println("Arg2", rArg2, "Expect", c.argBytes) fmt.Println("Arg3", rArg3, "Expect", c.argBytes) panic("echo call returned wrong results") } } return duration, nil }) } func (c *internalClient) RawCall(n int) ([]time.Duration, error) { latencies := make([]time.Duration, n) return latencies, c.RawCallBuffer(latencies) } func (c *internalClient) ThriftCallBuffer(latencies []time.Duration) error { return c.makeCalls(latencies, func() (time.Duration, error) { ctx, cancel := thrift.NewContext(c.opts.timeout) defer cancel() started := time.Now() res, err := c.tClient.Echo(ctx, c.argStr) duration := time.Since(started) if err != nil { return 0, err } if c.checkResult { if res != c.argStr { panic("thrift Echo returned wrong result") } } return duration, nil }) } func (c *internalClient) ThriftCall(n int) ([]time.Duration, error) { latencies := make([]time.Duration, n) return latencies, c.ThriftCallBuffer(latencies) } func (c *internalClient) Close() { c.ch.Close() }