func()

in banyand/measure/measure_write.go [41:144]


func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb.EntityValues,
	value *measurev1.DataPointValue,
) error {
	t := value.GetTimestamp().AsTime().Local()
	if err := timestamp.Check(t); err != nil {
		return errors.WithMessage(err, "writing stream")
	}
	fLen := len(value.GetTagFamilies())
	if fLen < 1 {
		return errors.Wrap(errMalformedElement, "no tag family")
	}
	if fLen > len(s.schema.GetTagFamilies()) {
		return errors.Wrap(errMalformedElement, "tag family number is more than expected")
	}
	shard, err := s.databaseSupplier.SupplyTSDB().CreateShardsAndGetByID(shardID)
	if err != nil {
		return err
	}
	seriesDB := shard.Series()
	series, err := seriesDB.Get(entity, entityValues)
	if err != nil {
		return err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	wp, err := series.Create(ctx, t)
	if err != nil {
		if wp != nil {
			_ = wp.Close()
		}
		return err
	}
	writeFn := func() error {
		builder := wp.WriterBuilder().Time(t)
		for fi, family := range value.GetTagFamilies() {
			spec := s.schema.GetTagFamilies()[fi]
			bb, errMarshal := pbv1.EncodeFamily(spec, family)
			if errMarshal != nil {
				return errMarshal
			}
			builder.Family(familyIdentity(spec.GetName(), pbv1.TagFlag), bb)
		}
		if len(value.GetFields()) > len(s.schema.GetFields()) {
			return errors.Wrap(errMalformedElement, "fields number is more than expected")
		}
		for fi, fieldValue := range value.GetFields() {
			fieldSpec := s.schema.GetFields()[fi]
			fType, isNull := pbv1.FieldValueTypeConv(fieldValue)
			if isNull {
				s.l.Warn().RawJSON("written", logger.Proto(value)).Msg("ignore null field")
				continue
			}
			if fType != fieldSpec.GetFieldType() {
				return errors.Wrapf(errMalformedElement, "field %s type is unexpected", fieldSpec.GetName())
			}
			data := encodeFieldValue(fieldValue)
			if data == nil {
				s.l.Warn().RawJSON("written", logger.Proto(value)).Msg("ignore unknown field")
				continue
			}
			builder.Family(familyIdentity(s.schema.GetFields()[fi].GetName(), pbv1.EncoderFieldFlag(fieldSpec, s.interval)), data)
		}
		writer, errWrite := builder.Build()
		if errWrite != nil {
			return errWrite
		}
		_, errWrite = writer.Write()
		if s.l.Debug().Enabled() {
			s.l.Debug().Time("ts", t).
				Int("ts_nano", t.Nanosecond()).
				RawJSON("data", logger.Proto(value)).
				Uint64("series_id", uint64(series.ID())).
				Stringer("series", series).
				Uint64("item_id", uint64(writer.ItemID().ID)).
				Int("shard_id", int(shardID)).
				Msg("write measure")
		}
		return errWrite
	}

	if err = writeFn(); err != nil {
		_ = wp.Close()
		return err
	}
	m := index.Message{
		IndexWriter: tsdb.NewSeriesIndexWriter(series.ID(), seriesDB),
		Value: index.Value{
			TagFamilies: value.GetTagFamilies(),
			Timestamp:   t,
		},
		BlockCloser: wp,
	}
	s.indexWriter.Write(m)
	if s.processorManager != nil {
		s.processorManager.onMeasureWrite(&measurev1.InternalWriteRequest{
			Request: &measurev1.WriteRequest{
				Metadata:  s.GetMetadata(),
				DataPoint: value,
			},
			EntityValues: entityValues[1:],
		})
	}
	return err
}