func()

in banyand/stream/stream_write.go [54:134]


// write stores one element into the series addressed by entity on the given
// shard and then submits it to the stream's index writer.
//
// The element is rejected when its timestamp fails timestamp.Check, when it
// carries no tag family, or when it carries more tag families than the stream
// schema declares. Tag families are matched to the schema by position.
//
// Any error from shard/series lookup, block creation, family encoding, or the
// data write is returned to the caller as-is; on those paths the block handle
// obtained from series.Create is closed here. On success the handle is passed
// along inside the index message and nil is returned.
func (s *stream) write(shardID common.ShardID, entity []byte, entityValues tsdb.EntityValues, value *streamv1.ElementValue) error {
	tp := value.GetTimestamp().AsTime().Local()
	if err := timestamp.Check(tp); err != nil {
		return errors.WithMessage(err, "writing stream")
	}
	sm := s.schema
	fLen := len(value.GetTagFamilies())
	if fLen < 1 {
		return errors.Wrap(errMalformedElement, "no tag family")
	}
	// Guarantees the positional schema lookup below stays in range.
	if fLen > len(sm.TagFamilies) {
		return errors.Wrap(errMalformedElement, "tag family number is more than expected")
	}
	shard, err := s.db.SupplyTSDB().CreateShardsAndGetByID(shardID)
	if err != nil {
		return err
	}
	series, err := shard.Series().Get(entity, entityValues)
	if err != nil {
		return err
	}
	t := timestamp.MToN(tp)
	// Bound the block creation so a stuck storage layer cannot hang the writer.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	wp, err := series.Create(ctx, t)
	if err != nil {
		// Create may hand back a usable handle together with an error; release it.
		if wp != nil {
			_ = wp.Close()
		}
		return err
	}
	// writeFn groups every step that must be followed by wp.Close() on failure.
	writeFn := func() (tsdb.Writer, error) {
		builder := wp.WriterBuilder().Time(t)
		size := 0
		for fi, family := range value.GetTagFamilies() {
			spec := sm.GetTagFamilies()[fi]
			bb, errMarshal := pbv1.EncodeFamily(spec, family)
			if errMarshal != nil {
				return nil, errMarshal
			}
			builder.Family(tsdb.Hash([]byte(spec.GetName())), bb)
			size += len(bb)
		}
		builder.Val([]byte(value.GetElementId()))
		writer, errWrite := builder.Build()
		if errWrite != nil {
			return nil, errWrite
		}
		if _, errWrite = writer.Write(); errWrite != nil {
			// Do not account for or log data that was never written.
			return nil, errWrite
		}
		writtenBytes.WithLabelValues(s.group).Add(float64(size))
		if e := s.l.Debug(); e.Enabled() {
			e.Time("ts", t).
				Int("ts_nano", t.Nanosecond()).
				RawJSON("data", logger.Proto(value)).
				Uint64("series_id", uint64(series.ID())).
				Stringer("series", series).
				Uint64("item_id", uint64(writer.ItemID().ID)).
				Int("shard_id", int(shardID)).
				Str("stream", sm.Metadata.GetName()).
				Msg("write stream")
		}
		return writer, nil
	}
	writer, err := writeFn()
	if err != nil {
		_ = wp.Close()
		return err
	}
	// NOTE(review): wp is not closed here on success; it travels as BlockCloser,
	// presumably so the index writer can release it once indexing is done — confirm.
	m := index.Message{
		Scope:        tsdb.Entry(s.name),
		IndexWriter:  writer,
		GlobalItemID: writer.ItemID(),
		Value: index.Value{
			TagFamilies: value.GetTagFamilies(),
			Timestamp:   t,
		},
		BlockCloser: wp,
	}
	s.indexWriter.Write(m)
	return nil
}