func()

in banyand/stream/write.go [71:229]


// handle folds one stream write event into dst, the batch accumulator keyed by
// group name, and returns dst on success.
//
// For the event it:
//   - converts the element timestamp to local time and validates it with
//     timestamp.Check;
//   - resolves the group's TSDB, the stream definition and the series derived
//     from the stream name plus the event's entity values;
//   - finds, or lazily creates, the per-group entry and the table whose time
//     range contains the timestamp (creating the segment and ts table on
//     demand);
//   - appends the timestamp, element ID, series ID and encoded tag families to
//     the table's parallel slices, plus one index document for the element;
//   - adds one series document per distinct series ID to the group.
//
// docIDBuilder is scratch space supplied by the caller; it is reset before
// use. The element ID is the hash of "<stream name>|<element id>".
//
// On any failure the returned map is nil and the error describes the cause.
// Everything that can reject the event is checked before the table's element
// slices are touched, so a rejected event never leaves those parallel slices
// with mismatched lengths. A group entry or segment created for the event may
// still remain in dst after a segment/table creation error.
//
// The function calls logger.Panicf when the number of tag-family index rule
// locators differs from the number of tag families in the stream schema.
func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *streamv1.InternalWriteRequest,
	docIDBuilder *strings.Builder,
) (map[string]*elementsInGroup, error) {
	req := writeEvent.Request
	t := req.Element.Timestamp.AsTime().Local()
	if err := timestamp.Check(t); err != nil {
		return nil, fmt.Errorf("invalid timestamp: %w", err)
	}
	ts := t.UnixNano()

	gn := req.Metadata.Group
	tsdb, err := w.schemaRepo.loadTSDB(gn)
	if err != nil {
		return nil, fmt.Errorf("cannot load tsdb for group %s: %w", gn, err)
	}

	// Validate the event against the stream definition and build its series
	// before any accumulator state is modified.
	stm, ok := w.schemaRepo.loadStream(writeEvent.GetRequest().GetMetadata())
	if !ok {
		return nil, fmt.Errorf("cannot find stream definition: %s", writeEvent.GetRequest().GetMetadata())
	}
	fLen := len(req.Element.GetTagFamilies())
	if fLen < 1 {
		return nil, fmt.Errorf("%s has no tag family", req)
	}
	if fLen > len(stm.schema.GetTagFamilies()) {
		return nil, fmt.Errorf("%s has more tag families than %s", req.Metadata, stm.schema)
	}
	series := &pbv1.Series{
		Subject:      req.Metadata.Name,
		EntityValues: writeEvent.EntityValues,
	}
	if err := series.Marshal(); err != nil {
		return nil, fmt.Errorf("cannot marshal series: %w", err)
	}

	is := stm.indexSchema.Load().(indexSchema)
	schemaFamilies := stm.GetSchema().GetTagFamilies()
	if len(is.indexRuleLocators.TagFamilyTRule) != len(schemaFamilies) {
		logger.Panicf("metadata crashed, tag family rule length %d, tag family length %d",
			len(is.indexRuleLocators.TagFamilyTRule), len(schemaFamilies))
	}

	// Look up or create the accumulator entry of the group.
	eg, ok := dst[gn]
	if !ok {
		eg = &elementsInGroup{
			tsdb:        tsdb,
			tables:      make([]*elementsInTable, 0),
			segments:    make([]storage.Segment[*tsTable, option], 0),
			docIDsAdded: make(map[uint64]struct{}),
		}
		dst[gn] = eg
	}
	if eg.latestTS < ts {
		eg.latestTS = ts
	}

	// NOTE(review): the lookup below matches on time range only, whereas a new
	// table is created for a specific shardID. If one batch can carry several
	// shard IDs for the same group, later events would reuse the table created
	// for the first shard — confirm against the caller.
	var et *elementsInTable
	for i := range eg.tables {
		if eg.tables[i].timeRange.Contains(ts) {
			et = eg.tables[i]
			break
		}
	}
	shardID := common.ShardID(writeEvent.ShardId)
	if et == nil {
		// Reuse a segment already obtained for this batch when one covers ts;
		// otherwise ask the TSDB for it and remember it in the group.
		var segment storage.Segment[*tsTable, option]
		for _, seg := range eg.segments {
			if seg.GetTimeRange().Contains(ts) {
				segment = seg
				break
			}
		}
		if segment == nil {
			segment, err = tsdb.CreateSegmentIfNotExist(t)
			if err != nil {
				return nil, fmt.Errorf("cannot create segment: %w", err)
			}
			eg.segments = append(eg.segments, segment)
		}
		tstb, tErr := segment.CreateTSTableIfNotExist(shardID)
		if tErr != nil {
			return nil, fmt.Errorf("cannot create ts table: %w", tErr)
		}
		et = &elementsInTable{
			timeRange: segment.GetTimeRange(),
			tsTable:   tstb,
			elements:  generateElements(),
		}
		et.elements.reset()
		eg.tables = append(eg.tables, et)
	}

	docIDBuilder.Reset()
	docIDBuilder.WriteString(req.Metadata.Name)
	docIDBuilder.WriteByte('|')
	docIDBuilder.WriteString(req.Element.ElementId)
	eID := convert.HashStr(docIDBuilder.String())

	// Walk the schema, not the request: families or tags missing from the
	// request are filled with the null placeholders.
	tagFamilies := make([]tagValues, 0, len(schemaFamilies))
	var fields []index.Field
	for i := range schemaFamilies {
		var tagFamily *modelv1.TagFamilyForWrite
		if len(req.Element.TagFamilies) <= i {
			tagFamily = pbv1.NullTagFamily
		} else {
			tagFamily = req.Element.TagFamilies[i]
		}
		tfr := is.indexRuleLocators.TagFamilyTRule[i]
		tagFamilySpec := schemaFamilies[i]
		tf := tagValues{
			tag: tagFamilySpec.Name,
		}

		for j := range tagFamilySpec.Tags {
			var tagValue *modelv1.TagValue
			if tagFamily == pbv1.NullTagFamily || len(tagFamily.Tags) <= j {
				tagValue = pbv1.NullTagValue
			} else {
				tagValue = tagFamily.Tags[j]
			}

			tagSpec := tagFamilySpec.Tags[j]
			// A tag with an index rule and a non-null value yields an index field.
			if r, ok := tfr[tagSpec.Name]; ok && tagValue != pbv1.NullTagValue {
				fields = appendField(fields, index.FieldKey{
					IndexRuleID: r.GetMetadata().GetId(),
					Analyzer:    r.Analyzer,
					SeriesID:    series.ID,
				}, tagSpec.Type, tagValue, r.GetNoSort())
			}
			// Index-only tags and entity tags are not stored in the tag families.
			_, isEntity := is.indexRuleLocators.EntitySet[tagSpec.Name]
			if tagSpec.IndexedOnly || isEntity {
				continue
			}
			tf.values = append(tf.values, encodeTagValue(
				tagSpec.Name,
				tagSpec.Type,
				tagValue))
		}
		if len(tf.values) > 0 {
			tagFamilies = append(tagFamilies, tf)
		}
	}

	// Commit the element: the four slices are parallel and grow together.
	et.elements.timestamps = append(et.elements.timestamps, ts)
	et.elements.elementIDs = append(et.elements.elementIDs, eID)
	et.elements.seriesIDs = append(et.elements.seriesIDs, series.ID)
	et.elements.tagFamilies = append(et.elements.tagFamilies, tagFamilies)

	et.docs = append(et.docs, index.Document{
		DocID:     eID,
		Fields:    fields,
		Timestamp: ts,
	})

	// Register the series document only once per group.
	docID := uint64(series.ID)
	if _, exists := eg.docIDsAdded[docID]; !exists {
		eg.docs = append(eg.docs, index.Document{
			DocID:        docID,
			EntityValues: series.Buffer,
		})
		eg.docIDsAdded[docID] = struct{}{}
	}

	return dst, nil
}