in banyand/measure/write.go [74:187]
// handle validates a single internal write request and merges it into dst,
// the per-group accumulation of pending data points and index documents.
//
// A write event whose Request, Metadata or DataPoint message is missing is
// rejected with an error rather than dereferenced. A nil dst is replaced by a
// freshly allocated map, so callers should always continue with the returned
// map.
//
// On success the (possibly newly allocated) dst is returned. On failure the
// returned map is nil; note that dst itself may already have been touched by
// then (the group entry is registered and latestTS is advanced before the
// measure schema is validated).
func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *measurev1.InternalWriteRequest) (map[string]*dataPointsInGroup, error) {
	// Generated getters tolerate nil receivers, so a malformed event surfaces
	// here as an error instead of a nil-pointer panic further down.
	req := writeEvent.GetRequest()
	if req == nil {
		return nil, fmt.Errorf("invalid write event: request is missing")
	}
	metadata := req.GetMetadata()
	if metadata == nil {
		return nil, fmt.Errorf("invalid write event: metadata is missing")
	}
	dataPoint := req.GetDataPoint()
	if dataPoint == nil {
		return nil, fmt.Errorf("invalid write event: data point is missing")
	}

	t := dataPoint.Timestamp.AsTime().Local()
	if err := timestamp.Check(t); err != nil {
		return nil, fmt.Errorf("invalid timestamp: %w", err)
	}
	ts := t.UnixNano()

	gn := metadata.Group
	tsdb, err := w.schemaRepo.loadTSDB(gn)
	if err != nil {
		return nil, fmt.Errorf("cannot load tsdb for group %s: %w", gn, err)
	}

	// Writing to a nil map panics; allocate one so the first call of a batch
	// may pass nil.
	if dst == nil {
		dst = make(map[string]*dataPointsInGroup)
	}
	dpg, ok := dst[gn]
	if !ok {
		dpg = &dataPointsInGroup{
			tsdb:            tsdb,
			tables:          make([]*dataPointsInTable, 0),
			segments:        make([]storage.Segment[*tsTable, option], 0),
			metadataDocMap:  make(map[uint64]int),
			indexModeDocMap: make(map[uint64]int),
		}
		dst[gn] = dpg
	}
	if dpg.latestTS < ts {
		dpg.latestTS = ts
	}

	// Reuse a table already opened for this group whose time range covers ts.
	var dpt *dataPointsInTable
	for i := range dpg.tables {
		if dpg.tables[i].timeRange.Contains(ts) {
			dpt = dpg.tables[i]
			break
		}
	}

	stm, ok := w.schemaRepo.loadMeasure(metadata)
	if !ok {
		return nil, fmt.Errorf("cannot find measure definition: %s", metadata)
	}
	fLen := len(dataPoint.GetTagFamilies())
	if fLen < 1 {
		return nil, fmt.Errorf("%s has no tag family", metadata)
	}
	if fLen > len(stm.schema.GetTagFamilies()) {
		return nil, fmt.Errorf("%s has more tag families than %s", metadata, stm.schema)
	}
	is := stm.indexSchema.Load().(indexSchema)
	// The index rule locators must line up one-to-one with the schema's tag
	// families; a mismatch is treated as corrupted metadata.
	if len(is.indexRuleLocators.TagFamilyTRule) != len(stm.GetSchema().GetTagFamilies()) {
		logger.Panicf("metadata crashed, tag family rule length %d, tag family length %d",
			len(is.indexRuleLocators.TagFamilyTRule), len(stm.GetSchema().GetTagFamilies()))
	}

	shardID := common.ShardID(writeEvent.ShardId)
	if dpt == nil {
		if dpt, err = w.newDpt(tsdb, dpg, t, ts, shardID, stm.schema.IndexMode); err != nil {
			return nil, fmt.Errorf("cannot create data points in table: %w", err)
		}
	}

	series := &pbv1.Series{
		Subject:      metadata.Name,
		EntityValues: writeEvent.EntityValues,
	}
	if err := series.Marshal(); err != nil {
		return nil, fmt.Errorf("cannot marshal series: %w", err)
	}

	if stm.schema.IndexMode {
		// Index-mode measures only produce an index document; the data point
		// table and the topN processor below are not involved.
		fields := handleIndexMode(stm.schema, req, is.indexRuleLocators)
		fields = w.appendEntityTagsToIndexFields(fields, stm, series)
		doc := index.Document{
			DocID:        uint64(series.ID),
			EntityValues: series.Buffer,
			Fields:       fields,
			Version:      dataPoint.Version,
			Timestamp:    ts,
		}
		// Within one batch, a later document for the same series replaces the
		// earlier one in place.
		if pos, exists := dpg.indexModeDocMap[doc.DocID]; exists {
			dpg.indexModeDocs[pos] = doc
		} else {
			dpg.indexModeDocMap[doc.DocID] = len(dpg.indexModeDocs)
			dpg.indexModeDocs = append(dpg.indexModeDocs, doc)
		}
		return dst, nil
	}

	fields := appendDataPoints(dpt, ts, series.ID, stm.GetSchema(), req, is.indexRuleLocators)
	doc := index.Document{
		DocID:        uint64(series.ID),
		EntityValues: series.Buffer,
		Fields:       fields,
	}
	// Keep one metadata document per series; the latest write wins.
	if pos, exists := dpg.metadataDocMap[doc.DocID]; exists {
		dpg.metadataDocs[pos] = doc
	} else {
		dpg.metadataDocMap[doc.DocID] = len(dpg.metadataDocs)
		dpg.metadataDocs = append(dpg.metadataDocs, doc)
	}

	// Forward the data point to the topN processor registered for this
	// measure, if there is one.
	if p, _ := w.schemaRepo.topNProcessorMap.Load(getKey(stm.schema.GetMetadata())); p != nil {
		p.(*topNProcessorManager).onMeasureWrite(uint64(series.ID), uint32(shardID), &measurev1.InternalWriteRequest{
			Request: &measurev1.WriteRequest{
				Metadata:  stm.GetSchema().Metadata,
				DataPoint: dataPoint,
				MessageId: uint64(time.Now().UnixNano()),
			},
			EntityValues: writeEvent.EntityValues,
		}, stm)
	}
	return dst, nil
}