internal/pkg/file/cbor/chunk.go (157 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package cbor import ( "encoding/binary" "errors" "io" "time" ) // This is a trimmed-down special purpose writer // and cbor encoder used to streamline upload chunk writing // without buffering large amounts of data // in memory. // It is not a general-purpose CBOR encoder. // A suitable general purpose library, if the future needs one, is github.com/fxamacker/cbor/v2 type ChunkEncoder struct { chunk io.Reader final bool preamble []byte prbWritten bool prbWritePos int wroteTerm bool } func NewChunkWriter(chunkData io.Reader, finalChunk bool, baseID string, chunkHash string, chunkSize int64) *ChunkEncoder { return &ChunkEncoder{ chunk: chunkData, final: finalChunk, preamble: encodePreambleToCBOR(finalChunk, baseID, chunkHash, chunkSize), prbWritten: false, prbWritePos: 0, wroteTerm: false, } } // Writes the start of a CBOR object (equiv. JSON object) // // { // "@timestamp": 1687891588, // "last": true/false, // "bid": "baseID", // "sha2": "...", // "data": // } // // the slice ends where the chunk data bytes ("byte string") should begin. // it is therefore an incomplete CBOR object on its own // expecting the next section to be filled in by the caller. // the CBOR spec may be found here: https://www.rfc-editor.org/rfc/rfc8949 // chunksize is ignored when writing the "final"=true chunk func encodePreambleToCBOR(final bool, baseID string, chunkHash string, chunkSize int64) []byte { bidLen := len(baseID) hashLen := len(chunkHash) // if we know the size of the chunk stream, we will write the 4-byte uint32 // descriptor of that length // otherwise it will be a *single* byte saying it is an unknown length // and we will write out lengths as the chunk is read chunkLen := 5 // space for describing sequence length. 1 byte to SAY 32-bit int (4byte), then 4 bytes if final { chunkLen = 1 } preamble := make([]byte, 31+bidLen+2+5+hashLen+2+chunkLen+5) preamble[0] = 0xA5 // Object with 5 keys preamble[1] = 0x6a // string with 10 characters (@timestamp) preamble[2] = '@' preamble[3] = 't' preamble[4] = 'i' preamble[5] = 'm' preamble[6] = 'e' preamble[7] = 's' preamble[8] = 't' preamble[9] = 'a' preamble[10] = 'm' preamble[11] = 'p' preamble[12] = 0x1b // uint64 to follow // occupies 8 bytes, indexes 13-20 binary.BigEndian.PutUint64(preamble[13:], uint64(time.Now().UnixMilli())) //nolint:gosec // disable G115 preamble[21] = 0x64 // string with 4 chars (key: last) preamble[22] = 'l' preamble[23] = 'a' preamble[24] = 's' preamble[25] = 't' if final { preamble[26] = 0xF5 // bool true } else { preamble[26] = 0xF4 // bool false } preamble[27] = 0x63 // string with 3 chars (key: bid) preamble[28] = 'b' preamble[29] = 'i' preamble[30] = 'd' i := 31 if n, err := writeString(preamble[i:], baseID); err != nil { return nil } else { i += n } if n, err := writeKey(preamble[i:], "sha2"); err != nil { return nil } else { i += n } if n, err := writeString(preamble[i:], chunkHash); err != nil { return nil } else { i += n } if n, err := writeKey(preamble[i:], "data"); err != nil { return nil } else { i += n } if !final { // byte data should be precisely chunkSize long, otherwise malformed preamble[i] = 0x5A // say length descriptor will be 32-bit int binary.BigEndian.PutUint32(preamble[i+1:], uint32(chunkSize)) //nolint:gosec // disable G115 } else { // final chunk may be less than full size, will need to determine length preamble[i] = 0x5F // indeterminate-length byte sequence } return preamble } const varLenHeaderSize = 5 // io.Reader interface for streaming out func (c *ChunkEncoder) Read(buf []byte) (int, error) { if c.wroteTerm { // already wrote a terminating instruction for undefined byte sequence length return 0, io.EOF } if !c.prbWritten { n := copy(buf, c.preamble[c.prbWritePos:]) if n == len(c.preamble[c.prbWritePos:]) { c.prbWritten = true } c.prbWritePos += n return n, nil } if c.final { // need to write length headers before the byte sequence if len(buf) < 10 { return 0, errors.New("buffer too small") } n, err := c.chunk.Read(buf[varLenHeaderSize:]) buf[0] = 0x5A // 4-byte length descriptor to follow binary.BigEndian.PutUint32(buf[1:], uint32(n)) //nolint:gosec // disable G115 if errors.Is(err, io.EOF) { if n == 0 { // chunk data has been exhausted, write the terminating byte and get out buf[0] = 0xFF c.wroteTerm = true return 1, io.EOF } // if we can tack-on the terminating byte from this read call, do it if len(buf) > n+varLenHeaderSize+1 { buf[n+varLenHeaderSize] = 0xFF c.wroteTerm = true n = n + 1 } else { //otherwise, wait for the next call to Read(), hide the EOF err err = nil } } return n + varLenHeaderSize, err } return c.chunk.Read(buf) } // writes len(key)+1 bytes func writeKey(buf []byte, key string) (int, error) { keylen := len(key) if keylen > 0x17 { // CBOR spec max size for single-byte string length descriptor // another method would have to be used for writing the string length return 0, errors.New("large key size, write manually") } if len(buf) < keylen+1 { return 0, errors.New("cbor buffer size too small") } buf[0] = byte(0x60 + keylen) for i, c := range key { buf[i+1] = byte(c) } return keylen + 1, nil } // writes len(string)+2 bytes func writeString(buf []byte, val string) (int, error) { strlen := len(val) if strlen > 0xff { // max single-byte strlen return 0, errors.New("oversize string") } if len(buf) < strlen+2 { return 0, errors.New("cbor buffer size too small") } buf[0] = 0x78 // Descriptor for: "UTF8 string. Next byte is a uint8 for n, and then n bytes follow" buf[1] = uint8(strlen) for i, c := range val { buf[i+2] = byte(c) } return strlen + 2, nil }