benchmark/internal_tcp_client.go (122 lines of code) (raw):

// Copyright (c) 2015 Uber Technologies, Inc. // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package benchmark import ( "fmt" "math/rand" "net" "time" "github.com/uber/tchannel-go" ) // internalTCPClient represents a TCP client that makes // TChannel calls using raw TCP packets. type internalTCPClient struct { host string lastID uint32 responseIDs chan uint32 conn net.Conn frames frames opts *options } func newInternalTCPClient(hosts []string, opts *options) inProcClient { return &internalTCPClient{ host: hosts[rand.Intn(len(hosts))], responseIDs: make(chan uint32, 1000), frames: getRawCallFrames(opts.timeout, opts.svcName, opts.reqSize), lastID: 1, opts: opts, } } func (c *internalTCPClient) Warmup() error { conn, err := net.Dial("tcp", c.host) if err != nil { return err } c.conn = conn go c.readConn() if err := c.frames.writeInitReq(conn); err != nil { panic(err) } return nil } func (c *internalTCPClient) readConn() { defer close(c.responseIDs) wantFirstID := true f := tchannel.NewFrame(tchannel.MaxFrameSize) for { err := f.ReadIn(c.conn) if err != nil { return } if wantFirstID { if f.Header.ID != 1 { panic(fmt.Errorf("Expected first response ID to be 1, got %v", f.Header.ID)) } wantFirstID = false continue } c.responseIDs <- f.Header.ID } } type call struct { id uint32 started time.Time numFrames int } func (c *internalTCPClient) makeCalls(latencies []time.Duration, f func() (call, error)) error { n := len(latencies) calls := make(map[uint32]*call, n) for i := 0; i < n; i++ { c, err := f() if err != nil { return err } calls[c.id] = &c } timer := time.NewTimer(c.opts.timeout) // Use the original underlying slice for latencies. durations := latencies[:0] for { if len(calls) == 0 { return nil } timer.Reset(c.opts.timeout) select { case id, ok := <-c.responseIDs: if !ok { panic("expecting more calls, but connection is closed") } call, ok := calls[id] if !ok { panic(fmt.Errorf("received unexpected response frame: %v", id)) } call.numFrames-- if call.numFrames != 0 { continue } durations = append(durations, time.Since(call.started)) delete(calls, id) case <-timer.C: return tchannel.ErrTimeout } } } func (c *internalTCPClient) RawCallBuffer(latencies []time.Duration) error { return c.makeCalls(latencies, func() (call, error) { c.lastID++ started := time.Now() numFrames, err := c.frames.writeCallReq(c.lastID, c.conn) if err != nil { return call{}, err } return call{c.lastID, started, numFrames}, nil }) } func (c *internalTCPClient) RawCall(n int) ([]time.Duration, error) { latencies := make([]time.Duration, n) return latencies, c.RawCallBuffer(latencies) } func (c *internalTCPClient) ThriftCallBuffer(latencies []time.Duration) error { panic("not yet implemented") } func (c *internalTCPClient) ThriftCall(n int) ([]time.Duration, error) { panic("not yet implemented") } func (c *internalTCPClient) Close() { c.conn.Close() }