in src/google/appengine/api/datastore.py [0:0]
def Run(self, **kwargs):
"""Return an iterable output with all results in order.
Merge sort the results. First create a list of iterators, then walk
though them and yield results in order.
Args:
kwargs: Any keyword arguments accepted by datastore_query.QueryOptions().
Returns:
An iterator for the result set.
"""
config = _GetConfigFromKwargs(kwargs, convert_rpc=True,
config_class=datastore_query.QueryOptions)
if config and config.keys_only:
raise datastore_errors.BadRequestError(
'keys only queries are not supported by multi-query.')
lower_bound, upper_bound, config = self._ExtractBounds(config)
projection, override = self.__GetProjectionOverride(config)
if override:
config = datastore_query.QueryOptions(projection=override, config=config)
results = []
count = 1
log_level = logging.DEBUG - 1
for bound_query in self.__bound_queries:
logging.log(log_level, 'Running query #%i' % count)
results.append(bound_query.Run(config=config))
count += 1
def GetDedupeKey(sort_order_entity):
if projection:
return (
sort_order_entity.GetEntity().key(),
frozenset(six.iteritems(sort_order_entity.GetEntity())))
else:
return sort_order_entity.GetEntity().key()
def IterateResults(results):
"""Iterator function to return all results in sorted order.
Iterate over the array of results, yielding the next element, in
sorted order. This function is destructive (results will be empty
when the operation is complete).
Args:
results: list of result iterators to merge and iterate through
Yields:
The next result in sorted order.
"""
result_heap = []
for result in results:
heap_value = MultiQuery.SortOrderEntity(result, self.__orderings)
if heap_value.GetEntity():
heapq.heappush(result_heap, heap_value)
used_keys = set()
while result_heap:
if upper_bound is not None and len(used_keys) >= upper_bound:
break
top_result = heapq.heappop(result_heap)
dedupe_key = GetDedupeKey(top_result)
if dedupe_key not in used_keys:
result = top_result.GetEntity()
if override:
for key in list(result.keys()):
if key not in projection:
del result[key]
yield result
else:
pass
used_keys.add(dedupe_key)
results_to_push = []
while result_heap:
next = heapq.heappop(result_heap)
if dedupe_key != GetDedupeKey(next):
results_to_push.append(next)
break
else:
results_to_push.append(next.GetNext())
results_to_push.append(top_result.GetNext())
for popped_result in results_to_push:
if popped_result.GetEntity():
heapq.heappush(result_heap, popped_result)
it = IterateResults(results)
try:
for _ in range(lower_bound):
next(it)
except StopIteration:
pass
return it