func()

in protocol/triple/triple_protocol/envelope.go [149:213]


func (r *envelopeReader) Unmarshal(message any) *Error {
	buffer := r.bufferPool.Get()
	defer r.bufferPool.Put(buffer)

	env := &envelope{Data: buffer}
	err := r.Read(env)
	switch {
	case err == nil &&
		(env.Flags == 0 || env.Flags == flagEnvelopeCompressed) &&
		env.Data.Len() == 0:
		// This is a standard message (because none of the top 7 bits are set) and
		// there's no data, so the zero value of the message is correct.
		return nil
	case err != nil && errors.Is(err, io.EOF):
		// The stream has ended. Propagate the EOF to the caller.
		return err
	case err != nil:
		// Something's wrong.
		return err
	}

	data := env.Data
	if data.Len() > 0 && env.IsSet(flagEnvelopeCompressed) {
		if r.compressionPool == nil {
			return errorf(
				CodeInvalidArgument,
				"gRPC protocol error: sent compressed message without Grpc-Encoding header",
			)
		}
		decompressed := r.bufferPool.Get()
		defer r.bufferPool.Put(decompressed)
		if err := r.compressionPool.Decompress(decompressed, data, int64(r.readMaxBytes)); err != nil {
			return err
		}
		data = decompressed
	}

	if env.Flags != 0 && env.Flags != flagEnvelopeCompressed {
		// One of the protocol-specific flags are set, so this is the end of the
		// stream. Save the message for protocol-specific code to process and
		// return a sentinel error. Since we've deferred functions to return env's
		// underlying buffer to a pool, we need to keep a copy.
		r.last = envelope{
			Data:  r.bufferPool.Get(),
			Flags: env.Flags,
		}
		// Don't return last to the pool! We're going to reference the data
		// elsewhere.
		if _, err := r.last.Data.ReadFrom(data); err != nil {
			return errorf(CodeUnknown, "copy final envelope: %w", err)
		}
		return errSpecialEnvelope
	}

	if err := r.codec.Unmarshal(data.Bytes(), message); err != nil {
		if r.backupCodec != nil && r.backupCodec.Name() != r.codec.Name() {
			logger.Debugf("failed to unmarshal message with codec %s, trying alternative codec %s", r.codec.Name(), r.backupCodec.Name())
			err = r.backupCodec.Unmarshal(data.Bytes(), message)
		}
		if err != nil {
			return errorf(CodeInvalidArgument, "unmarshal into %T: %w", message, err)
		}
	}
	return nil
}