in src/google/appengine/ext/ndb/query.py [0:0]
def run_to_queue(self, queue, conn, options=None):
"""Run this query, putting entities into the given queue."""
if options is None:
offset = None
limit = None
keys_only = None
else:
offset = options.offset
limit = options.limit
keys_only = options.keys_only
if (options.start_cursor or options.end_cursor or
options.produce_cursors):
names = set()
if self.__orders is not None:
names = self.__orders._get_prop_names()
if '__key__' not in names:
raise datastore_errors.BadArgumentError(
'_MultiQuery with cursors requires __key__ order')
modifiers = {}
if offset:
modifiers['offset'] = None
if limit is not None:
modifiers['limit'] = min(_MAX_LIMIT, offset + limit)
if keys_only and self.__orders is not None:
modifiers['keys_only'] = None
if modifiers:
options = QueryOptions(config=options, **modifiers)
if offset is None:
offset = 0
if limit is None:
limit = _MAX_LIMIT
if self.__orders is None:
keys_seen = set()
for subq in self.__subqueries:
if limit <= 0:
break
subit = tasklets.SerialQueueFuture('_MultiQuery.run_to_queue[ser]')
subq.run_to_queue(subit, conn, options=options)
while limit > 0:
try:
batch, index, result = yield subit.getq()
except EOFError:
break
if keys_only:
key = result
else:
key = result._key
if key not in keys_seen:
keys_seen.add(key)
if offset > 0:
offset -= 1
else:
limit -= 1
queue.putq((None, None, result))
queue.complete()
return
with conn.adapter:
todo = []
for subq in self.__subqueries:
dsquery = subq._get_query(conn)
subit = tasklets.SerialQueueFuture('_MultiQuery.run_to_queue[par]')
subq.run_to_queue(subit, conn, options=options, dsquery=dsquery)
todo.append((subit, dsquery))
state = []
for subit, dsquery in todo:
try:
thing = yield subit.getq()
except EOFError:
continue
else:
state.append(_SubQueryIteratorState(thing, subit, dsquery,
self.__orders))
heapq.heapify(state)
keys_seen = set()
while state and limit > 0:
item = heapq.heappop(state)
batch = item.batch
index = item.index
entity = item.entity
key = entity._key
if key not in keys_seen:
keys_seen.add(key)
if offset > 0:
offset -= 1
else:
limit -= 1
if keys_only:
queue.putq((batch, index, key))
else:
queue.putq((batch, index, entity))
subit = item.iterator
try:
batch, index, entity = yield subit.getq()
except EOFError:
pass
else:
item.batch = batch
item.index = index
item.entity = entity
heapq.heappush(state, item)
queue.complete()