internal/helper/chunk/chunker.go (40 lines of code) (raw):

package chunk import ( "google.golang.org/protobuf/proto" ) // Sender encapsulates a gRPC response stream and the current chunk // that's being built. // // Reset, Append, [Append...], Send, Reset, Append, [Append...], Send, ... type Sender interface { // Reset should create a fresh response message. Reset() // Append should append the given item to the slice in the current response message Append(proto.Message) // Send should send the current response message Send() error } // New returns a new Chunker. func New(s Sender) *Chunker { return &Chunker{s: s} } // Chunker lets you spread items you want to send over multiple chunks. // This type is not thread-safe. type Chunker struct { s Sender size int } // maxMessageSize is maximum size per protobuf message const maxMessageSize = 1 * 1024 * 1024 // Send will append an item to the current chunk and send the chunk if it is full. func (c *Chunker) Send(it proto.Message) error { if c.size == 0 { c.s.Reset() } itSize := proto.Size(it) if itSize+c.size >= maxMessageSize { if err := c.sendResponseMsg(); err != nil { return err } c.s.Reset() } c.s.Append(it) c.size += itSize return nil } func (c *Chunker) sendResponseMsg() error { c.size = 0 return c.s.Send() } // Flush sends remaining items in the current chunk, if any. func (c *Chunker) Flush() error { if c.size == 0 { return nil } return c.sendResponseMsg() }