banyand/measure/measure_write.go (165 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package measure import ( "bytes" "context" "time" "github.com/pkg/errors" "github.com/apache/skywalking-banyandb/api/common" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/banyand/tsdb/index" "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var errMalformedElement = errors.New("element is malformed") func (s *measure) write(shardID common.ShardID, entity []byte, entityValues tsdb.EntityValues, value *measurev1.DataPointValue, ) error { t := value.GetTimestamp().AsTime().Local() if err := timestamp.Check(t); err != nil { return errors.WithMessage(err, "writing stream") } fLen := len(value.GetTagFamilies()) if fLen < 1 { return errors.Wrap(errMalformedElement, "no tag family") } if fLen > len(s.schema.GetTagFamilies()) { return errors.Wrap(errMalformedElement, "tag family number is more than expected") } shard, err := s.databaseSupplier.SupplyTSDB().CreateShardsAndGetByID(shardID) if err != nil { return err } seriesDB := shard.Series() series, err := seriesDB.Get(entity, entityValues) if err != nil { return err } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() wp, err := series.Create(ctx, t) if err != nil { if wp != nil { _ = wp.Close() } return err } writeFn := func() error { builder := wp.WriterBuilder().Time(t) for fi, family := range value.GetTagFamilies() { spec := s.schema.GetTagFamilies()[fi] bb, errMarshal := pbv1.EncodeFamily(spec, family) if errMarshal != nil { return errMarshal } builder.Family(familyIdentity(spec.GetName(), pbv1.TagFlag), bb) } if len(value.GetFields()) > len(s.schema.GetFields()) { return errors.Wrap(errMalformedElement, "fields number is more than expected") } for fi, fieldValue := range value.GetFields() { fieldSpec := s.schema.GetFields()[fi] fType, isNull := pbv1.FieldValueTypeConv(fieldValue) if isNull { s.l.Warn().RawJSON("written", logger.Proto(value)).Msg("ignore null field") continue } if fType != fieldSpec.GetFieldType() { return errors.Wrapf(errMalformedElement, "field %s type is unexpected", fieldSpec.GetName()) } data := encodeFieldValue(fieldValue) if data == nil { s.l.Warn().RawJSON("written", logger.Proto(value)).Msg("ignore unknown field") continue } builder.Family(familyIdentity(s.schema.GetFields()[fi].GetName(), pbv1.EncoderFieldFlag(fieldSpec, s.interval)), data) } writer, errWrite := builder.Build() if errWrite != nil { return errWrite } _, errWrite = writer.Write() if s.l.Debug().Enabled() { s.l.Debug().Time("ts", t). Int("ts_nano", t.Nanosecond()). RawJSON("data", logger.Proto(value)). Uint64("series_id", uint64(series.ID())). Stringer("series", series). Uint64("item_id", uint64(writer.ItemID().ID)). Int("shard_id", int(shardID)). Msg("write measure") } return errWrite } if err = writeFn(); err != nil { _ = wp.Close() return err } m := index.Message{ IndexWriter: tsdb.NewSeriesIndexWriter(series.ID(), seriesDB), Value: index.Value{ TagFamilies: value.GetTagFamilies(), Timestamp: t, }, BlockCloser: wp, } s.indexWriter.Write(m) if s.processorManager != nil { s.processorManager.onMeasureWrite(&measurev1.InternalWriteRequest{ Request: &measurev1.WriteRequest{ Metadata: s.GetMetadata(), DataPoint: value, }, EntityValues: entityValues[1:], }) } return err } type writeCallback struct { l *logger.Logger schemaRepo *schemaRepo } func setUpWriteCallback(l *logger.Logger, schemaRepo *schemaRepo) bus.MessageListener { return &writeCallback{ l: l, schemaRepo: schemaRepo, } } func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) { writeEvent, ok := message.Data().(*measurev1.InternalWriteRequest) if !ok { w.l.Warn().Msg("invalid event data type") return } stm, ok := w.schemaRepo.loadMeasure(writeEvent.GetRequest().GetMetadata()) if !ok { w.l.Warn().Msg("cannot find measure definition") return } err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.SeriesHash, tsdb.DecodeEntityValues(writeEvent.GetEntityValues()), writeEvent.GetRequest().GetDataPoint()) if err != nil { w.l.Error().Err(err).RawJSON("written", logger.Proto(writeEvent)).Msg("fail to write entity") } return } func familyIdentity(name string, flag []byte) []byte { return bytes.Join([][]byte{tsdb.Hash([]byte(name)), flag}, nil) } func encodeFieldValue(fieldValue *modelv1.FieldValue) []byte { switch fieldValue.GetValue().(type) { case *modelv1.FieldValue_Int: return convert.Int64ToBytes(fieldValue.GetInt().GetValue()) case *modelv1.FieldValue_Float: return convert.Float64ToBytes(fieldValue.GetFloat().GetValue()) case *modelv1.FieldValue_Str: return []byte(fieldValue.GetStr().GetValue()) case *modelv1.FieldValue_BinaryData: return bytes.Clone(fieldValue.GetBinaryData()) } return nil }