func()

in lib/go/thrift/header_transport.go [623:734]


// Flush sends the contents of t.writeBuffer to t.transport using the wire
// format selected by t.clientType, and then flushes t.transport.
//
// When the write buffer is nil or empty, Flush returns nil without touching
// the underlying transport. Otherwise the write buffer is passed to
// bufPool.put when Flush returns, whether it succeeded or not.
//
// Wire formats:
//   - clientUnframedBinary, clientUnframedCompact: the buffered bytes are
//     copied as they are.
//   - clientFramedBinary, clientFramedCompact: a big-endian uint32 length
//     prefix is written, followed by the buffered bytes.
//   - every other value (clientHeaders, clientUnknown, or anything
//     unrecognized): t.clientType is set to clientHeaders and a header frame
//     is written (length prefix, headerMeta, header block, transformed body).
//
// Errors hit while encoding or writing are returned wrapped by
// NewTTransportExceptionFromError. After the data has been written, a done
// ctx makes Flush return the wrapped ctx.Err() instead of flushing. The
// result of t.transport.Flush is returned unchanged.
func (t *THeaderTransport) Flush(ctx context.Context) error {
	if t.writeBuffer == nil {
		return nil
	}
	if t.writeBuffer.Len() == 0 {
		return nil
	}

	defer bufPool.put(&t.writeBuffer)

	switch t.clientType {
	case clientUnframedBinary, clientUnframedCompact:
		if _, err := io.Copy(t.transport, t.writeBuffer); err != nil {
			return NewTTransportExceptionFromError(err)
		}

	case clientFramedBinary, clientFramedCompact:
		// Length prefix first, then the message body itself.
		sizePrefix := t.buffer[:size32]
		binary.BigEndian.PutUint32(sizePrefix, uint32(t.writeBuffer.Len()))
		if _, err := t.transport.Write(sizePrefix); err != nil {
			return NewTTransportExceptionFromError(err)
		}
		if _, err := io.Copy(t.transport, t.writeBuffer); err != nil {
			return NewTTransportExceptionFromError(err)
		}

	default:
		// Any client type that is not one of the framed/unframed ones is
		// written in the header format and recorded as clientHeaders.
		if t.clientType != clientHeaders {
			t.clientType = clientHeaders
		}

		hdrBlock := NewTMemoryBuffer()
		enc := NewTCompactProtocol(hdrBlock)
		enc.SetTConfiguration(t.cfg)

		// putVarint appends one varint to the header block, wrapping any
		// failure as a transport exception.
		putVarint := func(v int32) error {
			if _, err := enc.writeVarint32(v); err != nil {
				return NewTTransportExceptionFromError(err)
			}
			return nil
		}

		// Protocol id, then the transform count followed by each transform.
		if err := putVarint(int32(t.protocolID)); err != nil {
			return err
		}
		if err := putVarint(int32(len(t.writeTransforms))); err != nil {
			return err
		}
		for _, id := range t.writeTransforms {
			if err := putVarint(int32(id)); err != nil {
				return err
			}
		}

		// Key/value info section; omitted entirely when there are no headers.
		if count := len(t.writeHeaders); count > 0 {
			if err := putVarint(int32(InfoKeyValue)); err != nil {
				return err
			}
			if err := putVarint(int32(count)); err != nil {
				return err
			}
			for name, val := range t.writeHeaders {
				err := enc.WriteString(ctx, name)
				if err == nil {
					err = enc.WriteString(ctx, val)
				}
				if err != nil {
					return NewTTransportExceptionFromError(err)
				}
			}
		}

		// HeaderLength below is expressed in units of 4 bytes, so zero-pad
		// the header block up to the next multiple of 4.
		if rem := hdrBlock.Len() % 4; rem != 0 {
			pad := t.buffer[:4-rem]
			for i := 0; i < len(pad); i++ {
				pad[i] = 0
			}
			if _, err := hdrBlock.Write(pad); err != nil {
				return NewTTransportExceptionFromError(err)
			}
		}

		// Assemble the whole frame (minus its length prefix) in a pooled
		// buffer: fixed meta, header block, then the transformed body.
		frame := bufPool.get()
		defer bufPool.put(&frame)
		err := binary.Write(frame, binary.BigEndian, headerMeta{
			MagicFlags:   THeaderHeaderMagic + (t.Flags & THeaderFlagsMask),
			SequenceID:   t.SequenceID,
			HeaderLength: uint16(hdrBlock.Len() / 4),
		})
		if err != nil {
			return NewTTransportExceptionFromError(err)
		}
		if _, err = io.Copy(frame, hdrBlock); err != nil {
			return NewTTransportExceptionFromError(err)
		}

		body, err := NewTransformWriter(frame, t.writeTransforms)
		if err != nil {
			return NewTTransportExceptionFromError(err)
		}
		if _, err = io.Copy(body, t.writeBuffer); err != nil {
			return NewTTransportExceptionFromError(err)
		}
		if err = body.Close(); err != nil {
			return NewTTransportExceptionFromError(err)
		}

		// The frame length goes out first, then the frame itself.
		sizePrefix := t.buffer[:size32]
		binary.BigEndian.PutUint32(sizePrefix, uint32(frame.Len()))
		if _, err = t.transport.Write(sizePrefix); err != nil {
			return NewTTransportExceptionFromError(err)
		}
		if _, err = io.Copy(t.transport, frame); err != nil {
			return NewTTransportExceptionFromError(err)
		}
	}

	// Non-blocking check: give up before the final flush if ctx is done.
	select {
	case <-ctx.Done():
		return NewTTransportExceptionFromError(ctx.Err())
	default:
	}

	return t.transport.Flush(ctx)
}