in banyand/measure/write.go [412:472]
func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp bus.Message) {
events, ok := message.Data().([]any)
if !ok {
w.l.Warn().Msg("invalid event data type")
return
}
if len(events) < 1 {
w.l.Warn().Msg("empty event")
return
}
groups := make(map[string]*dataPointsInGroup)
for i := range events {
var writeEvent *measurev1.InternalWriteRequest
switch e := events[i].(type) {
case *measurev1.InternalWriteRequest:
writeEvent = e
case *anypb.Any:
writeEvent = &measurev1.InternalWriteRequest{}
if err := e.UnmarshalTo(writeEvent); err != nil {
w.l.Error().Err(err).RawJSON("written", logger.Proto(e)).Msg("fail to unmarshal event")
continue
}
default:
w.l.Warn().Msg("invalid event data type")
continue
}
var err error
if groups, err = w.handle(groups, writeEvent); err != nil {
w.l.Error().Err(err).RawJSON("written", logger.Proto(writeEvent)).Msg("cannot handle write event")
groups = make(map[string]*dataPointsInGroup)
continue
}
}
for i := range groups {
g := groups[i]
for j := range g.tables {
dps := g.tables[j]
if dps.tsTable != nil {
dps.tsTable.mustAddDataPoints(dps.dataPoints)
}
if dps.dataPoints != nil {
releaseDataPoints(dps.dataPoints)
}
}
for _, segment := range g.segments {
if len(g.metadataDocs) > 0 {
if err := segment.IndexDB().Insert(g.metadataDocs); err != nil {
w.l.Error().Err(err).Msg("cannot write metadata")
}
}
if len(g.indexModeDocs) > 0 {
if err := segment.IndexDB().Update(g.indexModeDocs); err != nil {
w.l.Error().Err(err).Msg("cannot write index")
}
}
segment.DecRef()
}
g.tsdb.Tick(g.latestTS)
}
return
}