in banyand/stream/write.go [71:229]
// handle folds a single stream write event into dst, the per-group batch of
// pending elements and index documents, and returns dst on success.
//
// The work is split into two phases so that a rejected event does not leave a
// half-written row in the batch:
//
//  1. Prepare: check the timestamp, load the group's TSDB and the stream
//     definition, validate the number of tag families, marshal the series and
//     build the index fields and encoded tag values. Nothing in dst is touched
//     during this phase.
//  2. Commit: find or create the group entry and the table whose time range
//     contains the timestamp, then append the timestamp, element ID, series ID,
//     tag values and index documents.
//
// docIDBuilder is scratch space: it is reset and overwritten on every call.
//
// On any error the returned map is nil. handle panics (through logger.Panicf)
// when the number of tag family index-rule locators differs from the number of
// tag families in the stream schema.
func (w *writeCallback) handle(dst map[string]*elementsInGroup, writeEvent *streamv1.InternalWriteRequest,
	docIDBuilder *strings.Builder,
) (map[string]*elementsInGroup, error) {
	req := writeEvent.Request
	t := req.Element.Timestamp.AsTime().Local()
	if err := timestamp.Check(t); err != nil {
		return nil, fmt.Errorf("invalid timestamp: %w", err)
	}
	ts := t.UnixNano()
	gn := req.Metadata.Group
	tsdb, err := w.schemaRepo.loadTSDB(gn)
	if err != nil {
		return nil, fmt.Errorf("cannot load tsdb for group %s: %w", gn, err)
	}

	// ---- Prepare: everything that can reject the event runs before dst is modified. ----
	stm, ok := w.schemaRepo.loadStream(writeEvent.GetRequest().GetMetadata())
	if !ok {
		return nil, fmt.Errorf("cannot find stream definition: %s", writeEvent.GetRequest().GetMetadata())
	}
	fLen := len(req.Element.GetTagFamilies())
	if fLen < 1 {
		return nil, fmt.Errorf("%s has no tag family", req)
	}
	if fLen > len(stm.schema.GetTagFamilies()) {
		return nil, fmt.Errorf("%s has more tag families than %s", req.Metadata, stm.schema)
	}
	series := &pbv1.Series{
		Subject:      req.Metadata.Name,
		EntityValues: writeEvent.EntityValues,
	}
	if err = series.Marshal(); err != nil {
		return nil, fmt.Errorf("cannot marshal series: %w", err)
	}

	// The element ID is the hash of "<stream name>|<element id>".
	docIDBuilder.Reset()
	docIDBuilder.WriteString(req.Metadata.Name)
	docIDBuilder.WriteByte('|')
	docIDBuilder.WriteString(req.Element.ElementId)
	eID := convert.HashStr(docIDBuilder.String())

	is := stm.indexSchema.Load().(indexSchema)
	tagFamilySpecs := stm.GetSchema().GetTagFamilies()
	// The rule locators are indexed by tag family position below, so the two
	// lengths must agree.
	if len(is.indexRuleLocators.TagFamilyTRule) != len(tagFamilySpecs) {
		logger.Panicf("metadata crashed, tag family rule length %d, tag family length %d",
			len(is.indexRuleLocators.TagFamilyTRule), len(tagFamilySpecs))
	}

	tagFamilies := make([]tagValues, 0, len(stm.schema.TagFamilies))
	var fields []index.Field
	for i := range tagFamilySpecs {
		// Tag families are matched to the schema by position; families the
		// request omits are treated as null.
		var tagFamily *modelv1.TagFamilyForWrite
		if len(req.Element.TagFamilies) <= i {
			tagFamily = pbv1.NullTagFamily
		} else {
			tagFamily = req.Element.TagFamilies[i]
		}
		tfr := is.indexRuleLocators.TagFamilyTRule[i]
		tagFamilySpec := tagFamilySpecs[i]
		tf := tagValues{
			tag: tagFamilySpec.Name,
		}
		for j := range tagFamilySpec.Tags {
			tagSpec := tagFamilySpec.Tags[j]
			// Tags are matched by position as well; missing ones become null.
			var tagValue *modelv1.TagValue
			if tagFamily == pbv1.NullTagFamily || len(tagFamily.Tags) <= j {
				tagValue = pbv1.NullTagValue
			} else {
				tagValue = tagFamily.Tags[j]
			}
			// A tag that has an index rule and a non-null value yields an index field.
			if r, ok := tfr[tagSpec.Name]; ok && tagValue != pbv1.NullTagValue {
				fields = appendField(fields, index.FieldKey{
					IndexRuleID: r.GetMetadata().GetId(),
					Analyzer:    r.Analyzer,
					SeriesID:    series.ID,
				}, tagSpec.Type, tagValue, r.GetNoSort())
			}
			// Indexed-only tags and entity tags are not stored as tag values.
			_, isEntity := is.indexRuleLocators.EntitySet[tagSpec.Name]
			if tagSpec.IndexedOnly || isEntity {
				continue
			}
			tf.values = append(tf.values, encodeTagValue(
				tagSpec.Name,
				tagSpec.Type,
				tagValue))
		}
		// Families that end up with no stored values are dropped.
		if len(tf.values) > 0 {
			tagFamilies = append(tagFamilies, tf)
		}
	}

	// ---- Commit: locate the destination table and append the prepared row. ----
	eg, ok := dst[gn]
	if !ok {
		eg = &elementsInGroup{
			tsdb:        tsdb,
			tables:      make([]*elementsInTable, 0),
			segments:    make([]storage.Segment[*tsTable, option], 0),
			docIDsAdded: make(map[uint64]struct{}),
		}
		dst[gn] = eg
	}
	if eg.latestTS < ts {
		eg.latestTS = ts
	}

	// NOTE(review): tables are matched on time range only; the shard ID is used
	// solely when a new table is created. If one batch can carry events for
	// different shards inside the same segment time range, they would all land
	// in the first table created for that range — confirm against the caller.
	var et *elementsInTable
	for i := range eg.tables {
		if eg.tables[i].timeRange.Contains(ts) {
			et = eg.tables[i]
			break
		}
	}
	shardID := common.ShardID(writeEvent.ShardId)
	if et == nil {
		// Reuse a segment already cached for this group before asking the TSDB.
		var segment storage.Segment[*tsTable, option]
		for _, seg := range eg.segments {
			if seg.GetTimeRange().Contains(ts) {
				segment = seg
				break
			}
		}
		if segment == nil {
			segment, err = tsdb.CreateSegmentIfNotExist(t)
			if err != nil {
				return nil, fmt.Errorf("cannot create segment: %w", err)
			}
			eg.segments = append(eg.segments, segment)
		}
		tstb, tableErr := segment.CreateTSTableIfNotExist(shardID)
		if tableErr != nil {
			return nil, fmt.Errorf("cannot create ts table: %w", tableErr)
		}
		et = &elementsInTable{
			timeRange: segment.GetTimeRange(),
			tsTable:   tstb,
			elements:  generateElements(),
		}
		et.elements.reset()
		eg.tables = append(eg.tables, et)
	}

	// The column slices are parallel: every accepted event adds exactly one
	// entry to each of them.
	et.elements.timestamps = append(et.elements.timestamps, ts)
	et.elements.elementIDs = append(et.elements.elementIDs, eID)
	et.elements.seriesIDs = append(et.elements.seriesIDs, series.ID)
	et.elements.tagFamilies = append(et.elements.tagFamilies, tagFamilies)
	et.docs = append(et.docs, index.Document{
		DocID:     eID,
		Fields:    fields,
		Timestamp: ts,
	})

	// Add the series document at most once per group batch.
	docID := uint64(series.ID)
	if _, exists := eg.docIDsAdded[docID]; !exists {
		eg.docs = append(eg.docs, index.Document{
			DocID:        docID,
			EntityValues: series.Buffer,
		})
		eg.docIDsAdded[docID] = struct{}{}
	}
	return dst, nil
}