utils/stream_serialization.go (170 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package utils
import (
"io"
"unsafe"
)
// StreamDataReader reads primitive Golang data types from an underlying reader.
// It always advance the read iterator without rewinding it.
type StreamDataReader struct {
reader io.Reader
bytesRead uint32
}
// NewStreamDataReader returns a StreamDataReader given an underlying reader.
func NewStreamDataReader(reader io.Reader) StreamDataReader {
return StreamDataReader{
reader: reader,
}
}
// GetBytesRead returns number of bytes read so far.
func (r StreamDataReader) GetBytesRead() uint32 {
return r.bytesRead
}
// Read reads bytes from underlying reader and fill bs with len(bs) bytes data. Raise
// error if there is no enough bytes left.
func (r *StreamDataReader) Read(bs []byte) error {
var err error
var bytesRead int
bytesToRead := len(bs)
for err == nil && bytesRead < bytesToRead {
var readLen int
readLen, err = r.reader.Read(bs[bytesRead:])
// Implementations of Read are discouraged from returning a
// zero byte count with a nil error, except when len(p) == 0.
// Callers should treat a return of 0 and nil as indicating that
// nothing happened; in particular it does not indicate EOF.
if readLen <= 0 {
break
}
bytesRead += readLen
}
// We return EOF directly without wrapping so that callers can take special actions against EOF.
if err == io.EOF {
return err
}
if err != nil {
return StackError(err, "Failed to Read data from underlying reader")
}
// We have to check readLen here since a non-zero number of bytes at the end of the input stream
// may return either err == EOF or err == nil.
if bytesRead != bytesToRead {
return StackError(nil,
"Tried to read %d bytes but only %d bytes left", bytesToRead, bytesRead)
}
r.bytesRead += uint32(bytesToRead)
return nil
}
// ReadUint8 reads one uint8 from the reader and advance.
func (r *StreamDataReader) ReadUint8() (uint8, error) {
b := [1]byte{}
if err := r.Read(b[:]); err != nil {
return 0, err
}
return *(*uint8)(unsafe.Pointer(&b)), nil
}
// ReadInt8 reads one int8 from the reader and advance.
func (r *StreamDataReader) ReadInt8() (int8, error) {
b := [1]byte{}
if err := r.Read(b[:]); err != nil {
return 0, err
}
return *(*int8)(unsafe.Pointer(&b)), nil
}
// ReadUint16 reads one uint16 from the reader and advance.
func (r *StreamDataReader) ReadUint16() (uint16, error) {
b := [2]byte{}
if err := r.Read(b[:]); err != nil {
return 0, err
}
return *(*uint16)(unsafe.Pointer(&b)), nil
}
// ReadInt16 reads one int16 from the reader and advance.
func (r *StreamDataReader) ReadInt16() (int16, error) {
b := [2]byte{}
if err := r.Read(b[:]); err != nil {
return 0, err
}
return *(*int16)(unsafe.Pointer(&b)), nil
}
// ReadUint32 reads one uint32 from the reader and advance.
func (r *StreamDataReader) ReadUint32() (uint32, error) {
b := [4]byte{}
if err := r.Read(b[:]); err != nil {
return 0, err
}
return *(*uint32)(unsafe.Pointer(&b)), nil
}
// ReadInt32 reads one int32 from the reader and advance.
func (r *StreamDataReader) ReadInt32() (int32, error) {
b := [4]byte{}
if err := r.Read(b[:]); err != nil {
return 0, err
}
return *(*int32)(unsafe.Pointer(&b)), nil
}
// ReadUint64 reads one uint64 from the reader and advance.
func (r *StreamDataReader) ReadUint64() (uint64, error) {
b := [8]byte{}
if err := r.Read(b[:]); err != nil {
return 0, err
}
return *(*uint64)(unsafe.Pointer(&b)), nil
}
// ReadFloat32 reads one float32 from the reader and advance.
func (r *StreamDataReader) ReadFloat32() (float32, error) {
b := [4]byte{}
if err := r.Read(b[:]); err != nil {
return 0, err
}
return *(*float32)(unsafe.Pointer(&b)), nil
}
// SkipBytes read some empty bytes from the reader and discard it
func (r *StreamDataReader) SkipBytes(nbytes int) error {
return r.Read(make([]byte, nbytes))
}
// ReadPadding reads and ignore bytes until alignment is met.
// nbytes is the size of bytes already read.
func (r *StreamDataReader) ReadPadding(nbytes int, alignment int) error {
if alignment > 1 {
remainder := nbytes % alignment
if remainder > 0 {
return r.SkipBytes(alignment - remainder)
}
}
return nil
}
// StreamDataWriter writes primitive Golang data types to an underlying writer.
// It always advance the write iterator without rewinding it.
type StreamDataWriter struct {
writer io.Writer
bytesWritten uint32
}
// NewStreamDataWriter returns a StreamDataWriter given an underlying writer.
func NewStreamDataWriter(writer io.Writer) StreamDataWriter {
return StreamDataWriter{
writer: writer,
}
}
// GetBytesWritten returns number of bytes written by current writer so far.
func (w StreamDataWriter) GetBytesWritten() uint32 {
return w.bytesWritten
}
func (w *StreamDataWriter) Write(bs []byte) error {
// We can ignore n here since Write must return a non-nil error if it returns n < len(p).
n, err := w.writer.Write(bs)
w.bytesWritten += uint32(n)
return err
}
// WriteUint8 writes one uint8 to the writer and advance.
func (w *StreamDataWriter) WriteUint8(v uint8) error {
var b [1]byte
b[0] = v
return w.Write(b[:])
}
// WriteInt8 writes one int8 to the writer and advance.
func (w *StreamDataWriter) WriteInt8(v int8) error {
var b [1]byte
b[0] = byte(v)
return w.Write(b[:])
}
// WriteUint16 writes one uint16 to the writer and advance.
func (w *StreamDataWriter) WriteUint16(v uint16) error {
return w.Write((*(*[2]byte)(unsafe.Pointer(&v)))[:])
}
// WriteInt16 writes one int16 to the writer and advance.
func (w *StreamDataWriter) WriteInt16(v int16) error {
return w.Write((*(*[2]byte)(unsafe.Pointer(&v)))[:])
}
// WriteUint32 writes one uint32 to the writer and advance.
func (w *StreamDataWriter) WriteUint32(v uint32) error {
return w.Write((*(*[4]byte)(unsafe.Pointer(&v)))[:])
}
// WriteInt32 writes one int32 to the writer and advance.
func (w *StreamDataWriter) WriteInt32(v int32) error {
return w.Write((*(*[4]byte)(unsafe.Pointer(&v)))[:])
}
// WriteUint64 writes one uint64 to the writer and advance.
func (w *StreamDataWriter) WriteUint64(v uint64) error {
return w.Write((*(*[8]byte)(unsafe.Pointer(&v)))[:])
}
// WriteFloat32 writes one float32 to the writer and advance.
func (w *StreamDataWriter) WriteFloat32(v float32) error {
return w.Write((*(*[4]byte)(unsafe.Pointer(&v)))[:])
}
// SkipBytes write some empty bytes to the writer and advance.
func (w *StreamDataWriter) SkipBytes(nbytes int) error {
if nbytes > 0 {
return w.Write(make([]byte, nbytes))
}
return nil
}
// WritePadding write some empty bytes until alignment is met.
// nbytes is the size of bytes already written.
func (w *StreamDataWriter) WritePadding(nbytes int, alignment int) error {
if alignment > 1 {
remainder := nbytes % alignment
if remainder > 0 {
return w.SkipBytes(alignment - remainder)
}
}
return nil
}