in protocol/triple/triple_protocol/envelope.go [149:213]
func (r *envelopeReader) Unmarshal(message any) *Error {
buffer := r.bufferPool.Get()
defer r.bufferPool.Put(buffer)
env := &envelope{Data: buffer}
err := r.Read(env)
switch {
case err == nil &&
(env.Flags == 0 || env.Flags == flagEnvelopeCompressed) &&
env.Data.Len() == 0:
// This is a standard message (because none of the top 7 bits are set) and
// there's no data, so the zero value of the message is correct.
return nil
case err != nil && errors.Is(err, io.EOF):
// The stream has ended. Propagate the EOF to the caller.
return err
case err != nil:
// Something's wrong.
return err
}
data := env.Data
if data.Len() > 0 && env.IsSet(flagEnvelopeCompressed) {
if r.compressionPool == nil {
return errorf(
CodeInvalidArgument,
"gRPC protocol error: sent compressed message without Grpc-Encoding header",
)
}
decompressed := r.bufferPool.Get()
defer r.bufferPool.Put(decompressed)
if err := r.compressionPool.Decompress(decompressed, data, int64(r.readMaxBytes)); err != nil {
return err
}
data = decompressed
}
if env.Flags != 0 && env.Flags != flagEnvelopeCompressed {
// One of the protocol-specific flags are set, so this is the end of the
// stream. Save the message for protocol-specific code to process and
// return a sentinel error. Since we've deferred functions to return env's
// underlying buffer to a pool, we need to keep a copy.
r.last = envelope{
Data: r.bufferPool.Get(),
Flags: env.Flags,
}
// Don't return last to the pool! We're going to reference the data
// elsewhere.
if _, err := r.last.Data.ReadFrom(data); err != nil {
return errorf(CodeUnknown, "copy final envelope: %w", err)
}
return errSpecialEnvelope
}
if err := r.codec.Unmarshal(data.Bytes(), message); err != nil {
if r.backupCodec != nil && r.backupCodec.Name() != r.codec.Name() {
logger.Debugf("failed to unmarshal message with codec %s, trying alternative codec %s", r.codec.Name(), r.backupCodec.Name())
err = r.backupCodec.Unmarshal(data.Bytes(), message)
}
if err != nil {
return errorf(CodeInvalidArgument, "unmarshal into %T: %w", message, err)
}
}
return nil
}