processor/lsmintervalprocessor/internal/merger/key.go (80 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package merger // import "github.com/elastic/opentelemetry-collector-components/processor/lsmintervalprocessor/internal/merger" import ( "encoding/binary" "errors" "fmt" "slices" "time" ) type Key struct { Interval time.Duration ProcessingTime time.Time // Metadata holds an ordered list of arbitrary keys and associated // string values to associate with the interval and processing time. Metadata []KeyValues } type KeyValues struct { Key string Values []string } // AppendBinary marshals the key into its binary representation, // appending it to b. func (k *Key) AppendBinary(b []byte) ([]byte, error) { b = slices.Grow(b, 10) b = binary.BigEndian.AppendUint16(b, uint16(k.Interval.Seconds())) b = binary.BigEndian.AppendUint64(b, uint64(k.ProcessingTime.Unix())) if len(k.Metadata) != 0 { b = binary.AppendUvarint(b, uint64(len(k.Metadata))) for _, kvs := range k.Metadata { mk := kvs.Key mvs := kvs.Values b = binary.AppendUvarint(b, uint64(len(mk))) b = append(b, mk...) b = binary.AppendUvarint(b, uint64(len(mvs))) for _, mv := range mvs { b = binary.AppendUvarint(b, uint64(len(mv))) b = append(b, mv...) } } } return b, nil } // Unmarshal unmarshals the binary representation of the Key. func (k *Key) Unmarshal(d []byte) error { if len(d) < 10 { return errors.New("failed to unmarshal key, invalid sized buffer provided") } k.Interval = time.Duration(binary.BigEndian.Uint16(d[:2])) * time.Second k.ProcessingTime = time.Unix(int64(binary.BigEndian.Uint64(d[2:10])), 0) d = d[10:] if len(d) > 0 { numKeys, n := binary.Uvarint(d) if n <= 0 { return fmt.Errorf("error reading number of metadata keys (n=%d)", n) } d = d[n:] k.Metadata = make([]KeyValues, numKeys) for i := range numKeys { mklen, n := binary.Uvarint(d) if n <= 0 { return fmt.Errorf("error reading metadata key length (n=%d)", n) } d = d[n:] mk := string(d[:mklen]) d = d[mklen:] numValues, n := binary.Uvarint(d) if n <= 0 { return fmt.Errorf("error reading number of metadata values for %q (n=%d)", mk, n) } d = d[n:] mvs := make([]string, numValues) for i := range numValues { mvlen, n := binary.Uvarint(d) if n <= 0 { return fmt.Errorf("error reading metadata value length for %q (n=%d)", mk, n) } d = d[n:] mv := string(d[:mvlen]) d = d[mvlen:] mvs[i] = mv } k.Metadata[i] = KeyValues{Key: mk, Values: mvs} } } return nil }