in plugin/connector/standalone/broker.go [97:128]
func (q *MessageQueue) GetByOffset(offset int64) (*Message, error) {
q.mutex.Lock()
defer q.mutex.Unlock()
if offset < 0 {
return nil, fmt.Errorf("invalid offset param: %d", offset)
}
for {
// Wait util queue is not empty
if len(q.items) == 0 {
q.newMsg.Wait()
continue
}
lastMessage := q.items[len(q.items)-1]
// Wait util new message has been put
if lastMessage.GetOffset() < offset {
q.newMsg.Wait()
continue
}
firstMessage := q.items[0]
// message has been deleted
if firstMessage.GetOffset() > offset {
return nil, fmt.Errorf("offset has been deleted, offset : %d", offset)
}
index := int(offset - firstMessage.GetOffset())
return q.items[index], nil
}
}