in banyand/measure/measure_write.go [41:144]
// write stores one data point of this measure on the given shard.
//
// The steps are:
//  1. Check the data point timestamp and validate the number of tag families
//     and fields against the measure schema.
//  2. Resolve the shard and the series addressed by entity/entityValues and
//     open a write point for the data point timestamp (bounded by a 5s
//     timeout).
//  3. Encode every tag family and every non-null field and write them as one
//     item.
//  4. Pass the tag families to the index writer and, when a processor manager
//     is configured, notify it about the written data point.
//
// It returns an error wrapping errMalformedElement when the data point has no
// tag family, has more tag families or fields than the schema declares, or
// carries a field whose type differs from the schema. Errors from the
// timestamp check, the storage layer and the encoders are returned as well.
func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb.EntityValues,
	value *measurev1.DataPointValue,
) error {
	t := value.GetTimestamp().AsTime().Local()
	if err := timestamp.Check(t); err != nil {
		return errors.WithMessage(err, "writing measure")
	}

	// Validate the shape of the data point up front so that a malformed
	// element is rejected before any shard, series or write point is touched.
	tagFamilies := value.GetTagFamilies()
	tagFamilySpecs := s.schema.GetTagFamilies()
	if len(tagFamilies) < 1 {
		return errors.Wrap(errMalformedElement, "no tag family")
	}
	if len(tagFamilies) > len(tagFamilySpecs) {
		return errors.Wrap(errMalformedElement, "tag family number is more than expected")
	}
	fields := value.GetFields()
	fieldSpecs := s.schema.GetFields()
	if len(fields) > len(fieldSpecs) {
		return errors.Wrap(errMalformedElement, "fields number is more than expected")
	}

	shard, err := s.databaseSupplier.SupplyTSDB().CreateShardsAndGetByID(shardID)
	if err != nil {
		return err
	}
	seriesDB := shard.Series()
	series, err := seriesDB.Get(entity, entityValues)
	if err != nil {
		return err
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	wp, err := series.Create(ctx, t)
	if err != nil {
		// Create may hand back a write point together with an error;
		// release it on a best-effort basis before bailing out.
		if wp != nil {
			_ = wp.Close()
		}
		return err
	}

	// writeFn encodes and writes the data point. It is a closure so that every
	// failure path funnels through the single wp.Close() below.
	writeFn := func() error {
		builder := wp.WriterBuilder().Time(t)
		// Tag families are matched to their schema spec by position.
		for fi, family := range tagFamilies {
			spec := tagFamilySpecs[fi]
			bb, errMarshal := pbv1.EncodeFamily(spec, family)
			if errMarshal != nil {
				return errMarshal
			}
			builder.Family(familyIdentity(spec.GetName(), pbv1.TagFlag), bb)
		}
		// Fields are matched to their schema spec by position as well.
		for fi, fieldValue := range fields {
			fieldSpec := fieldSpecs[fi]
			fType, isNull := pbv1.FieldValueTypeConv(fieldValue)
			if isNull {
				s.l.Warn().RawJSON("written", logger.Proto(value)).Msg("ignore null field")
				continue
			}
			if fType != fieldSpec.GetFieldType() {
				return errors.Wrapf(errMalformedElement, "field %s type is unexpected", fieldSpec.GetName())
			}
			data := encodeFieldValue(fieldValue)
			if data == nil {
				s.l.Warn().RawJSON("written", logger.Proto(value)).Msg("ignore unknown field")
				continue
			}
			builder.Family(familyIdentity(fieldSpec.GetName(), pbv1.EncoderFieldFlag(fieldSpec, s.interval)), data)
		}
		writer, errWrite := builder.Build()
		if errWrite != nil {
			return errWrite
		}
		_, errWrite = writer.Write()
		if s.l.Debug().Enabled() {
			s.l.Debug().Time("ts", t).
				Int("ts_nano", t.Nanosecond()).
				RawJSON("data", logger.Proto(value)).
				Uint64("series_id", uint64(series.ID())).
				Stringer("series", series).
				Uint64("item_id", uint64(writer.ItemID().ID)).
				Int("shard_id", int(shardID)).
				Msg("write measure")
		}
		return errWrite
	}
	if err = writeFn(); err != nil {
		_ = wp.Close()
		return err
	}

	// On success wp is not closed here: it travels with the index message as
	// its BlockCloser.
	// NOTE(review): presumably the index writer closes it once done - confirm.
	s.indexWriter.Write(index.Message{
		IndexWriter: tsdb.NewSeriesIndexWriter(series.ID(), seriesDB),
		Value: index.Value{
			TagFamilies: tagFamilies,
			Timestamp:   t,
		},
		BlockCloser: wp,
	})

	if s.processorManager != nil {
		s.processorManager.onMeasureWrite(&measurev1.InternalWriteRequest{
			Request: &measurev1.WriteRequest{
				Metadata:  s.GetMetadata(),
				DataPoint: value,
			},
			// The first entity value is dropped before forwarding.
			// NOTE(review): assumes entityValues is never empty - confirm
			// against callers.
			EntityValues: entityValues[1:],
		})
	}
	return nil
}