func()

in plugin/connector/standalone/broker.go [97:128]


func (q *MessageQueue) GetByOffset(offset int64) (*Message, error) {
	q.mutex.Lock()
	defer q.mutex.Unlock()

	if offset < 0 {
		return nil, fmt.Errorf("invalid offset param: %d", offset)
	}

	for {
		// Wait util queue is not empty
		if len(q.items) == 0 {
			q.newMsg.Wait()
			continue
		}

		lastMessage := q.items[len(q.items)-1]
		// Wait util new message has been put
		if lastMessage.GetOffset() < offset {
			q.newMsg.Wait()
			continue
		}

		firstMessage := q.items[0]
		// message has been deleted
		if firstMessage.GetOffset() > offset {
			return nil, fmt.Errorf("offset has been deleted, offset : %d", offset)
		}

		index := int(offset - firstMessage.GetOffset())
		return q.items[index], nil
	}
}