banyand/tsdb/series_write.go (185 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package tsdb
import (
"bytes"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
)
// WriterBuilder is a helper to build a Writer.
type WriterBuilder interface {
Family(name []byte, val []byte) WriterBuilder
Time(ts time.Time) WriterBuilder
Val(val []byte) WriterBuilder
Build() (Writer, error)
}
// Writer allow ingesting data into a tsdb.
type Writer interface {
IndexWriter
Write() (GlobalItemID, error)
ItemID() GlobalItemID
String() string
}
var _ WriterBuilder = (*writerBuilder)(nil)
type writerBuilder struct {
series *seriesSpan
block blockDelegate
values []struct {
family []byte
val []byte
}
ts time.Time
seriesIDBytes []byte
}
func (w *writerBuilder) Family(name []byte, val []byte) WriterBuilder {
w.values = append(w.values, struct {
family []byte
val []byte
}{family: name, val: val})
return w
}
func (w *writerBuilder) Time(ts time.Time) WriterBuilder {
w.ts = ts
for _, b := range w.series.blocks {
if b.contains(ts) {
w.block = b
break
}
}
return w
}
func (w *writerBuilder) Val(val []byte) WriterBuilder {
w.values = append(w.values, struct {
family []byte
val []byte
}{val: val})
return w
}
var (
errNoTime = errors.New("no time specified")
errNoVal = errors.New("no value specified")
errDuplicatedFamily = errors.New("duplicated family")
)
func (w *writerBuilder) Build() (Writer, error) {
if w.block == nil {
return nil, errors.WithMessagef(errNoTime, "ts:%v", w.ts)
}
if len(w.values) < 1 {
return nil, errors.WithStack(errNoVal)
}
for i, value := range w.values {
for j := i + 1; j < len(w.values); j++ {
if value.family == nil && w.values[j].family == nil {
return nil, errors.Wrap(errDuplicatedFamily, "default family")
}
if bytes.Equal(value.family, w.values[j].family) {
return nil, errors.Wrapf(errDuplicatedFamily, "family:%s", value.family)
}
}
}
segID, blockID := w.block.identity()
return &writer{
block: w.block,
ts: w.ts,
itemID: &GlobalItemID{
ShardID: w.series.shardID,
segID: segID,
blockID: blockID,
SeriesID: w.series.seriesID,
ID: common.ItemID(uint64(w.ts.UnixNano())),
},
columns: w.values,
}, nil
}
func newWriterBuilder(seriesSpan *seriesSpan) WriterBuilder {
return &writerBuilder{
series: seriesSpan,
seriesIDBytes: seriesSpan.seriesID.Marshal(),
}
}
var _ Writer = (*writer)(nil)
type writer struct {
ts time.Time
block blockDelegate
itemID *GlobalItemID
columns []struct {
family []byte
val []byte
}
}
func (w *writer) ItemID() GlobalItemID {
return *w.itemID
}
func (w *writer) WriteLSMIndex(fields []index.Field) error {
for i, f := range fields {
f.Key.SeriesID = w.itemID.SeriesID
fields[i] = f
}
return w.block.writeLSMIndex(fields, w.itemID.ID)
}
func (w *writer) WriteInvertedIndex(fields []index.Field) error {
for i, f := range fields {
f.Key.SeriesID = w.itemID.SeriesID
fields[i] = f
}
return w.block.writeInvertedIndex(fields, w.itemID.ID)
}
func (w *writer) String() string {
var buf []byte
buf = append(buf, "block:"...)
buf = append(buf, w.block.String()...)
buf = append(buf, ",column:"...)
buf = append(buf, fmt.Sprintf("%+v", w.columns)...)
buf = append(buf, ",ts:"...)
buf = append(buf, w.ts.String()...)
return string(buf)
}
type dataBucket struct {
family []byte
seriesID common.SeriesID
}
func (d dataBucket) marshal() []byte {
if d.family == nil {
return d.seriesID.Marshal()
}
return bytes.Join([][]byte{
d.seriesID.Marshal(),
d.family,
}, nil)
}
func (w *writer) Write() (GlobalItemID, error) {
id := w.ItemID()
for _, c := range w.columns {
err := w.block.write(dataBucket{
seriesID: w.itemID.SeriesID,
family: c.family,
}.marshal(),
c.val, w.ts)
if err != nil {
return id, err
}
}
return id, w.block.writePrimaryIndex(index.Field{
Key: index.FieldKey{
SeriesID: id.SeriesID,
},
Term: convert.Int64ToBytes(w.ts.UnixNano()),
}, id.ID)
}
var _ IndexWriter = (*seriesIndexWriter)(nil)
type seriesIndexWriter struct {
seriesDB SeriesDatabase
seriesID common.SeriesID
}
func (s *seriesIndexWriter) WriteInvertedIndex(fields []index.Field) error {
return s.seriesDB.writeInvertedIndex(fields, s.seriesID)
}
func (s *seriesIndexWriter) WriteLSMIndex(fields []index.Field) error {
return s.seriesDB.writeLSMIndex(fields, s.seriesID)
}
// NewSeriesIndexWriter returns a new series index writer.
func NewSeriesIndexWriter(seriesID common.SeriesID, seriesDB SeriesDatabase) IndexWriter {
return &seriesIndexWriter{
seriesID: seriesID,
seriesDB: seriesDB,
}
}