func()

in banyand/measure/write.go [74:187]


func (w *writeCallback) handle(dst map[string]*dataPointsInGroup, writeEvent *measurev1.InternalWriteRequest) (map[string]*dataPointsInGroup, error) {
	req := writeEvent.Request
	t := req.DataPoint.Timestamp.AsTime().Local()
	if err := timestamp.Check(t); err != nil {
		return nil, fmt.Errorf("invalid timestamp: %w", err)
	}
	ts := t.UnixNano()

	gn := req.Metadata.Group
	tsdb, err := w.schemaRepo.loadTSDB(gn)
	if err != nil {
		return nil, fmt.Errorf("cannot load tsdb for group %s: %w", gn, err)
	}
	dpg, ok := dst[gn]
	if !ok {
		dpg = &dataPointsInGroup{
			tsdb:            tsdb,
			tables:          make([]*dataPointsInTable, 0),
			segments:        make([]storage.Segment[*tsTable, option], 0),
			metadataDocMap:  make(map[uint64]int),
			indexModeDocMap: make(map[uint64]int),
		}
		dst[gn] = dpg
	}
	if dpg.latestTS < ts {
		dpg.latestTS = ts
	}

	var dpt *dataPointsInTable
	for i := range dpg.tables {
		if dpg.tables[i].timeRange.Contains(ts) {
			dpt = dpg.tables[i]
			break
		}
	}
	stm, ok := w.schemaRepo.loadMeasure(req.GetMetadata())
	if !ok {
		return nil, fmt.Errorf("cannot find measure definition: %s", req.GetMetadata())
	}
	fLen := len(req.DataPoint.GetTagFamilies())
	if fLen < 1 {
		return nil, fmt.Errorf("%s has no tag family", req.Metadata)
	}
	if fLen > len(stm.schema.GetTagFamilies()) {
		return nil, fmt.Errorf("%s has more tag families than %s", req.Metadata, stm.schema)
	}
	is := stm.indexSchema.Load().(indexSchema)
	if len(is.indexRuleLocators.TagFamilyTRule) != len(stm.GetSchema().GetTagFamilies()) {
		logger.Panicf("metadata crashed, tag family rule length %d, tag family length %d",
			len(is.indexRuleLocators.TagFamilyTRule), len(stm.GetSchema().GetTagFamilies()))
	}

	shardID := common.ShardID(writeEvent.ShardId)
	if dpt == nil {
		if dpt, err = w.newDpt(tsdb, dpg, t, ts, shardID, stm.schema.IndexMode); err != nil {
			return nil, fmt.Errorf("cannot create data points in table: %w", err)
		}
	}

	series := &pbv1.Series{
		Subject:      req.Metadata.Name,
		EntityValues: writeEvent.EntityValues,
	}
	if err := series.Marshal(); err != nil {
		return nil, fmt.Errorf("cannot marshal series: %w", err)
	}

	if stm.schema.IndexMode {
		fields := handleIndexMode(stm.schema, req, is.indexRuleLocators)
		fields = w.appendEntityTagsToIndexFields(fields, stm, series)
		doc := index.Document{
			DocID:        uint64(series.ID),
			EntityValues: series.Buffer,
			Fields:       fields,
			Version:      req.DataPoint.Version,
			Timestamp:    ts,
		}

		if pos, exists := dpg.indexModeDocMap[doc.DocID]; exists {
			dpg.indexModeDocs[pos] = doc
		} else {
			dpg.indexModeDocMap[doc.DocID] = len(dpg.indexModeDocs)
			dpg.indexModeDocs = append(dpg.indexModeDocs, doc)
		}
		return dst, nil
	}

	fields := appendDataPoints(dpt, ts, series.ID, stm.GetSchema(), req, is.indexRuleLocators)

	doc := index.Document{
		DocID:        uint64(series.ID),
		EntityValues: series.Buffer,
		Fields:       fields,
	}

	if pos, exists := dpg.metadataDocMap[doc.DocID]; exists {
		dpg.metadataDocs[pos] = doc
	} else {
		dpg.metadataDocMap[doc.DocID] = len(dpg.metadataDocs)
		dpg.metadataDocs = append(dpg.metadataDocs, doc)
	}

	if p, _ := w.schemaRepo.topNProcessorMap.Load(getKey(stm.schema.GetMetadata())); p != nil {
		p.(*topNProcessorManager).onMeasureWrite(uint64(series.ID), uint32(shardID), &measurev1.InternalWriteRequest{
			Request: &measurev1.WriteRequest{
				Metadata:  stm.GetSchema().Metadata,
				DataPoint: req.DataPoint,
				MessageId: uint64(time.Now().UnixNano()),
			},
			EntityValues: writeEvent.EntityValues,
		}, stm)
	}
	return dst, nil
}