in azure-kusto-ingest/azure/kusto/ingest/_status_q.py [0:0]
def peek(self, n=1, raw=False) -> List["StatusMessage"]:
    """Peek status queue

    Up to ``n`` messages are gathered from the status queues, which are visited in
    random order. A first pass asks every queue for an equal share; if that does not
    yield enough messages (the queues are unbalanced), a second pass asks the
    non-empty queues for the messages beyond those already collected.

    :param int n: number of messages to return as part of peek.
    :param bool raw: should message content be returned as is (no parsing).
    :return: at most ``n`` messages; an empty list when ``n`` is not positive or
        when there are no queues.
    """
    if n <= 0:
        return []

    queues = self._get_queues()
    if not queues:
        # nothing to read from; this also guards the division below
        return []
    random.shuffle(queues)

    result = []

    def _peek_specific_q(_q: "QueueClient", _n: int, _skip: int = 0) -> tuple:
        """Peek up to ``_n`` messages from ``_q`` and collect those after the first ``_skip``.

        :return: ``(seen, has_messages)`` - how many messages the queue returned and
            whether any of them was non-empty.
        """
        seen = 0
        has_messages = False
        for m in _q.peek_messages(max_messages=_n):
            seen += 1
            if not m:
                continue
            has_messages = True
            if seen <= _skip:
                # peeking does not remove messages, so the head of the queue was
                # already collected by an earlier peek of this same queue
                continue
            result.append(m if raw else self._deserialize_message(m))
            # short circuit to prevent unneeded work
            if len(result) >= n:
                break
        return seen, has_messages

    # ask each queue for an equal share (rounded up) of the requested messages
    per_q = n // len(queues) + 1

    # (queue, number of messages it returned in the first pass)
    non_empty_qs = []
    for q in queues:
        seen, has_messages = _peek_specific_q(q, per_q)
        if len(result) >= n:
            return result
        if has_messages:
            non_empty_qs.append((q, seen))

    # in case queues aren't balanced and we didn't get enough messages, iterate again
    # and this time take whatever each queue holds beyond what was already collected
    for q, seen in non_empty_qs:
        _peek_specific_q(q, n, seen)
        if len(result) >= n:
            return result

    # all queues together hold fewer than n messages
    return result