in pkg/events/events.go [210:242]
func (ev *events) parseBuffer(consumerPosition uint64, eventRing *RingBuffer) {
var readDone bool
for {
readDone = true
producerPosition := eventRing.getProducerPosition()
for consumerPosition < producerPosition {
// Get the header - Data points to the DataPage which will be offset by consumerPosition
ringdata := eventRing.ParseRingData(consumerPosition)
// Check if busy then skip, Might not be committed yet
// There are 2 steps -> reserve and then commit/discard
if ringdata.BusyRecord {
readDone = true
break
}
readDone = false
// Update the position to the next record irrespective of discard or commit of data
consumerPosition += uint64(ringdata.RecordLen)
//Pick the data only if committed
if !ringdata.DiscardRecord {
ev.eventsDataChannel <- ringdata.parseSample()
}
eventRing.setConsumerPosition(consumerPosition)
}
if readDone {
break
}
}
}