in protocol/triple/triple_protocol/envelope.go [215:288]
// Read reads one enveloped message from r.reader into env.
//
// An envelope is a 5-byte prefix followed by the payload: prefix byte 0 holds
// the flags, and prefix bytes 1-4 hold the payload size as a big-endian
// uint32. On success env.Flags is set to the flags byte and the payload (if
// any) has been written to env.Data.
//
// Returned errors:
//   - the stream ended before any prefix byte was read: the io.EOF wrapped
//     with CodeUnknown and no additional message text;
//   - the prefix could not be read completely: the underlying *Error if there
//     is one, a max-bytes error if asMaxBytesError recognizes it, otherwise
//     CodeInvalidArgument;
//   - the announced size exceeds r.readMaxBytes (when that limit is positive):
//     CodeResourceExhausted after the payload has been discarded;
//   - the payload is shorter than announced: CodeInvalidArgument;
//   - any other failure while reading the payload: CodeUnknown.
func (r *envelopeReader) Read(env *envelope) *Error {
	// Read the prefix first, then the payload whose length the prefix
	// announces. A single Read call is allowed to return fewer bytes than
	// requested without reporting an error, so use io.ReadFull to keep reading
	// until all 5 prefix bytes have arrived or the reader fails.
	prefixes := [5]byte{}
	prefixBytesRead, err := io.ReadFull(r.reader, prefixes[:])
	if err != nil {
		if prefixBytesRead == 0 && errors.Is(err, io.EOF) {
			// The stream ended cleanly. That's expected, but we need to propagate
			// the EOF to the user so that they know that the stream has ended. We
			// shouldn't add any alarming text about protocol errors, though.
			return NewError(CodeUnknown, err)
		}
		// Something else has gone wrong - the stream didn't end cleanly.
		if tripleErr, ok := asError(err); ok {
			return tripleErr
		}
		if maxBytesErr := asMaxBytesError(err, "read 5 byte message prefix"); maxBytesErr != nil {
			// We're reading from an http.MaxBytesHandler, and we've exceeded the read limit.
			return maxBytesErr
		}
		return errorf(
			CodeInvalidArgument,
			"protocol error: incomplete envelope: %w", err,
		)
	}
	if isSizeZeroPrefix(prefixes) {
		// Successfully read the prefix and expect no additional data.
		env.Flags = prefixes[0]
		return nil
	}
	size := int(binary.BigEndian.Uint32(prefixes[1:5]))
	if size < 0 {
		// Only reachable when int is too narrow to hold every uint32 value.
		return errorf(CodeInvalidArgument, "message size %d overflowed uint32", size)
	}
	if r.readMaxBytes > 0 && size > r.readMaxBytes {
		// Discard the oversized payload from the reader instead of buffering it,
		// then report the limit violation. Hitting EOF while discarding is
		// tolerated; any other read failure is reported instead.
		_, err := io.CopyN(io.Discard, r.reader, int64(size))
		if err != nil && !errors.Is(err, io.EOF) {
			return errorf(CodeUnknown, "read enveloped message: %w", err)
		}
		return errorf(CodeResourceExhausted, "message size %d is larger than configured max %d", size, r.readMaxBytes)
	}
	if size > 0 {
		env.Data.Grow(size)
		// At layer 7, we don't know exactly what's happening down in L4. Large
		// length-prefixed messages may arrive in chunks, so we may need to read
		// the request body past EOF. We also need to take care that we don't retry
		// forever if the message is malformed.
		remaining := int64(size)
		for remaining > 0 {
			bytesRead, err := io.CopyN(env.Data, r.reader, remaining)
			if err != nil && !errors.Is(err, io.EOF) {
				if maxBytesErr := asMaxBytesError(err, "read %d byte message", size); maxBytesErr != nil {
					// We're reading from an http.MaxBytesHandler, and we've exceeded the read limit.
					return maxBytesErr
				}
				return errorf(CodeUnknown, "read enveloped message: %w", err)
			}
			if errors.Is(err, io.EOF) && bytesRead == 0 {
				// EOF with no progress at all: the message is likely malformed, so
				// don't wait for additional chunks.
				return errorf(
					CodeInvalidArgument,
					"protocol error: promised %d bytes in enveloped message, got %d bytes",
					size,
					int64(size)-remaining,
				)
			}
			// EOF with some progress is retried: the rest may arrive in a later chunk.
			remaining -= bytesRead
		}
	}
	env.Flags = prefixes[0]
	return nil
}