// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package stream

import (
	"bytes"
	"context"
	"fmt"
	"strings"

	"google.golang.org/protobuf/types/known/anypb"

	"github.com/apache/skywalking-banyandb/api/common"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
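
// writeCallback buffers incoming write events and flushes them to stream
// storage. It implements bus.MessageListener through Rev and CheckHealth.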
type writeCallback struct {
l *logger.Logger
schemaRepo *schemaRepo
maxDiskUsagePercent int
}
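
// setUpWriteCallback builds the write listener; maxDiskUsagePercent is
// clamped to 100 so the health check compares against a valid percentage.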
func setUpWriteCallback(l *logger.Logger, schemaRepo *schemaRepo, maxDiskUsagePercent int) bus.MessageListener {
if maxDiskUsagePercent > 100 {
maxDiskUsagePercent = 100
}
return &writeCallback{
l: l,
schemaRepo: schemaRepo,
maxDiskUsagePercent: maxDiskUsagePercent,
}
}
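
// CheckHealth rejects writes with STATUS_DISK_FULL when writing is disabled
// (maxDiskUsagePercent < 1) or when disk usage of the stream path has reached
// the configured limit.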
func (w *writeCallback) CheckHealth() *common.Error {
if w.maxDiskUsagePercent < 1 {
return common.NewErrorWithStatus(modelv1.Status_STATUS_DISK_FULL, "stream is readonly because \"stream-max-disk-usage-percent\" is 0")
}
diskPercent := observability.GetPathUsedPercent(w.schemaRepo.path)
if diskPercent < w.maxDiskUsagePercent {
return nil
}
w.l.Warn().Int("maxPercent", w.maxDiskUsagePercent).Int("diskPercent", diskPercent).Msg("disk usage is too high, stop writing")
return common.NewErrorWithStatus(modelv1.Status_STATUS_DISK_FULL, "disk usage is too high, stop writing")
}
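
// handle buffers a single write request into dst, keyed by group name. It
// locates or creates the segment and ts table covering the element's
// timestamp, appends the element's tag values and index fields, and records
// the series document once per series ID. docIDBuilder is reused across calls
// to build element document IDs without reallocating.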
func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *streamv1.InternalWriteRequest,
docIDBuilder *strings.Builder,
) (map[string]*elementsInGroup, error) {
req := writeEvent.Request
t := req.Element.Timestamp.AsTime().Local()
if err := timestamp.Check(t); err != nil {
return nil, fmt.Errorf("invalid timestamp: %w", err)
}
ts := t.UnixNano()
gn := req.Metadata.Group
tsdb, err := w.schemaRepo.loadTSDB(gn)
if err != nil {
return nil, fmt.Errorf("cannot load tsdb for group %s: %w", gn, err)
}
eg, ok := dst[gn]
if !ok {
eg = &elementsInGroup{
tsdb: tsdb,
tables: make([]*elementsInTable, 0),
segments: make([]storage.Segment[*tsTable, option], 0),
			docIDsAdded: make(map[uint64]struct{}), // tracks series documents already buffered for this group
}
dst[gn] = eg
}
if eg.latestTS < ts {
eg.latestTS = ts
}
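	// Look for a buffered table whose time range covers the element's timestamp.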
var et *elementsInTable
for i := range eg.tables {
if eg.tables[i].timeRange.Contains(ts) {
et = eg.tables[i]
break
}
}
shardID := common.ShardID(writeEvent.ShardId)
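	// No buffered table covers ts: find or create the segment, then its ts table.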
if et == nil {
var segment storage.Segment[*tsTable, option]
		for _, seg := range eg.segments {
			if seg.GetTimeRange().Contains(ts) {
				segment = seg
				break
			}
		}
if segment == nil {
segment, err = tsdb.CreateSegmentIfNotExist(t)
if err != nil {
return nil, fmt.Errorf("cannot create segment: %w", err)
}
eg.segments = append(eg.segments, segment)
}
tstb, err := segment.CreateTSTableIfNotExist(shardID)
if err != nil {
return nil, fmt.Errorf("cannot create ts table: %w", err)
}
et = &elementsInTable{
timeRange: segment.GetTimeRange(),
tsTable: tstb,
elements: generateElements(),
}
et.elements.reset()
eg.tables = append(eg.tables, et)
}
et.elements.timestamps = append(et.elements.timestamps, ts)
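	// The element document ID is the hash of "<stream name>|<element ID>".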
docIDBuilder.Reset()
docIDBuilder.WriteString(req.Metadata.Name)
docIDBuilder.WriteByte('|')
docIDBuilder.WriteString(req.Element.ElementId)
eID := convert.HashStr(docIDBuilder.String())
et.elements.elementIDs = append(et.elements.elementIDs, eID)
stm, ok := w.schemaRepo.loadStream(writeEvent.GetRequest().GetMetadata())
if !ok {
return nil, fmt.Errorf("cannot find stream definition: %s", writeEvent.GetRequest().GetMetadata())
}
fLen := len(req.Element.GetTagFamilies())
if fLen < 1 {
return nil, fmt.Errorf("%s has no tag family", req)
}
if fLen > len(stm.schema.GetTagFamilies()) {
return nil, fmt.Errorf("%s has more tag families than %s", req.Metadata, stm.schema)
}
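	// Marshaling the series populates series.ID and the entity-value buffer.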
series := &pbv1.Series{
Subject: req.Metadata.Name,
EntityValues: writeEvent.EntityValues,
}
if err := series.Marshal(); err != nil {
return nil, fmt.Errorf("cannot marshal series: %w", err)
}
et.elements.seriesIDs = append(et.elements.seriesIDs, series.ID)
is := stm.indexSchema.Load().(indexSchema)
tagFamilies := make([]tagValues, 0, len(stm.schema.TagFamilies))
if len(is.indexRuleLocators.TagFamilyTRule) != len(stm.GetSchema().GetTagFamilies()) {
logger.Panicf("metadata crashed, tag family rule length %d, tag family length %d",
len(is.indexRuleLocators.TagFamilyTRule), len(stm.GetSchema().GetTagFamilies()))
}
var fields []index.Field
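	// Walk tag families in schema order; families and tags missing from the
	// request are padded with null values.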
for i := range stm.GetSchema().GetTagFamilies() {
var tagFamily *modelv1.TagFamilyForWrite
if len(req.Element.TagFamilies) <= i {
tagFamily = pbv1.NullTagFamily
} else {
tagFamily = req.Element.TagFamilies[i]
}
tfr := is.indexRuleLocators.TagFamilyTRule[i]
tagFamilySpec := stm.GetSchema().GetTagFamilies()[i]
tf := tagValues{
tag: tagFamilySpec.Name,
}
for j := range tagFamilySpec.Tags {
var tagValue *modelv1.TagValue
if tagFamily == pbv1.NullTagFamily || len(tagFamily.Tags) <= j {
tagValue = pbv1.NullTagValue
} else {
tagValue = tagFamily.Tags[j]
}
t := tagFamilySpec.Tags[j]
if r, ok := tfr[t.Name]; ok && tagValue != pbv1.NullTagValue {
fields = appendField(fields, index.FieldKey{
IndexRuleID: r.GetMetadata().GetId(),
Analyzer: r.Analyzer,
SeriesID: series.ID,
}, t.Type, tagValue, r.GetNoSort())
}
_, isEntity := is.indexRuleLocators.EntitySet[t.Name]
if tagFamilySpec.Tags[j].IndexedOnly || isEntity {
continue
}
tf.values = append(tf.values, encodeTagValue(
t.Name,
t.Type,
tagValue))
}
if len(tf.values) > 0 {
tagFamilies = append(tagFamilies, tf)
}
}
et.elements.tagFamilies = append(et.elements.tagFamilies, tagFamilies)
et.docs = append(et.docs, index.Document{
DocID: eID,
Fields: fields,
Timestamp: ts,
})
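	// Append the series document only once per series ID within this batch.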
docID := uint64(series.ID)
if _, exists := eg.docIDsAdded[docID]; !exists {
eg.docs = append(eg.docs, index.Document{
DocID: docID,
EntityValues: series.Buffer,
})
eg.docIDsAdded[docID] = struct{}{}
}
return dst, nil
}
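
// Rev implements bus.MessageListener. It buffers incoming write events by
// group, flushes elements and element-index documents per table and series
// documents per segment, and advances each TSDB's clock to the latest
// timestamp it has seen.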
func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp bus.Message) {
events, ok := message.Data().([]any)
if !ok {
w.l.Warn().Msg("invalid event data type")
return
}
if len(events) < 1 {
w.l.Warn().Msg("empty event")
return
}
groups := make(map[string]*elementsInGroup)
var builder strings.Builder
for i := range events {
var writeEvent *streamv1.InternalWriteRequest
switch e := events[i].(type) {
case *streamv1.InternalWriteRequest:
writeEvent = e
case *anypb.Any:
writeEvent = &streamv1.InternalWriteRequest{}
if err := e.UnmarshalTo(writeEvent); err != nil {
w.l.Error().Err(err).RawJSON("written", logger.Proto(e)).Msg("fail to unmarshal event")
continue
}
default:
w.l.Warn().Msg("invalid event data type")
continue
}
var err error
if groups, err = w.handle(groups, writeEvent, &builder); err != nil {
w.l.Error().Err(err).Msg("cannot handle write event")
groups = make(map[string]*elementsInGroup)
continue
}
}
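	// Flush each group: elements and element indexes per table, then series
	// documents per segment.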
for i := range groups {
g := groups[i]
for j := range g.tables {
es := g.tables[j]
es.tsTable.mustAddElements(es.elements)
releaseElements(es.elements)
			if len(es.docs) > 0 {
				idx := es.tsTable.Index()
				if err := idx.Write(es.docs); err != nil {
					w.l.Error().Err(err).Msg("cannot write element index")
				}
			}
}
		for _, segment := range g.segments {
			if len(g.docs) > 0 {
				if err := segment.IndexDB().Insert(g.docs); err != nil {
					w.l.Error().Err(err).Msg("cannot write index")
				}
			}
			// Release the segment reference acquired in handle even when there
			// are no series documents to insert.
			segment.DecRef()
		}
g.tsdb.Tick(g.latestTS)
}
return
}
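
// encodeTagValue converts a tag value into its storage representation for the
// given tag type; a nil value yields a tagValue carrying only the value type.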
func encodeTagValue(name string, tagType databasev1.TagType, tagVal *modelv1.TagValue) *tagValue {
tv := generateTagValue()
tv.tag = name
switch tagType {
case databasev1.TagType_TAG_TYPE_INT:
tv.valueType = pbv1.ValueTypeInt64
if tagVal.GetInt() != nil {
tv.value = convert.Int64ToBytes(tagVal.GetInt().GetValue())
}
case databasev1.TagType_TAG_TYPE_STRING:
tv.valueType = pbv1.ValueTypeStr
if tagVal.GetStr() != nil {
tv.value = convert.StringToBytes(tagVal.GetStr().GetValue())
}
case databasev1.TagType_TAG_TYPE_DATA_BINARY:
tv.valueType = pbv1.ValueTypeBinaryData
if tagVal.GetBinaryData() != nil {
tv.value = bytes.Clone(tagVal.GetBinaryData())
}
case databasev1.TagType_TAG_TYPE_INT_ARRAY:
tv.valueType = pbv1.ValueTypeInt64Arr
if tagVal.GetIntArray() == nil {
return tv
}
tv.valueArr = make([][]byte, len(tagVal.GetIntArray().Value))
for i := range tagVal.GetIntArray().Value {
tv.valueArr[i] = convert.Int64ToBytes(tagVal.GetIntArray().Value[i])
}
case databasev1.TagType_TAG_TYPE_STRING_ARRAY:
tv.valueType = pbv1.ValueTypeStrArr
if tagVal.GetStrArray() == nil {
return tv
}
tv.valueArr = make([][]byte, len(tagVal.GetStrArray().Value))
for i := range tagVal.GetStrArray().Value {
tv.valueArr[i] = []byte(tagVal.GetStrArray().Value[i])
}
default:
logger.Panicf("unsupported tag value type: %T", tagVal.GetValue())
}
return tv
}
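
// appendField appends index fields derived from tagVal to dest according to
// tagType. Nil values leave dest unchanged, and array types fan out into one
// field per entry.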
func appendField(dest []index.Field, fieldKey index.FieldKey, tagType databasev1.TagType, tagVal *modelv1.TagValue, noSort bool) []index.Field {
switch tagType {
case databasev1.TagType_TAG_TYPE_INT:
v := tagVal.GetInt()
if v == nil {
return dest
}
f := index.NewIntField(fieldKey, v.Value)
f.NoSort = noSort
dest = append(dest, f)
case databasev1.TagType_TAG_TYPE_STRING:
v := tagVal.GetStr()
if v == nil {
return dest
}
f := index.NewStringField(fieldKey, v.Value)
f.NoSort = noSort
dest = append(dest, f)
case databasev1.TagType_TAG_TYPE_DATA_BINARY:
v := tagVal.GetBinaryData()
if v == nil {
return dest
}
f := index.NewBytesField(fieldKey, v)
f.NoSort = noSort
dest = append(dest, f)
case databasev1.TagType_TAG_TYPE_INT_ARRAY:
if tagVal.GetIntArray() == nil {
return dest
}
for i := range tagVal.GetIntArray().Value {
f := index.NewIntField(fieldKey, tagVal.GetIntArray().Value[i])
f.NoSort = noSort
dest = append(dest, f)
}
case databasev1.TagType_TAG_TYPE_STRING_ARRAY:
if tagVal.GetStrArray() == nil {
return dest
}
for i := range tagVal.GetStrArray().Value {
f := index.NewStringField(fieldKey, tagVal.GetStrArray().Value[i])
f.NoSort = noSort
dest = append(dest, f)
}
default:
logger.Panicf("unsupported tag value type: %T", tagVal.GetValue())
}
return dest
}