pkg/prompb/protobuf.go (289 lines of code) (raw):

package prompb import ( "fmt" mathbits "math/bits" "github.com/VictoriaMetrics/easyproto" ) var ( mp = &easyproto.MarshalerPool{} WriteRequestPool = NewPool[*WriteRequest]( func() *WriteRequest { return &WriteRequest{} }, ) TimeSeriesPool = NewPool[*TimeSeries]( func() *TimeSeries { return &TimeSeries{} }, ) ) // WriteRequest represents Prometheus remote write API request type WriteRequest struct { Timeseries []*TimeSeries } // TimeSeries is a timeseries. type TimeSeries struct { Labels []*Label Samples []*Sample } // Label is a timeseries label type Label struct { Name []byte Value []byte } // Sample is a timeseries sample. type Sample struct { Value float64 Timestamp int64 } // Unmarshal unmarshals m from src. func (wr *WriteRequest) Unmarshal(src []byte) (err error) { wr.Timeseries = wr.Timeseries[:0] var fc easyproto.FieldContext for len(src) > 0 { src, err = fc.NextField(src) if err != nil { return fmt.Errorf("cannot read next field in Timeseries message") } switch fc.FieldNum { case 1: data, ok := fc.MessageData() if !ok { return fmt.Errorf("cannot read Timeseries sample data") } if cap(wr.Timeseries) > len(wr.Timeseries) { wr.Timeseries = wr.Timeseries[:len(wr.Timeseries)+1] wr.Timeseries[len(wr.Timeseries)-1] = TimeSeriesPool.Get() } else { wr.Timeseries = append(wr.Timeseries, TimeSeriesPool.Get()) } ts := wr.Timeseries[len(wr.Timeseries)-1] if err := ts.unmarshalProtobuf(data); err != nil { return fmt.Errorf("cannot unmarshal sample: %w", err) } } } return nil } func (wr *WriteRequest) Marshal() (dAtA []byte, err error) { b := make([]byte, wr.Size()) return wr.MarshalTo(b[:0]) } func (wr *WriteRequest) MarshalTo(dst []byte) ([]byte, error) { marshaller := mp.Get() marshaller.Reset() mm := marshaller.MessageMarshaler() for _, ts := range wr.Timeseries { ts.marshalProtobuf(mm.AppendMessage(1)) } dst = marshaller.Marshal(dst[:0]) mp.Put(marshaller) return dst, nil } // Reset resets wr. func (wr *WriteRequest) Reset() { for i := range wr.Timeseries { ts := wr.Timeseries[i] TimeSeriesPool.Put(ts) } wr.Timeseries = wr.Timeseries[:0] } func (wr *WriteRequest) Size() (n int) { if wr == nil { return 0 } var l int _ = l if len(wr.Timeseries) > 0 { for _, e := range wr.Timeseries { l = e.Size() n += 1 + l + sizeOf(uint64(l)) } } return n } func (s *Sample) Size() (n int) { if s == nil { return 0 } var l int _ = l if s.Value != 0 { n += 9 } if s.Timestamp != 0 { n += 1 + sizeOf(uint64(s.Timestamp)) } return n } func (s *Sample) marshalProtobuf(mm *easyproto.MessageMarshaler) { mm.AppendDouble(1, s.Value) mm.AppendInt64(2, s.Timestamp) } func (s *Sample) unmarshalProtobuf(src []byte) (err error) { // Set default Sample values s.Value = 0 s.Timestamp = 0 // Parse Sample message at src var fc easyproto.FieldContext for len(src) > 0 { src, err = fc.NextField(src) if err != nil { return fmt.Errorf("cannot read next field in sample") } switch fc.FieldNum { case 1: value, ok := fc.Double() if !ok { return fmt.Errorf("cannot read sample value") } s.Value = value case 2: timestamp, ok := fc.Int64() if !ok { return fmt.Errorf("cannot read sample timestamp") } s.Timestamp = timestamp } } return nil } func (s *Sample) Reset() { s.Value = 0 s.Timestamp = 0 } func (m *TimeSeries) Size() (n int) { if m == nil { return 0 } var l int _ = l if len(m.Labels) > 0 { for _, e := range m.Labels { l = e.Size() n += 1 + l + sizeOf(uint64(l)) } } if len(m.Samples) > 0 { for _, e := range m.Samples { l = e.Size() n += 1 + l + sizeOf(uint64(l)) } } return n } func (m *TimeSeries) marshalProtobuf(mm *easyproto.MessageMarshaler) { for _, l := range m.Labels { l.marshalProtobuf(mm.AppendMessage(1)) } for _, s := range m.Samples { s.marshalProtobuf(mm.AppendMessage(2)) } } func (m *TimeSeries) unmarshalProtobuf(src []byte) (err error) { m.Labels = m.Labels[:0] m.Samples = m.Samples[:0] var fc easyproto.FieldContext for len(src) > 0 { src, err = fc.NextField(src) if err != nil { return fmt.Errorf("cannot read next field in Timeseries message") } switch fc.FieldNum { case 1: data, ok := fc.MessageData() if !ok { return fmt.Errorf("cannot read Timeseries sample data") } m.Labels = append(m.Labels, &Label{}) s := m.Labels[len(m.Labels)-1] if err := s.unmarshalProtobuf(data); err != nil { return fmt.Errorf("cannot unmarshal sample: %w", err) } case 2: data, ok := fc.MessageData() if !ok { return fmt.Errorf("cannot read Timeseries sample data") } m.Samples = append(m.Samples, &Sample{}) s := m.Samples[len(m.Samples)-1] if err := s.unmarshalProtobuf(data); err != nil { return fmt.Errorf("cannot unmarshal sample: %w", err) } } } return nil } func (ts *TimeSeries) Reset() { for i := range ts.Labels { ts.Labels[i].Reset() } for i := range ts.Samples { ts.Samples[i].Reset() } ts.Labels = ts.Labels[:0] ts.Samples = ts.Samples[:0] } func (m *TimeSeries) AppendLabel(key []byte, value []byte) { m.Labels = append(m.Labels, &Label{}) l := m.Labels[len(m.Labels)-1] l.Name = append(l.Name[:0], key...) l.Value = append(l.Value[:0], value...) } func (m *TimeSeries) AppendLabelString(key string, value string) { m.Labels = append(m.Labels, &Label{}) l := m.Labels[len(m.Labels)-1] l.Name = append(l.Name[:0], key...) l.Value = append(l.Value[:0], value...) } func (m *TimeSeries) AppendSample(timestamp int64, value float64) { m.Samples = append(m.Samples, &Sample{}) s := m.Samples[len(m.Samples)-1] s.Timestamp = timestamp s.Value = value } func (m *Label) Size() (n int) { if m == nil { return 0 } var l int _ = l l = len(m.Name) if l > 0 { n += 1 + l + sizeOf(uint64(l)) } l = len(m.Value) if l > 0 { n += 1 + l + sizeOf(uint64(l)) } return n } func (m *Label) marshalProtobuf(mm *easyproto.MessageMarshaler) { mm.AppendBytes(1, m.Name) mm.AppendBytes(2, m.Value) } func (m *Label) unmarshalProtobuf(src []byte) (err error) { // Set default Sample values m.Name = m.Name[:0] m.Value = m.Value[:0] // Parse Sample message at src var fc easyproto.FieldContext for len(src) > 0 { src, err = fc.NextField(src) if err != nil { return fmt.Errorf("cannot read next field in sample") } switch fc.FieldNum { case 1: name, ok := fc.Bytes() if !ok { return fmt.Errorf("cannot read sample value") } m.Name = append(m.Name[:0], name...) case 2: value, ok := fc.Bytes() if !ok { return fmt.Errorf("cannot read sample value") } m.Value = append(m.Value[:0], value...) } } return nil } func (l *Label) Reset() { l.Name = l.Name[:0] l.Value = l.Value[:0] } func sizeOf(x uint64) (n int) { return (mathbits.Len64(x|1) + 6) / 7 }