in banyand/stream/stream_write.go [54:134]
func (s *stream) write(shardID common.ShardID, entity []byte, entityValues tsdb.EntityValues, value *streamv1.ElementValue) error {
tp := value.GetTimestamp().AsTime().Local()
if err := timestamp.Check(tp); err != nil {
return errors.WithMessage(err, "writing stream")
}
sm := s.schema
fLen := len(value.GetTagFamilies())
if fLen < 1 {
return errors.Wrap(errMalformedElement, "no tag family")
}
if fLen > len(sm.TagFamilies) {
return errors.Wrap(errMalformedElement, "tag family number is more than expected")
}
shard, err := s.db.SupplyTSDB().CreateShardsAndGetByID(shardID)
if err != nil {
return err
}
series, err := shard.Series().Get(entity, entityValues)
if err != nil {
return err
}
t := timestamp.MToN(tp)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
wp, err := series.Create(ctx, t)
if err != nil {
if wp != nil {
_ = wp.Close()
}
return err
}
writeFn := func() (tsdb.Writer, error) {
builder := wp.WriterBuilder().Time(t)
size := 0
for fi, family := range value.GetTagFamilies() {
spec := sm.GetTagFamilies()[fi]
bb, errMarshal := pbv1.EncodeFamily(spec, family)
if errMarshal != nil {
return nil, errMarshal
}
builder.Family(tsdb.Hash([]byte(spec.GetName())), bb)
size += len(bb)
}
builder.Val([]byte(value.GetElementId()))
writer, errWrite := builder.Build()
if errWrite != nil {
return nil, errWrite
}
_, errWrite = writer.Write()
writtenBytes.WithLabelValues(s.group).Add(float64(size))
if e := s.l.Debug(); e.Enabled() {
e.Time("ts", t).
Int("ts_nano", t.Nanosecond()).
RawJSON("data", logger.Proto(value)).
Uint64("series_id", uint64(series.ID())).
Stringer("series", series).
Uint64("item_id", uint64(writer.ItemID().ID)).
Int("shard_id", int(shardID)).
Str("stream", sm.Metadata.GetName()).
Msg("write stream")
}
return writer, errWrite
}
writer, err := writeFn()
if err != nil {
_ = wp.Close()
return err
}
m := index.Message{
Scope: tsdb.Entry(s.name),
IndexWriter: writer,
GlobalItemID: writer.ItemID(),
Value: index.Value{
TagFamilies: value.GetTagFamilies(),
Timestamp: t,
},
BlockCloser: wp,
}
s.indexWriter.Write(m)
return err
}