banyand/stream/stream_write.go (140 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package stream
import (
"context"
"time"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/apache/skywalking-banyandb/api/common"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/banyand/tsdb/index"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
// Package-level state shared by the stream write path.
var (
// errMalformedElement is the base error wrapped by write when an element's
// tag families do not fit the stream schema.
errMalformedElement = errors.New("element is malformed")
// writtenBytes counts the encoded tag-family bytes handled by write,
// labelled by "group". It is assigned in init.
writtenBytes *prometheus.CounterVec
)
func init() {
labels := []string{"group"}
writtenBytes = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "banyand_written_stream_bytes",
Help: "written stream in bytes",
},
labels,
)
}
// write stores one stream element in the series addressed by entity and
// entityValues on shard shardID, then forwards the written item to the index
// writer.
//
// The element is rejected with an error when its timestamp fails
// timestamp.Check, when it carries no tag family, or when it carries more tag
// families than the stream schema declares (both wrap errMalformedElement).
// Errors from resolving the shard or series, creating the write point,
// encoding a tag family, or building/executing the writer are returned as-is.
//
// The writtenBytes metric and the "write stream" debug event are only produced
// after the underlying write succeeded, so failed writes are not counted as
// written bytes.
func (s *stream) write(shardID common.ShardID, entity []byte, entityValues tsdb.EntityValues, value *streamv1.ElementValue) error {
	// createTimeout bounds the series.Create call below.
	const createTimeout = 5 * time.Second

	tp := value.GetTimestamp().AsTime().Local()
	if err := timestamp.Check(tp); err != nil {
		return errors.WithMessage(err, "writing stream")
	}
	sm := s.schema
	fLen := len(value.GetTagFamilies())
	if fLen < 1 {
		return errors.Wrap(errMalformedElement, "no tag family")
	}
	// Families are matched to schema specs by position, so the element must
	// not carry more families than the schema defines.
	if fLen > len(sm.GetTagFamilies()) {
		return errors.Wrap(errMalformedElement, "tag family number is more than expected")
	}
	shard, err := s.db.SupplyTSDB().CreateShardsAndGetByID(shardID)
	if err != nil {
		return err
	}
	series, err := shard.Series().Get(entity, entityValues)
	if err != nil {
		return err
	}
	t := timestamp.MToN(tp)
	ctx, cancel := context.WithTimeout(context.Background(), createTimeout)
	defer cancel()
	wp, err := series.Create(ctx, t)
	if err != nil {
		// Create may hand back a non-nil value together with an error;
		// release it on a best-effort basis before bailing out.
		if wp != nil {
			_ = wp.Close()
		}
		return err
	}
	// writeFn groups every step that needs wp closed on failure, so the
	// caller below has a single cleanup point.
	writeFn := func() (tsdb.Writer, error) {
		builder := wp.WriterBuilder().Time(t)
		size := 0
		for fi, family := range value.GetTagFamilies() {
			spec := sm.GetTagFamilies()[fi]
			bb, errMarshal := pbv1.EncodeFamily(spec, family)
			if errMarshal != nil {
				return nil, errMarshal
			}
			builder.Family(tsdb.Hash([]byte(spec.GetName())), bb)
			size += len(bb)
		}
		builder.Val([]byte(value.GetElementId()))
		writer, errWrite := builder.Build()
		if errWrite != nil {
			return nil, errWrite
		}
		// Stop here on failure: nothing was written, so neither the metric
		// nor the debug event must report it.
		if _, errWrite = writer.Write(); errWrite != nil {
			return nil, errWrite
		}
		writtenBytes.WithLabelValues(s.group).Add(float64(size))
		if e := s.l.Debug(); e.Enabled() {
			e.Time("ts", t).
				Int("ts_nano", t.Nanosecond()).
				RawJSON("data", logger.Proto(value)).
				Uint64("series_id", uint64(series.ID())).
				Stringer("series", series).
				Uint64("item_id", uint64(writer.ItemID().ID)).
				Int("shard_id", int(shardID)).
				Str("stream", sm.Metadata.GetName()).
				Msg("write stream")
		}
		return writer, nil
	}
	writer, err := writeFn()
	if err != nil {
		// Best-effort close; the write error is the one worth reporting.
		_ = wp.Close()
		return err
	}
	// wp travels with the message as BlockCloser and is not closed here.
	// NOTE(review): presumably the index writer closes it once done - confirm.
	m := index.Message{
		Scope:        tsdb.Entry(s.name),
		IndexWriter:  writer,
		GlobalItemID: writer.ItemID(),
		Value: index.Value{
			TagFamilies: value.GetTagFamilies(),
			Timestamp:   t,
		},
		BlockCloser: wp,
	}
	s.indexWriter.Write(m)
	return nil
}
// writeCallback processes stream write messages delivered to its Rev method.
type writeCallback struct {
// l reports invalid messages, unknown streams and failed writes.
l *logger.Logger
// schemaRepo is queried for the stream targeted by each write request.
schemaRepo *schemaRepo
}
// setUpWriteCallback returns a writeCallback that logs through l and looks up
// streams in schemaRepo.
func setUpWriteCallback(l *logger.Logger, schemaRepo *schemaRepo) *writeCallback {
	return &writeCallback{l: l, schemaRepo: schemaRepo}
}
// Rev consumes one bus message carrying a *streamv1.InternalWriteRequest and
// writes its element into the stream named by the request metadata.
//
// Messages with another payload type, or targeting a stream that cannot be
// loaded from the schema repository, are dropped with a warning. A failed
// write is logged as an error. In every case the zero-value resp is returned.
func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
	evt, isWriteRequest := message.Data().(*streamv1.InternalWriteRequest)
	if !isWriteRequest {
		w.l.Warn().Msg("invalid event data type")
		return resp
	}

	target, found := w.schemaRepo.loadStream(evt.GetRequest().GetMetadata())
	if !found {
		w.l.Warn().Msg("cannot find stream definition")
		return resp
	}

	shardID := common.ShardID(evt.GetShardId())
	entityValues := tsdb.DecodeEntityValues(evt.GetEntityValues())
	element := evt.GetRequest().GetElement()
	if err := target.write(shardID, evt.SeriesHash, entityValues, element); err != nil {
		w.l.Error().Err(err).RawJSON("written", logger.Proto(evt)).Msg("fail to write entity")
	}
	return resp
}