func()

in pkg/events/events.go [210:242]


func (ev *events) parseBuffer(consumerPosition uint64, eventRing *RingBuffer) {
	var readDone bool
	for {
		readDone = true
		producerPosition := eventRing.getProducerPosition()
		for consumerPosition < producerPosition {

			// Get the header - Data points to the DataPage which will be offset by consumerPosition
			ringdata := eventRing.ParseRingData(consumerPosition)

			// Check if busy then skip, Might not be committed yet
			// There are 2 steps -> reserve and then commit/discard
			if ringdata.BusyRecord {
				readDone = true
				break
			}

			readDone = false

			// Update the position to the next record irrespective of discard or commit of data
			consumerPosition += uint64(ringdata.RecordLen)

			//Pick the data only if committed
			if !ringdata.DiscardRecord {
				ev.eventsDataChannel <- ringdata.parseSample()
			}
			eventRing.setConsumerPosition(consumerPosition)
		}
		if readDone {
			break
		}
	}
}