func()

in banyand/measure/write.go [412:472]


// Rev receives a bus message whose payload is expected to be a []any of write
// events. Each event is either a *measurev1.InternalWriteRequest or an
// *anypb.Any wrapping one; anything else is logged and skipped. Accepted
// events are folded into per-group batches via w.handle, and once the whole
// payload has been walked every batch is handed to its tables and segments.
//
// The returned message is always the zero value of resp; failures are only
// reported through the logger.
func (w *writeCallback) Rev(_ context.Context, message bus.Message) (resp bus.Message) {
	payload, isBatch := message.Data().([]any)
	if !isBatch {
		w.l.Warn().Msg("invalid event data type")
		return resp
	}
	if len(payload) == 0 {
		w.l.Warn().Msg("empty event")
		return resp
	}

	// decode turns one raw payload entry into a write request. The boolean is
	// false when the entry has to be skipped; the reason is logged here.
	decode := func(raw any) (*measurev1.InternalWriteRequest, bool) {
		switch ev := raw.(type) {
		case *measurev1.InternalWriteRequest:
			return ev, true
		case *anypb.Any:
			req := new(measurev1.InternalWriteRequest)
			if err := ev.UnmarshalTo(req); err != nil {
				w.l.Error().Err(err).RawJSON("written", logger.Proto(ev)).Msg("fail to unmarshal event")
				return nil, false
			}
			return req, true
		default:
			w.l.Warn().Msg("invalid event data type")
			return nil, false
		}
	}

	batches := make(map[string]*dataPointsInGroup)
	for _, raw := range payload {
		req, usable := decode(raw)
		if !usable {
			continue
		}
		updated, err := w.handle(batches, req)
		if err != nil {
			w.l.Error().Err(err).RawJSON("written", logger.Proto(req)).Msg("cannot handle write event")
			// NOTE(review): starting over with an empty map means whatever was
			// accumulated from earlier events in this message never reaches the
			// release/DecRef pass below — confirm this is intended.
			batches = make(map[string]*dataPointsInGroup)
			continue
		}
		batches = updated
	}

	// Hand over everything that was accumulated, group by group.
	for _, group := range batches {
		for _, table := range group.tables {
			if table.tsTable != nil {
				table.tsTable.mustAddDataPoints(table.dataPoints)
			}
			// Release happens regardless of whether a tsTable was present.
			if table.dataPoints != nil {
				releaseDataPoints(table.dataPoints)
			}
		}
		for _, seg := range group.segments {
			// Index errors are logged only; the remaining segments are still processed.
			if len(group.metadataDocs) > 0 {
				if err := seg.IndexDB().Insert(group.metadataDocs); err != nil {
					w.l.Error().Err(err).Msg("cannot write metadata")
				}
			}
			if len(group.indexModeDocs) > 0 {
				if err := seg.IndexDB().Update(group.indexModeDocs); err != nil {
					w.l.Error().Err(err).Msg("cannot write index")
				}
			}
			seg.DecRef()
		}
		group.tsdb.Tick(group.latestTS)
	}
	return resp
}