relay_messages.go (221 lines of code) (raw):

// Copyright (c) 2015 Uber Technologies, Inc. // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. package tchannel import ( "bytes" "encoding/binary" "fmt" "time" "github.com/uber/tchannel-go/relay" "github.com/uber/tchannel-go/thrift/arg2" "github.com/uber/tchannel-go/typed" ) var _ relay.RespFrame = (*lazyCallRes)(nil) var ( _callerNameKeyBytes = []byte(CallerName) _routingDelegateKeyBytes = []byte(RoutingDelegate) _routingKeyKeyBytes = []byte(RoutingKey) _argSchemeKeyBytes = []byte(ArgScheme) _tchanThriftValueBytes = []byte(Thrift) ) const ( // Common to many frame types. _flagsIndex = 0 // For call req, indexes into the frame. // Use int for indexes to avoid overflow caused by accidental byte arithmentic. _ttlIndex int = 1 _ttlLen int = 4 _spanIndex int = _ttlIndex + _ttlLen _spanLength int = 25 _serviceLenIndex int = _spanIndex + _spanLength _serviceNameIndex int = _serviceLenIndex + 1 // For call res and call res continue. 
_resCodeOK = 0x00 _resCodeIndex int = 1 // For error. _errCodeIndex int = 0 ) type lazyError struct { *Frame } func newLazyError(f *Frame) lazyError { if msgType := f.Header.messageType; msgType != messageTypeError { panic(fmt.Errorf("newLazyError called for wrong messageType: %v", msgType)) } return lazyError{f} } func (e lazyError) Code() SystemErrCode { return SystemErrCode(e.Payload[_errCodeIndex]) } type lazyCallRes struct { *Frame as []byte arg2IsFragmented bool arg2Payload []byte } func newLazyCallRes(f *Frame) (lazyCallRes, error) { if msgType := f.Header.messageType; msgType != messageTypeCallRes { panic(fmt.Errorf("newLazyCallRes called for wrong messageType: %v", msgType)) } rbuf := typed.NewReadBuffer(f.SizedPayload()) rbuf.SkipBytes(1) // flags rbuf.SkipBytes(1) // code rbuf.SkipBytes(_spanLength) // tracing var as []byte nh := int(rbuf.ReadSingleByte()) for i := 0; i < nh; i++ { keyLen := int(rbuf.ReadSingleByte()) key := rbuf.ReadBytes(keyLen) valLen := int(rbuf.ReadSingleByte()) val := rbuf.ReadBytes(valLen) if bytes.Equal(key, _argSchemeKeyBytes) { as = val continue } } csumtype := ChecksumType(rbuf.ReadSingleByte()) // csumtype rbuf.SkipBytes(csumtype.ChecksumSize()) // csum // arg1: ignored narg1 := int(rbuf.ReadUint16()) rbuf.SkipBytes(narg1) // arg2: keep track of payload narg2 := int(rbuf.ReadUint16()) arg2Payload := rbuf.ReadBytes(narg2) arg2IsFragmented := rbuf.BytesRemaining() == 0 && hasMoreFragments(f) // arg3: ignored // Make sure we didn't hit any issues reading the buffer if err := rbuf.Err(); err != nil { return lazyCallRes{}, fmt.Errorf("read response frame: %v", err) } return lazyCallRes{ Frame: f, as: as, arg2IsFragmented: arg2IsFragmented, arg2Payload: arg2Payload, }, nil } // OK implements relay.RespFrame func (cr lazyCallRes) OK() bool { return isCallResOK(cr.Frame) } // ArgScheme implements relay.RespFrame func (cr lazyCallRes) ArgScheme() []byte { return cr.as } // Arg2IsFragmented implements relay.RespFrame func (cr 
lazyCallRes) Arg2IsFragmented() bool { return cr.arg2IsFragmented } // Arg2 implements relay.RespFrame func (cr lazyCallRes) Arg2() []byte { return cr.arg2Payload } type lazyCallReq struct { *Frame checksumTypeOffset uint16 arg2StartOffset, arg2EndOffset uint16 arg3StartOffset uint16 caller, method, delegate, key, as []byte arg2Appends []relay.KeyVal checksumType ChecksumType isArg2Fragmented bool // Intentionally an array to combine allocations with that of lazyCallReq arg2InitialBuf [1]relay.KeyVal } // TODO: Consider pooling lazyCallReq and using pointers to the struct. func newLazyCallReq(f *Frame) (*lazyCallReq, error) { if msgType := f.Header.messageType; msgType != messageTypeCallReq { panic(fmt.Errorf("newLazyCallReq called for wrong messageType: %v", msgType)) } cr := &lazyCallReq{Frame: f} cr.arg2Appends = cr.arg2InitialBuf[:0] rbuf := typed.NewReadBuffer(f.SizedPayload()) rbuf.SkipBytes(_serviceLenIndex) // service~1 serviceLen := rbuf.ReadSingleByte() rbuf.SkipBytes(int(serviceLen)) // nh:1 (hk~1 hv~1){nh} numHeaders := int(rbuf.ReadSingleByte()) for i := 0; i < numHeaders; i++ { keyLen := int(rbuf.ReadSingleByte()) key := rbuf.ReadBytes(keyLen) valLen := int(rbuf.ReadSingleByte()) val := rbuf.ReadBytes(valLen) if bytes.Equal(key, _argSchemeKeyBytes) { cr.as = val } else if bytes.Equal(key, _callerNameKeyBytes) { cr.caller = val } else if bytes.Equal(key, _routingDelegateKeyBytes) { cr.delegate = val } else if bytes.Equal(key, _routingKeyKeyBytes) { cr.key = val } } // csumtype:1 (csum:4){0,1} arg1~2 arg2~2 arg3~2 cr.checksumTypeOffset = uint16(rbuf.BytesRead()) cr.checksumType = ChecksumType(rbuf.ReadSingleByte()) rbuf.SkipBytes(cr.checksumType.ChecksumSize()) // arg1~2 arg1Len := int(rbuf.ReadUint16()) cr.method = rbuf.ReadBytes(arg1Len) // arg2~2 arg2Len := rbuf.ReadUint16() cr.arg2StartOffset = uint16(rbuf.BytesRead()) cr.arg2EndOffset = cr.arg2StartOffset + arg2Len // arg2 is fragmented if we don't see arg3 in this frame. 
rbuf.SkipBytes(int(arg2Len)) cr.isArg2Fragmented = rbuf.BytesRemaining() == 0 && cr.HasMoreFragments() if !cr.isArg2Fragmented { // arg3~2 rbuf.SkipBytes(2) cr.arg3StartOffset = uint16(rbuf.BytesRead()) } if rbuf.Err() != nil { return nil, rbuf.Err() } return cr, nil } // Caller returns the name of the originator of this callReq. func (f *lazyCallReq) Caller() []byte { return f.caller } // Service returns the name of the destination service for this callReq. func (f *lazyCallReq) Service() []byte { l := f.Payload[_serviceLenIndex] return f.Payload[_serviceNameIndex : _serviceNameIndex+int(l)] } // Method returns the name of the method being called. func (f *lazyCallReq) Method() []byte { return f.method } // RoutingDelegate returns the routing delegate for this call req, if any. func (f *lazyCallReq) RoutingDelegate() []byte { return f.delegate } // RoutingKey returns the routing delegate for this call req, if any. func (f *lazyCallReq) RoutingKey() []byte { return f.key } // TTL returns the time to live for this callReq. func (f *lazyCallReq) TTL() time.Duration { ttl := binary.BigEndian.Uint32(f.Payload[_ttlIndex : _ttlIndex+_ttlLen]) return time.Duration(ttl) * time.Millisecond } // SetTTL overwrites the frame's TTL. func (f *lazyCallReq) SetTTL(d time.Duration) { ttl := uint32(d / time.Millisecond) binary.BigEndian.PutUint32(f.Payload[_ttlIndex:_ttlIndex+_ttlLen], ttl) } // Span returns the Span func (f *lazyCallReq) Span() Span { return callReqSpan(f.Frame) } // HasMoreFragments returns whether the callReq has more fragments. func (f *lazyCallReq) HasMoreFragments() bool { return f.Payload[_flagsIndex]&hasMoreFragmentsFlag != 0 } // Arg2EndOffset returns the offset from start of payload to the end of Arg2 // in bytes, and hasMore to be true if there are more frames and arg3 has // not started. 
func (f *lazyCallReq) Arg2EndOffset() (_ int, hasMore bool) {
	return int(f.arg2EndOffset), f.isArg2Fragmented
}

// Arg2StartOffset returns the offset from start of payload to the beginning
// of Arg2 in bytes.
func (f *lazyCallReq) Arg2StartOffset() int {
	return int(f.arg2StartOffset)
}

// arg2 returns the Arg2 bytes contained in this frame's payload.
func (f *lazyCallReq) arg2() []byte {
	return f.Payload[f.arg2StartOffset:f.arg2EndOffset]
}

// arg3 returns the Arg3 bytes contained in this frame's payload, or nil when
// Arg2 is fragmented. In the fragmented case arg3StartOffset is never set, so
// slicing from it would return the whole payload rather than Arg3.
func (f *lazyCallReq) arg3() []byte {
	if f.isArg2Fragmented {
		return nil
	}
	return f.SizedPayload()[f.arg3StartOffset:]
}

// Arg2Iterator returns the iterator for reading Arg2 key value pair
// of TChannel-Thrift Arg Scheme. It returns an error wrapping the text of
// errArg2ThriftOnly when the call's arg scheme is not Thrift.
func (f *lazyCallReq) Arg2Iterator() (arg2.KeyValIterator, error) {
	if !bytes.Equal(f.as, _tchanThriftValueBytes) {
		return arg2.KeyValIterator{}, fmt.Errorf("%v: got %s", errArg2ThriftOnly, f.as)
	}
	return arg2.NewKeyValIterator(f.arg2())
}

// Arg2Append records a key/value pair on the call req. The slices are stored
// as given, without copying; the frame payload itself is not modified here.
func (f *lazyCallReq) Arg2Append(key, val []byte) {
	f.arg2Appends = append(f.arg2Appends, relay.KeyVal{Key: key, Val: val})
}

// finishesCall checks whether this frame is the last one we should expect for
// this RPC req-res.
func finishesCall(f *Frame) bool {
	switch f.messageType() {
	case messageTypeError, messageTypeCancel:
		return true
	case messageTypeCallRes, messageTypeCallResContinue:
		// A response is complete once the more-fragments flag is cleared.
		return !hasMoreFragments(f)
	default:
		return false
	}
}

// isCallResOK indicates whether the call was successful
func isCallResOK(f *Frame) bool {
	return f.Payload[_resCodeIndex] == _resCodeOK
}

// hasMoreFragments indicates whether there are more fragments following this frame
func hasMoreFragments(f *Frame) bool {
	return f.Payload[_flagsIndex]&hasMoreFragmentsFlag != 0
}