fragmenting_writer.go (189 lines of code) (raw):
// Copyright (c) 2015 Uber Technologies, Inc.
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package tchannel
import (
"errors"
"fmt"
"github.com/uber/tchannel-go/typed"
)
var (
errAlreadyWritingArgument = errors.New("already writing argument")
errNotWritingArgument = errors.New("not writing argument")
errComplete = errors.New("last argument already sent")
)
const (
chunkHeaderSize = 2 // each chunk is a uint16
hasMoreFragmentsFlag = 0x01 // flags indicating there are more fragments coming
)
// A writableFragment is a fragment that can be written to, containing a buffer
// for contents, a running checksum, and placeholders for the fragment flags
// and final checksum value
type writableFragment struct {
flagsRef typed.ByteRef
checksumRef typed.BytesRef
checksum Checksum
contents *typed.WriteBuffer
frame *Frame
}
// finish finishes the fragment, updating the final checksum and fragment flags
func (f *writableFragment) finish(hasMoreFragments bool) {
f.checksumRef.Update(f.checksum.Sum())
if hasMoreFragments {
// Important: hasMoreFragmentsFlag is set if there are more fragments, but NOT CLEARED if there aren't.
// This allows for callReqContinue frames to follow a fragmented callReq frame e.g. when arg2 is modified
// by the relayer
f.flagsRef.Update(hasMoreFragmentsFlag)
} else {
f.checksum.Release()
}
}
// A writableChunk is a chunk of data within a fragment, representing the
// contents of an argument within that fragment
type writableChunk struct {
size uint16
sizeRef typed.Uint16Ref
checksum Checksum
contents *typed.WriteBuffer
}
// newWritableChunk creates a new writable chunk around a checksum and a buffer to hold data
func newWritableChunk(checksum Checksum, contents *typed.WriteBuffer) *writableChunk {
return &writableChunk{
size: 0,
sizeRef: contents.DeferUint16(),
checksum: checksum,
contents: contents,
}
}
// writeAsFits writes as many bytes from the given slice as fits into the chunk
func (c *writableChunk) writeAsFits(b []byte) int {
if len(b) > c.contents.BytesRemaining() {
b = b[:c.contents.BytesRemaining()]
}
c.checksum.Add(b)
c.contents.WriteBytes(b)
written := len(b)
c.size += uint16(written)
return written
}
// finish finishes the chunk, updating its chunk size
func (c *writableChunk) finish() {
c.sizeRef.Update(c.size)
}
// A fragmentSender allocates and sends outbound fragments to a target
type fragmentSender interface {
// newFragment allocates a new fragment
newFragment(initial bool, checksum Checksum) (*writableFragment, error)
// flushFragment flushes the given fragment
flushFragment(f *writableFragment) error
// doneSending is called when the fragment receiver is finished sending all fragments.
doneSending()
}
type fragmentingWriterState int
const (
fragmentingWriteStart fragmentingWriterState = iota
fragmentingWriteInArgument
fragmentingWriteInLastArgument
fragmentingWriteWaitingForArgument
fragmentingWriteComplete
)
func (s fragmentingWriterState) isWritingArgument() bool {
return s == fragmentingWriteInArgument || s == fragmentingWriteInLastArgument
}
// A fragmentingWriter writes one or more arguments to an underlying stream,
// breaking them into fragments as needed, and applying an overarching
// checksum. It relies on an underlying fragmentSender, which creates and
// flushes the fragments as needed
type fragmentingWriter struct {
logger Logger
sender fragmentSender
checksum Checksum
curFragment *writableFragment
curChunk *writableChunk
state fragmentingWriterState
err error
}
// newFragmentingWriter creates a new fragmenting writer
func newFragmentingWriter(logger Logger, sender fragmentSender, checksum Checksum) *fragmentingWriter {
return &fragmentingWriter{
logger: logger,
sender: sender,
checksum: checksum,
state: fragmentingWriteStart,
}
}
// ArgWriter returns an ArgWriter to write an argument. The ArgWriter will handle
// fragmentation as needed. Once the argument is written, the ArgWriter must be closed.
func (w *fragmentingWriter) ArgWriter(last bool) (ArgWriter, error) {
if err := w.BeginArgument(last); err != nil {
return nil, err
}
return w, nil
}
// BeginArgument tells the writer that the caller is starting a new argument.
// Must not be called while an existing argument is in place
func (w *fragmentingWriter) BeginArgument(last bool) error {
if w.err != nil {
return w.err
}
switch {
case w.state == fragmentingWriteComplete:
w.err = errComplete
return w.err
case w.state.isWritingArgument():
w.err = errAlreadyWritingArgument
return w.err
}
// If we don't have a fragment, request one
if w.curFragment == nil {
initial := w.state == fragmentingWriteStart
if w.curFragment, w.err = w.sender.newFragment(initial, w.checksum); w.err != nil {
return w.err
}
}
// If there's no room in the current fragment, freak out. This will
// only happen due to an implementation error in the TChannel stack
// itself
if w.curFragment.contents.BytesRemaining() <= chunkHeaderSize {
panic(fmt.Errorf("attempting to begin an argument in a fragment with only %d bytes available",
w.curFragment.contents.BytesRemaining()))
}
w.curChunk = newWritableChunk(w.checksum, w.curFragment.contents)
w.state = fragmentingWriteInArgument
if last {
w.state = fragmentingWriteInLastArgument
}
return nil
}
// Write writes argument data, breaking it into fragments as needed
func (w *fragmentingWriter) Write(b []byte) (int, error) {
if w.err != nil {
return 0, w.err
}
if !w.state.isWritingArgument() {
w.err = errNotWritingArgument
return 0, w.err
}
totalWritten := 0
for {
bytesWritten := w.curChunk.writeAsFits(b)
totalWritten += bytesWritten
if bytesWritten == len(b) {
// The whole thing fit, we're done
return totalWritten, nil
}
// There was more data than fit into the fragment, so flush the current fragment,
// start a new fragment and chunk, and continue writing
if w.err = w.Flush(); w.err != nil {
return totalWritten, w.err
}
b = b[bytesWritten:]
}
}
// Flush flushes the current fragment, and starts a new fragment and chunk.
func (w *fragmentingWriter) Flush() error {
w.curChunk.finish()
w.curFragment.finish(true)
if w.err = w.sender.flushFragment(w.curFragment); w.err != nil {
return w.err
}
if w.curFragment, w.err = w.sender.newFragment(false, w.checksum); w.err != nil {
return w.err
}
w.curChunk = newWritableChunk(w.checksum, w.curFragment.contents)
return nil
}
// Close ends the current argument.
func (w *fragmentingWriter) Close() error {
last := w.state == fragmentingWriteInLastArgument
if w.err != nil {
return w.err
}
if !w.state.isWritingArgument() {
w.err = errNotWritingArgument
return w.err
}
w.curChunk.finish()
// There are three possibilities here:
// 1. There are no more arguments
// flush with more_fragments=false, mark the stream as complete
// 2. There are more arguments, but we can't fit more data into this fragment
// flush with more_fragments=true, start new fragment, write empty chunk to indicate
// the current argument is complete
// 3. There are more arguments, and we can fit more data into this fragment
// update the chunk but leave the current fragment open
if last {
// No more arguments - flush this final fragment and mark ourselves complete
w.state = fragmentingWriteComplete
w.curFragment.finish(false)
w.err = w.sender.flushFragment(w.curFragment)
w.sender.doneSending()
return w.err
}
w.state = fragmentingWriteWaitingForArgument
if w.curFragment.contents.BytesRemaining() > chunkHeaderSize {
// There's enough room in this fragment for the next argument's
// initial chunk, so we're done here
return nil
}
// This fragment is full - flush and prepare for another argument
w.curFragment.finish(true)
if w.err = w.sender.flushFragment(w.curFragment); w.err != nil {
return w.err
}
if w.curFragment, w.err = w.sender.newFragment(false, w.checksum); w.err != nil {
return w.err
}
// Write an empty chunk to indicate this argument has ended
w.curFragment.contents.WriteUint16(0)
return nil
}