// banyand/measure/write.go
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package measure

import (
"bytes"
"context"
"fmt"
"time"
"google.golang.org/protobuf/types/known/anypb"
"github.com/apache/skywalking-banyandb/api/common"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
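
// subjectField is the index field key under which the series subject
// (the measure name) is stored for index-mode documents.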
var subjectField = index.FieldKey{TagName: index.IndexModeName}
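
// writeCallback consumes write events from the bus and buffers measure data
// points per group before they are flushed into the tsdb and its indexes.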
type writeCallback struct {
l *logger.Logger
schemaRepo *schemaRepo
maxDiskUsagePercent int
}
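
// setUpWriteCallback builds the bus.MessageListener that handles measure
// write events; maxDiskUsagePercent is clamped to 100.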
func setUpWriteCallback(l *logger.Logger, schemaRepo *schemaRepo, maxDiskUsagePercent int) bus.MessageListener {
if maxDiskUsagePercent > 100 {
maxDiskUsagePercent = 100
}
return &writeCallback{
l: l,
schemaRepo: schemaRepo,
maxDiskUsagePercent: maxDiskUsagePercent,
}
}
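
// CheckHealth returns STATUS_DISK_FULL when writing is disabled
// (maxDiskUsagePercent < 1) or the data path's disk usage has reached the
// configured threshold; otherwise it returns nil.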
func (w *writeCallback) CheckHealth() *common.Error {
if w.maxDiskUsagePercent < 1 {
		return common.NewErrorWithStatus(modelv1.Status_STATUS_DISK_FULL, "measure is read-only because \"measure-max-disk-usage-percent\" is 0")
}
diskPercent := observability.GetPathUsedPercent(w.schemaRepo.path)
if diskPercent < w.maxDiskUsagePercent {
return nil
}
w.l.Warn().Int("maxPercent", w.maxDiskUsagePercent).Int("diskPercent", diskPercent).Msg("disk usage is too high, stop writing")
return common.NewErrorWithStatus(modelv1.Status_STATUS_DISK_FULL, "disk usage is too high, stop writing")
}
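
// handle validates a single write event and buffers it into dst, keyed by
// group name. Index-mode measures only produce an index document; regular
// measures also append the data point to the in-memory table and feed any
// registered top-N processor.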
func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *measurev1.InternalWriteRequest) (map[string]*dataPointsInGroup, error) {
req := writeEvent.Request
t := req.DataPoint.Timestamp.AsTime().Local()
if err := timestamp.Check(t); err != nil {
return nil, fmt.Errorf("invalid timestamp: %w", err)
}
ts := t.UnixNano()
gn := req.Metadata.Group
tsdb, err := w.schemaRepo.loadTSDB(gn)
if err != nil {
return nil, fmt.Errorf("cannot load tsdb for group %s: %w", gn, err)
}
dpg, ok := dst[gn]
if !ok {
dpg = &dataPointsInGroup{
tsdb: tsdb,
tables: make([]*dataPointsInTable, 0),
segments: make([]storage.Segment[*tsTable, option], 0),
metadataDocMap: make(map[uint64]int),
indexModeDocMap: make(map[uint64]int),
}
dst[gn] = dpg
}
if dpg.latestTS < ts {
dpg.latestTS = ts
}
var dpt *dataPointsInTable
for i := range dpg.tables {
if dpg.tables[i].timeRange.Contains(ts) {
dpt = dpg.tables[i]
break
}
}
stm, ok := w.schemaRepo.loadMeasure(req.GetMetadata())
if !ok {
return nil, fmt.Errorf("cannot find measure definition: %s", req.GetMetadata())
}
fLen := len(req.DataPoint.GetTagFamilies())
if fLen < 1 {
return nil, fmt.Errorf("%s has no tag family", req.Metadata)
}
if fLen > len(stm.schema.GetTagFamilies()) {
return nil, fmt.Errorf("%s has more tag families than %s", req.Metadata, stm.schema)
}
is := stm.indexSchema.Load().(indexSchema)
if len(is.indexRuleLocators.TagFamilyTRule) != len(stm.GetSchema().GetTagFamilies()) {
logger.Panicf("metadata crashed, tag family rule length %d, tag family length %d",
len(is.indexRuleLocators.TagFamilyTRule), len(stm.GetSchema().GetTagFamilies()))
}
shardID := common.ShardID(writeEvent.ShardId)
if dpt == nil {
if dpt, err = w.newDpt(tsdb, dpg, t, ts, shardID, stm.schema.IndexMode); err != nil {
return nil, fmt.Errorf("cannot create data points in table: %w", err)
}
}
series := &pbv1.Series{
Subject: req.Metadata.Name,
EntityValues: writeEvent.EntityValues,
}
if err := series.Marshal(); err != nil {
return nil, fmt.Errorf("cannot marshal series: %w", err)
}
if stm.schema.IndexMode {
fields := handleIndexMode(stm.schema, req, is.indexRuleLocators)
fields = w.appendEntityTagsToIndexFields(fields, stm, series)
doc := index.Document{
DocID: uint64(series.ID),
EntityValues: series.Buffer,
Fields: fields,
Version: req.DataPoint.Version,
Timestamp: ts,
}
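		// Deduplicate by DocID so a later write for the same series replaces
		// the buffered index-mode document.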
if pos, exists := dpg.indexModeDocMap[doc.DocID]; exists {
dpg.indexModeDocs[pos] = doc
} else {
dpg.indexModeDocMap[doc.DocID] = len(dpg.indexModeDocs)
dpg.indexModeDocs = append(dpg.indexModeDocs, doc)
}
return dst, nil
}
fields := appendDataPoints(dpt, ts, series.ID, stm.GetSchema(), req, is.indexRuleLocators)
doc := index.Document{
DocID: uint64(series.ID),
EntityValues: series.Buffer,
Fields: fields,
}
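	// Deduplicate by DocID so only the latest metadata document per series
	// is flushed.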
if pos, exists := dpg.metadataDocMap[doc.DocID]; exists {
dpg.metadataDocs[pos] = doc
} else {
dpg.metadataDocMap[doc.DocID] = len(dpg.metadataDocs)
dpg.metadataDocs = append(dpg.metadataDocs, doc)
}
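	// Forward the data point to this measure's top-N processor, if one is
	// registered.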
if p, _ := w.schemaRepo.topNProcessorMap.Load(getKey(stm.schema.GetMetadata())); p != nil {
p.(*topNProcessorManager).onMeasureWrite(uint64(series.ID), uint32(shardID), &measurev1.InternalWriteRequest{
Request: &measurev1.WriteRequest{
Metadata: stm.GetSchema().Metadata,
DataPoint: req.DataPoint,
MessageId: uint64(time.Now().UnixNano()),
},
EntityValues: writeEvent.EntityValues,
}, stm)
}
return dst, nil
}
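
// appendDataPoints encodes the tag families and fields of req into dest and
// returns the index fields derived from the measure's index rules; missing
// field values are filled with pbv1.NullFieldValue.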
func appendDataPoints(dest *dataPointsInTable, ts int64, sid common.SeriesID, schema *databasev1.Measure,
req *measurev1.WriteRequest, locator partition.IndexRuleLocator,
) []index.Field {
tagFamily, fields := handleTagFamily(schema, req, locator)
if dest.dataPoints == nil {
dest.dataPoints = generateDataPoints()
dest.dataPoints.reset()
}
dataPoints := dest.dataPoints
dataPoints.tagFamilies = append(dataPoints.tagFamilies, tagFamily)
dataPoints.timestamps = append(dataPoints.timestamps, ts)
dataPoints.versions = append(dataPoints.versions, req.DataPoint.Version)
dataPoints.seriesIDs = append(dataPoints.seriesIDs, sid)
field := nameValues{}
for i := range schema.GetFields() {
var v *modelv1.FieldValue
if len(req.DataPoint.Fields) <= i {
v = pbv1.NullFieldValue
} else {
v = req.DataPoint.Fields[i]
}
field.values = append(field.values, encodeFieldValue(
schema.GetFields()[i].GetName(),
schema.GetFields()[i].FieldType,
v,
))
}
dataPoints.fields = append(dataPoints.fields, field)
dest.dataPoints = dataPoints
return fields
}
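
// newDpt locates or creates the segment covering ts and wraps it in a
// dataPointsInTable. For index-mode measures no ts table is created; only
// the segment's time range is recorded.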
func (w *writeCallback) newDpt(tsdb storage.TSDB[*tsTable, option], dpg *dataPointsInGroup,
t time.Time, ts int64, shardID common.ShardID, indexMode bool,
) (*dataPointsInTable, error) {
var segment storage.Segment[*tsTable, option]
	for _, seg := range dpg.segments {
		if seg.GetTimeRange().Contains(ts) {
			segment = seg
			break
		}
	}
if segment == nil {
var err error
segment, err = tsdb.CreateSegmentIfNotExist(t)
if err != nil {
return nil, fmt.Errorf("cannot create segment: %w", err)
}
dpg.segments = append(dpg.segments, segment)
}
if indexMode {
return &dataPointsInTable{
timeRange: segment.GetTimeRange(),
}, nil
}
tstb, err := segment.CreateTSTableIfNotExist(shardID)
if err != nil {
return nil, fmt.Errorf("cannot create ts table: %w", err)
}
dpt := &dataPointsInTable{
timeRange: segment.GetTimeRange(),
tsTable: tstb,
}
dpg.tables = append(dpg.tables, dpt)
return dpt, nil
}
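
// handleTagFamily encodes the tag families of req according to schema. Tags
// matched by an index rule become index fields, indexed-only and entity tags
// are dropped, and the remaining tags are returned as stored tag values.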
func handleTagFamily(schema *databasev1.Measure, req *measurev1.WriteRequest, locator partition.IndexRuleLocator) ([]nameValues, []index.Field) {
tagFamilies := make([]nameValues, 0, len(schema.TagFamilies))
var fields []index.Field
for i := range schema.GetTagFamilies() {
var tagFamily *modelv1.TagFamilyForWrite
if len(req.DataPoint.TagFamilies) <= i {
tagFamily = pbv1.NullTagFamily
} else {
tagFamily = req.DataPoint.TagFamilies[i]
}
tfr := locator.TagFamilyTRule[i]
tagFamilySpec := schema.GetTagFamilies()[i]
tf := nameValues{
name: tagFamilySpec.Name,
}
for j := range tagFamilySpec.Tags {
var tagValue *modelv1.TagValue
if tagFamily == pbv1.NullTagFamily || len(tagFamily.Tags) <= j {
tagValue = pbv1.NullTagValue
} else {
tagValue = tagFamily.Tags[j]
}
t := tagFamilySpec.Tags[j]
encodeTagValue := encodeTagValue(
t.Name,
t.Type,
tagValue)
r, ok := tfr[t.Name]
if ok {
fieldKey := index.FieldKey{}
fieldKey.IndexRuleID = r.GetMetadata().GetId()
fieldKey.Analyzer = r.Analyzer
if encodeTagValue.value != nil {
f := index.NewBytesField(fieldKey, encodeTagValue.value)
f.Store = true
f.Index = true
f.NoSort = r.GetNoSort()
fields = append(fields, f)
} else {
for _, val := range encodeTagValue.valueArr {
f := index.NewBytesField(fieldKey, val)
f.Store = true
f.Index = true
f.NoSort = r.GetNoSort()
fields = append(fields, f)
}
}
releaseNameValue(encodeTagValue)
continue
}
_, isEntity := locator.EntitySet[t.Name]
if tagFamilySpec.Tags[j].IndexedOnly || isEntity {
releaseNameValue(encodeTagValue)
continue
}
tf.values = append(tf.values, encodeTagValue)
}
if len(tf.values) > 0 {
tagFamilies = append(tagFamilies, tf)
}
}
return tagFamilies, fields
}
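
// handleIndexMode encodes every tag of req into index fields for an
// index-mode measure: tags matched by an index rule are indexed under the
// rule's ID, and the rest are stored under their tag names.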
func handleIndexMode(schema *databasev1.Measure, req *measurev1.WriteRequest, locator partition.IndexRuleLocator) []index.Field {
var fields []index.Field
for i := range schema.GetTagFamilies() {
var tagFamily *modelv1.TagFamilyForWrite
if len(req.DataPoint.TagFamilies) <= i {
tagFamily = pbv1.NullTagFamily
} else {
tagFamily = req.DataPoint.TagFamilies[i]
}
tfr := locator.TagFamilyTRule[i]
tagFamilySpec := schema.GetTagFamilies()[i]
for j := range tagFamilySpec.Tags {
var tagValue *modelv1.TagValue
if tagFamily == pbv1.NullTagFamily || len(tagFamily.Tags) <= j {
tagValue = pbv1.NullTagValue
} else {
tagValue = tagFamily.Tags[j]
}
t := tagFamilySpec.Tags[j]
encodeTagValue := encodeTagValue(
t.Name,
t.Type,
tagValue)
r, toIndex := tfr[t.Name]
fieldKey := index.FieldKey{}
if toIndex {
fieldKey.IndexRuleID = r.GetMetadata().GetId()
fieldKey.Analyzer = r.Analyzer
} else {
fieldKey.TagName = t.Name
}
if encodeTagValue.value != nil {
f := index.NewBytesField(fieldKey, encodeTagValue.value)
f.Store = true
f.Index = toIndex
f.NoSort = r.GetNoSort()
fields = append(fields, f)
} else {
for _, val := range encodeTagValue.valueArr {
f := index.NewBytesField(fieldKey, val)
f.Store = true
f.Index = toIndex
f.NoSort = r.GetNoSort()
fields = append(fields, f)
}
}
releaseNameValue(encodeTagValue)
}
}
return fields
}
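
// appendEntityTagsToIndexFields adds the series subject and any entity tags
// not already covered by index rules to the fields of an index-mode
// document.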
func (w *writeCallback) appendEntityTagsToIndexFields(fields []index.Field, stm *measure, series *pbv1.Series) []index.Field {
f := index.NewStringField(subjectField, series.Subject)
f.Index = true
f.NoSort = true
fields = append(fields, f)
is := stm.indexSchema.Load().(indexSchema)
for i := range stm.schema.Entity.TagNames {
if _, exists := is.indexTagMap[stm.schema.Entity.TagNames[i]]; exists {
continue
}
tagName := stm.schema.Entity.TagNames[i]
var t *databasev1.TagSpec
for j := range stm.schema.TagFamilies {
for k := range stm.schema.TagFamilies[j].Tags {
if stm.schema.TagFamilies[j].Tags[k].Name == tagName {
t = stm.schema.TagFamilies[j].Tags[k]
}
}
}
encodeTagValue := encodeTagValue(
t.Name,
t.Type,
series.EntityValues[i])
if encodeTagValue.value != nil {
f = index.NewBytesField(index.FieldKey{TagName: index.IndexModeEntityTagPrefix + t.Name}, encodeTagValue.value)
f.Index = true
f.NoSort = true
fields = append(fields, f)
}
releaseNameValue(encodeTagValue)
}
return fields
}
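
// Rev implements bus.MessageListener. It groups incoming write events,
// flushes the buffered data points and index documents per group, releases
// the referenced segments, and advances each tsdb's clock to the latest
// timestamp seen.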
func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp bus.Message) {
events, ok := message.Data().([]any)
if !ok {
w.l.Warn().Msg("invalid event data type")
return
}
if len(events) < 1 {
w.l.Warn().Msg("empty event")
return
}
groups := make(map[string]*dataPointsInGroup)
for i := range events {
var writeEvent *measurev1.InternalWriteRequest
switch e := events[i].(type) {
case *measurev1.InternalWriteRequest:
writeEvent = e
case *anypb.Any:
writeEvent = &measurev1.InternalWriteRequest{}
if err := e.UnmarshalTo(writeEvent); err != nil {
				w.l.Error().Err(err).RawJSON("written", logger.Proto(e)).Msg("failed to unmarshal event")
continue
}
default:
w.l.Warn().Msg("invalid event data type")
continue
}
var err error
if groups, err = w.handle(groups, writeEvent); err != nil {
w.l.Error().Err(err).RawJSON("written", logger.Proto(writeEvent)).Msg("cannot handle write event")
groups = make(map[string]*dataPointsInGroup)
continue
}
}
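	// Flush the buffered data points, persist the index documents, release
	// the segment references, and advance the tsdb clock per group.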
for i := range groups {
g := groups[i]
for j := range g.tables {
dps := g.tables[j]
if dps.tsTable != nil {
dps.tsTable.mustAddDataPoints(dps.dataPoints)
}
if dps.dataPoints != nil {
releaseDataPoints(dps.dataPoints)
}
}
for _, segment := range g.segments {
if len(g.metadataDocs) > 0 {
if err := segment.IndexDB().Insert(g.metadataDocs); err != nil {
w.l.Error().Err(err).Msg("cannot write metadata")
}
}
if len(g.indexModeDocs) > 0 {
if err := segment.IndexDB().Update(g.indexModeDocs); err != nil {
w.l.Error().Err(err).Msg("cannot write index")
}
}
segment.DecRef()
}
g.tsdb.Tick(g.latestTS)
}
return
}
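
// encodeFieldValue converts a field value into a nameValue using the binary
// encoding that matches fieldType; a nil value yields a nameValue without a
// payload.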
func encodeFieldValue(name string, fieldType databasev1.FieldType, fieldValue *modelv1.FieldValue) *nameValue {
nv := &nameValue{name: name}
switch fieldType {
case databasev1.FieldType_FIELD_TYPE_INT:
nv.valueType = pbv1.ValueTypeInt64
if fieldValue.GetInt() != nil {
nv.value = convert.Int64ToBytes(fieldValue.GetInt().GetValue())
}
case databasev1.FieldType_FIELD_TYPE_FLOAT:
nv.valueType = pbv1.ValueTypeFloat64
if fieldValue.GetFloat() != nil {
nv.value = convert.Float64ToBytes(fieldValue.GetFloat().GetValue())
}
case databasev1.FieldType_FIELD_TYPE_STRING:
nv.valueType = pbv1.ValueTypeStr
if fieldValue.GetStr() != nil {
nv.value = []byte(fieldValue.GetStr().GetValue())
}
case databasev1.FieldType_FIELD_TYPE_DATA_BINARY:
nv.valueType = pbv1.ValueTypeBinaryData
if fieldValue.GetBinaryData() != nil {
nv.value = bytes.Clone(fieldValue.GetBinaryData())
}
default:
logger.Panicf("unsupported field value type: %T", fieldValue.GetValue())
}
return nv
}
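
// encodeTagValue converts a tag value into a pooled nameValue using the
// binary encoding that matches tagType; array types fill valueArr instead of
// value. Callers must release the result via releaseNameValue.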
func encodeTagValue(name string, tagType databasev1.TagType, tagValue *modelv1.TagValue) *nameValue {
nv := generateNameValue()
nv.name = name
switch tagType {
case databasev1.TagType_TAG_TYPE_INT:
nv.valueType = pbv1.ValueTypeInt64
if tagValue.GetInt() != nil {
nv.value = convert.Int64ToBytes(tagValue.GetInt().GetValue())
}
case databasev1.TagType_TAG_TYPE_STRING:
nv.valueType = pbv1.ValueTypeStr
if tagValue.GetStr() != nil {
nv.value = convert.StringToBytes(tagValue.GetStr().GetValue())
}
case databasev1.TagType_TAG_TYPE_DATA_BINARY:
nv.valueType = pbv1.ValueTypeBinaryData
if tagValue.GetBinaryData() != nil {
nv.value = bytes.Clone(tagValue.GetBinaryData())
}
case databasev1.TagType_TAG_TYPE_INT_ARRAY:
nv.valueType = pbv1.ValueTypeInt64Arr
if tagValue.GetIntArray() == nil {
return nv
}
nv.valueArr = make([][]byte, len(tagValue.GetIntArray().Value))
for i := range tagValue.GetIntArray().Value {
nv.valueArr[i] = convert.Int64ToBytes(tagValue.GetIntArray().Value[i])
}
case databasev1.TagType_TAG_TYPE_STRING_ARRAY:
nv.valueType = pbv1.ValueTypeStrArr
if tagValue.GetStrArray() == nil {
return nv
}
nv.valueArr = make([][]byte, len(tagValue.GetStrArray().Value))
for i := range tagValue.GetStrArray().Value {
nv.valueArr[i] = []byte(tagValue.GetStrArray().Value[i])
}
default:
logger.Panicf("unsupported tag value type: %T", tagValue.GetValue())
}
return nv
}