src/google/appengine/ext/ndb/query.py (1,223 lines of code) (raw):

#!/usr/bin/env python # # Copyright 2007 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # """Higher-level Query wrapper. There are perhaps too many query APIs in the world. The fundamental API here overloads the 6 comparisons operators to represent filters on property values, and supports AND and OR operations (implemented as functions -- Python's 'and' and 'or' operators cannot be overloaded, and the '&' and '|' operators have a priority that conflicts with the priority of comparison operators). For example:: class Employee(Model): name = StringProperty() age = IntegerProperty() rank = IntegerProperty() @classmethod def demographic(cls, min_age, max_age): return cls.query().filter(AND(cls.age >= min_age, cls.age <= max_age)) @classmethod def ranked(cls, rank): return cls.query(cls.rank == rank).order(cls.age) for emp in Employee.seniors(42, 5): print emp.name, emp.age, emp.rank The 'in' operator cannot be overloaded, but is supported through the IN() method. For example:: Employee.query().filter(Employee.rank.IN([4, 5, 6])) Sort orders are supported through the order() method; unary minus is overloaded on the Property class to represent a descending order:: Employee.query().order(Employee.name, -Employee.age) Besides using AND() and OR(), filters can also be combined by repeatedly calling .filter():: q1 = Employee.query() # A query that returns all employees q2 = q1.filter(Employee.age >= 30) # Only those over 30 q3 = q2.filter(Employee.age < 40) # Only those in their 30s A further shortcut is calling .filter() with multiple arguments; this implies AND():: q1 = Employee.query() # A query that returns all employees q3 = q1.filter(Employee.age >= 30, Employee.age < 40) # Only those in their 30s And finally you can also pass one or more filter expressions directly to the .query() method:: q3 = Employee.query(Employee.age >= 30, Employee.age < 40) # Only those in their 30s Query objects are immutable, so these methods always return a new Query object; the above calls to filter() do not affect q1. (On the other hand, operations that are effectively no-ops may return the original Query object.) Sort orders can also be combined this way, and .filter() and .order() calls may be intermixed:: q4 = q3.order(-Employee.age) q5 = q4.order(Employee.name) q6 = q5.filter(Employee.rank == 5) Again, multiple .order() calls can be combined:: q5 = q3.order(-Employee.age, Employee.name) The simplest way to retrieve Query results is a for-loop:: for emp in q3: print emp.name, emp.age Some other methods to run a query and access its results:: q.iter() # Return an iterator; same as iter(q) but more flexible q.map(callback) # Call the callback function for each query result q.fetch(N) # Return a list of the first N results q.get() # Return the first result q.count(N) # Return the number of results, with a maximum of N q.fetch_page(N, start_cursor=cursor) # Return (results, cursor, has_more) All of the above methods take a standard set of additional query options, either in the form of keyword arguments such as keys_only=True, or as QueryOptions object passed with options=QueryOptions(...). The most important query options are: - keys_only: bool, if set the results are keys instead of entities - limit: int, limits the number of results returned - offset: int, skips this many results first - start_cursor: Cursor, start returning results after this position - end_cursor: Cursor, stop returning results after this position - batch_size: int, hint for the number of results returned per RPC - prefetch_size: int, hint for the number of results in the first RPC - produce_cursors: bool, return Cursor objects with the results For additional (obscure) query options and more details on them, including an explanation of Cursors, see datastore_query.py. All of the above methods except for iter() have asynchronous variants as well, which return a Future; to get the operation's ultimate result, yield the Future (when inside a tasklet) or call the Future's get_result() method (outside a tasklet):: q.map_async(callback) # Callback may be a task or a plain function q.fetch_async(N) q.get_async() q.count_async(N) q.fetch_page_async(N, start_cursor=cursor) Finally, there's an idiom to efficiently loop over the Query results in a tasklet, properly yielding when appropriate:: it = q.iter() while (yield it.has_next_async()): emp = it.next() print emp.name, emp.age """ import datetime import functools import heapq import sys from google.appengine.ext.ndb import context from google.appengine.ext.ndb import model from google.appengine.ext.ndb import tasklets from google.appengine.ext.ndb import utils import six from six.moves import map from six.moves import range from six.moves import zip from google.appengine.api import cmp_compat from google.appengine.api import datastore_errors from google.appengine.api import datastore_types from google.appengine.api import namespace_manager from google.appengine.datastore import datastore_query from google.appengine.datastore import datastore_rpc __all__ = ['Query', 'QueryOptions', 'Cursor', 'QueryIterator', 'RepeatedStructuredPropertyPredicate', 'AND', 'OR', 'ConjunctionNode', 'DisjunctionNode', 'FilterNode', 'PostFilterNode', 'FalseNode', 'Node', 'ParameterNode', 'ParameterizedThing', 'Parameter', 'ParameterizedFunction', 'gql', ] Cursor = datastore_query.Cursor _ASC = datastore_query.PropertyOrder.ASCENDING _DESC = datastore_query.PropertyOrder.DESCENDING _AND = datastore_query.CompositeFilter.AND _KEY = datastore_types._KEY_SPECIAL_PROPERTY _OPS = frozenset(['=', '!=', '<', '<=', '>', '>=', 'in']) _MAX_LIMIT = 2 ** 31 - 1 class QueryOptions(context.ContextOptions, datastore_query.QueryOptions): """Support both context options and query options (esp. use_cache).""" class RepeatedStructuredPropertyPredicate(datastore_query.FilterPredicate): def __init__(self, match_keys, pb, key_prefix): super(RepeatedStructuredPropertyPredicate, self).__init__() self.match_keys = match_keys stripped_keys = [] for key in match_keys: if not key.startswith(key_prefix): raise ValueError('key %r does not begin with the specified prefix of %s' % (key, key_prefix)) stripped_key = six.ensure_text(key[len(key_prefix):]) stripped_keys.append(stripped_key) value_map = datastore_query._make_key_value_map(pb, stripped_keys) self.match_values = tuple(value_map[key][0] for key in stripped_keys) def _get_prop_names(self): return frozenset(self.match_keys) def _apply(self, key_value_map): """Apply the filter to values extracted from an entity. Think of self.match_keys and self.match_values as representing a table with one row. For example: match_keys = ('name', 'age', 'rank') match_values = ('Joe', 24, 5) (Except that in reality, the values are represented by tuples produced by datastore_types.PropertyValueToKeyValue().) represents this table: | name | age | rank | +---------+-------+--------+ | 'Joe' | 24 | 5 | Think of key_value_map as a table with the same structure but (potentially) many rows. This represents a repeated structured property of a single entity. For example: {'name': ['Joe', 'Jane', 'Dick'], 'age': [24, 21, 23], 'rank': [5, 1, 2]} represents this table: | name | age | rank | +---------+-------+--------+ | 'Joe' | 24 | 5 | | 'Jane' | 21 | 1 | | 'Dick' | 23 | 2 | We must determine wheter at least one row of the second table exactly matches the first table. We need this class because the datastore, when asked to find an entity with name 'Joe', age 24 and rank 5, will include entities that have 'Joe' somewhere in the name column, 24 somewhere in the age column, and 5 somewhere in the rank column, but not all aligned on a single row. Such an entity should not be considered a match. """ columns = [] for key in self.match_keys: column = key_value_map.get(six.ensure_text(key)) if not column: return False columns.append(column) return self.match_values in zip(*columns) class ParameterizedThing(object): """Base class for Parameter and ParameterizedFunction. This exists purely for isinstance() checks. """ def __eq__(self, other): raise NotImplementedError def __ne__(self, other): eq = self.__eq__(other) if eq is not NotImplemented: eq = not eq return eq class Parameter(ParameterizedThing): """Represents a bound variable in a GQL query. Parameter(1) corresponds to a slot labeled ":1" in a GQL query. Parameter('xyz') corresponds to a slot labeled ":xyz". The value must be set (bound) separately by calling .set(value). """ def __init__(self, key): """Constructor. Args: key: The Parameter key, must be either an integer or a string. """ if not isinstance(key, six.integer_types + six.string_types): raise TypeError('Parameter key must be an integer or string, not %s' % (key,)) self.__key = key def __repr__(self): return '%s(%r)' % (self.__class__.__name__, self.__key) def __eq__(self, other): if not isinstance(other, Parameter): return NotImplemented return self.__key == other.__key @property def key(self): """Retrieve the key.""" return self.__key def resolve(self, bindings, used): key = self.__key if key not in bindings: raise datastore_errors.BadArgumentError( 'Parameter :%s is not bound.' % key) value = bindings[key] used[key] = True return value class ParameterizedFunction(ParameterizedThing): """Represents a GQL function with parameterized arguments. For example, ParameterizedFunction('key', [Parameter(1)]) stands for the GQL syntax KEY(:1). """ def __init__(self, func, values): from google.appengine.ext import gql self.__func = func self.__values = values gqli = gql.GQL('SELECT * FROM Dummy') gql_method = gqli._GQL__cast_operators[func] self.__method = getattr(gqli, '_GQL' + gql_method.__name__) def __repr__(self): return 'ParameterizedFunction(%r, %r)' % (self.__func, self.__values) def __eq__(self, other): if not isinstance(other, ParameterizedFunction): return NotImplemented return (self.__func == other.__func and self.__values == other.__values) @property def func(self): return self.__func @property def values(self): return self.__values def is_parameterized(self): for val in self.__values: if isinstance(val, Parameter): return True return False def resolve(self, bindings, used): values = [] for val in self.__values: if isinstance(val, Parameter): val = val.resolve(bindings, used) values.append(val) result = self.__method(values) if self.__func == 'key' and isinstance(result, datastore_types.Key): result = model.Key.from_old_key(result) elif self.__func == 'time' and isinstance(result, datetime.datetime): result = datetime.time(result.hour, result.minute, result.second, result.microsecond) elif self.__func == 'date' and isinstance(result, datetime.datetime): result = datetime.date(result.year, result.month, result.day) return result class Node(object): """Base class for filter expression tree nodes. Tree nodes are considered immutable, even though they can contain Parameter instances, which are not. In particular, two identical trees may be represented by the same Node object in different contexts. """ def __new__(cls): if cls is Node: raise TypeError('Cannot instantiate Node, only a subclass.') return super(Node, cls).__new__(cls) def __eq__(self, other): raise NotImplementedError def __ne__(self, other): eq = self.__eq__(other) if eq is not NotImplemented: eq = not eq return eq def __unordered(self, unused_other): raise TypeError('Nodes cannot be ordered') __le__ = __lt__ = __ge__ = __gt__ = __unordered def _to_filter(self, post=False): """Helper to convert to datastore_query.Filter, or None.""" raise NotImplementedError def _post_filters(self): """Helper to extract post-filter Nodes, if any.""" return None def resolve(self, bindings, used): """Return a Node with Parameters replaced by the selected values. Args: bindings: A dict mapping integers and strings to values. used: A dict into which use of use of a binding is recorded. Returns: A Node instance. """ return self class FalseNode(Node): """Tree node for an always-failing filter.""" def __eq__(self, other): if not isinstance(other, FalseNode): return NotImplemented return True def _to_filter(self, post=False): if post: return None raise datastore_errors.BadQueryError( 'Cannot convert FalseNode to predicate') class ParameterNode(Node): """Tree node for a parameterized filter.""" def __new__(cls, prop, op, param): if not isinstance(prop, model.Property): raise TypeError('Expected a Property, got %r' % (prop,)) if op not in _OPS: raise TypeError('Expected a valid operator, got %r' % (op,)) if not isinstance(param, ParameterizedThing): raise TypeError('Expected a ParameterizedThing, got %r' % (param,)) obj = super(ParameterNode, cls).__new__(cls) obj.__prop = prop obj.__op = op obj.__param = param return obj def __getnewargs__(self): return self.__prop, self.__op, self.__param def __repr__(self): return 'ParameterNode(%r, %r, %r)' % (self.__prop, self.__op, self.__param) def __eq__(self, other): if not isinstance(other, ParameterNode): return NotImplemented return (self.__prop._name == other.__prop._name and self.__op == other.__op and self.__param == other.__param) def _to_filter(self, post=False): raise datastore_errors.BadArgumentError( 'Parameter :%s is not bound.' % (self.__param.key,)) def resolve(self, bindings, used): value = self.__param.resolve(bindings, used) if self.__op == 'in': return self.__prop._IN(value) else: return self.__prop._comparison(self.__op, value) class FilterNode(Node): """Tree node for a single filter expression.""" def __new__(cls, name, opsymbol, value): if isinstance(value, model.Key): value = value.to_old_key() if opsymbol == '!=': n1 = FilterNode(name, '<', value) n2 = FilterNode(name, '>', value) return DisjunctionNode(n1, n2) if opsymbol == 'in': if not isinstance(value, (list, tuple, set, frozenset)): raise TypeError('in expected a list, tuple or set of values; ' 'received %r' % value) nodes = [FilterNode(name, '=', v) for v in value] if not nodes: return FalseNode() if len(nodes) == 1: return nodes[0] return DisjunctionNode(*nodes) self = super(FilterNode, cls).__new__(cls) self.__name = name self.__opsymbol = opsymbol self.__value = value return self def __getnewargs__(self): return self.__name, self.__opsymbol, self.__value def __repr__(self): return '%s(%r, %r, %r)' % (self.__class__.__name__, self.__name, self.__opsymbol, self.__value) def __eq__(self, other): if not isinstance(other, FilterNode): return NotImplemented return (self.__name == other.__name and self.__opsymbol == other.__opsymbol and self.__value == other.__value) def __hash__(self): return hash((self.__name, self.__opsymbol, self.__value)) def _to_filter(self, post=False): if post: return None if self.__opsymbol in ('!=', 'in'): raise NotImplementedError('Inequality filters are not single filter ' 'expressions and therefore cannot be converted ' 'to a single filter (%r)' % self.__opsymbol) value = self.__value return datastore_query.make_filter(six.ensure_text(self.__name), self.__opsymbol, value) class PostFilterNode(Node): """Tree node representing an in-memory filtering operation. This is used to represent filters that cannot be executed by the datastore, for example a query for a structured value. """ def __new__(cls, predicate): self = super(PostFilterNode, cls).__new__(cls) self.predicate = predicate return self def __getnewargs__(self): return self.predicate, def __repr__(self): return '%s(%s)' % (self.__class__.__name__, self.predicate) def __eq__(self, other): if not isinstance(other, PostFilterNode): return NotImplemented return self is other or self.__dict__ == other.__dict__ def _to_filter(self, post=False): if post: return self.predicate else: return None class ConjunctionNode(Node): """Tree node representing a Boolean `AND` operator on two or more nodes.""" def __new__(cls, *nodes): if not nodes: raise TypeError('ConjunctionNode() requires at least one node.') elif len(nodes) == 1: return nodes[0] clauses = [[]] for node in nodes: if not isinstance(node, Node): raise TypeError('ConjunctionNode() expects Node instances as arguments;' ' received a non-Node instance %r' % node) if isinstance(node, DisjunctionNode): new_clauses = [] for clause in clauses: for subnode in node: new_clause = clause + [subnode] new_clauses.append(new_clause) clauses = new_clauses elif isinstance(node, ConjunctionNode): for clause in clauses: clause.extend(node.__nodes) else: for clause in clauses: clause.append(node) if not clauses: return FalseNode() if len(clauses) > 1: return DisjunctionNode(*[ConjunctionNode(*clause) for clause in clauses]) self = super(ConjunctionNode, cls).__new__(cls) self.__nodes = clauses[0] return self def __getnewargs__(self): return tuple(self.__nodes) def __iter__(self): return iter(self.__nodes) def __repr__(self): return 'AND(%s)' % (', '.join(map(str, self.__nodes))) def __eq__(self, other): if not isinstance(other, ConjunctionNode): return NotImplemented return self.__nodes == other.__nodes def _to_filter(self, post=False): filters = [ node._to_filter(post=post) for node in self.__nodes if isinstance(node, PostFilterNode) == post ] filters = [f for f in filters if f] if not filters: return None if len(filters) == 1: return filters[0] return datastore_query.CompositeFilter(_AND, filters) def _post_filters(self): post_filters = [node for node in self.__nodes if isinstance(node, PostFilterNode)] if not post_filters: return None if len(post_filters) == 1: return post_filters[0] if post_filters == self.__nodes: return self return ConjunctionNode(*post_filters) def resolve(self, bindings, used): nodes = [node.resolve(bindings, used) for node in self.__nodes] if nodes == self.__nodes: return self return ConjunctionNode(*nodes) class DisjunctionNode(Node): """Tree node representing a Boolean OR operator on two or more nodes.""" def __new__(cls, *nodes): if not nodes: raise TypeError('DisjunctionNode() requires at least one node') elif len(nodes) == 1: return nodes[0] self = super(DisjunctionNode, cls).__new__(cls) self.__nodes = [] for node in nodes: if not isinstance(node, Node): raise TypeError('DisjunctionNode() expects Node instances as arguments;' ' received a non-Node instance %r' % node) if isinstance(node, DisjunctionNode): self.__nodes.extend(node.__nodes) else: self.__nodes.append(node) return self def __getnewargs__(self): return tuple(self.__nodes) def __iter__(self): return iter(self.__nodes) def __repr__(self): return 'OR(%s)' % (', '.join(map(str, self.__nodes))) def __eq__(self, other): if not isinstance(other, DisjunctionNode): return NotImplemented return self.__nodes == other.__nodes def resolve(self, bindings, used): nodes = [node.resolve(bindings, used) for node in self.__nodes] if nodes == self.__nodes: return self return DisjunctionNode(*nodes) AND = ConjunctionNode OR = DisjunctionNode def _args_to_val(func, args): """Helper for GQL parsing to extract values from GQL expressions. This can extract the value from a GQL literal, return a Parameter for a GQL bound parameter (:1 or :foo), and interprets casts like KEY(...) and plain lists of values like (1, 2, 3). Args: func: A string indicating what kind of thing this is. args: One or more GQL values, each integer, string, or GQL literal. """ from google.appengine.ext import gql vals = [] for arg in args: if isinstance(arg, six.integer_types + six.string_types): val = Parameter(arg) elif isinstance(arg, gql.Literal): val = arg.Get() else: raise TypeError('Unexpected arg (%r)' % arg) vals.append(val) if func == 'nop': if len(vals) != 1: raise TypeError('"nop" requires exactly one value') return vals[0] pfunc = ParameterizedFunction(func, vals) if pfunc.is_parameterized(): return pfunc else: return pfunc.resolve({}, {}) def _get_prop_from_modelclass(modelclass, name): """Helper for FQL parsing to turn a property name into a property object. Args: modelclass: The model class specified in the query. name: The property name. This may contain dots which indicate sub-properties of structured properties. Returns: A Property object. Raises: KeyError if the property doesn't exist and the model clas doesn't derive from Expando. """ if name == '__key__': return modelclass._key parts = name.split('.') part, more = parts[0], parts[1:] prop = modelclass._properties.get(part) if prop is None: if issubclass(modelclass, model.Expando): prop = model.GenericProperty(part) else: raise TypeError('Model %s has no property named %r' % (modelclass._get_kind(), part)) while more: part = more.pop(0) if not isinstance(prop, model.StructuredProperty): raise TypeError('Model %s has no property named %r' % (modelclass._get_kind(), part)) maybe = getattr(prop, part, None) if isinstance(maybe, model.Property) and maybe._name == part: prop = maybe else: maybe = prop._modelclass._properties.get(part) if maybe is not None: prop = getattr(prop, maybe._code_name) else: if issubclass(prop._modelclass, model.Expando) and not more: prop = model.GenericProperty() prop._name = name else: raise KeyError('Model %s has no property named %r' % (prop._modelclass._get_kind(), part)) return prop class Query(object): """Query object. Usually constructed by calling Model.query(). See module docstring for examples. Note that not all operations on Queries are supported by _MultiQuery instances; the latter are generated as necessary when any of the operators !=, IN or OR is used. """ @utils.positional(1) def __init__(self, kind=None, ancestor=None, filters=None, orders=None, app=None, namespace=None, default_options=None, projection=None, group_by=None): """Constructor. Args: kind: Optional kind string. ancestor: Optional ancestor Key. filters: Optional Node representing a filter expression tree. orders: Optional datastore_query.Order object. app: Optional app id. namespace: Optional namespace. default_options: Optional QueryOptions object. projection: Optional list or tuple of properties to project. group_by: Optional list or tuple of properties to group by. """ if ancestor is not None: if isinstance(ancestor, ParameterizedThing): if isinstance(ancestor, ParameterizedFunction): if ancestor.func != 'key': raise TypeError('ancestor cannot be a GQL function other than KEY') else: if not isinstance(ancestor, model.Key): raise TypeError('ancestor must be a Key; received %r' % (ancestor,)) if not ancestor.id(): raise ValueError('ancestor cannot be an incomplete key') if app is not None: if app != ancestor.app(): raise TypeError( 'app/ancestor mismatch: %r != %r' % (app, ancestor.app())) if namespace is None: namespace = ancestor.namespace() else: if namespace != ancestor.namespace(): raise TypeError( 'namespace/ancestor mismatch: %r != %r' % ( namespace, ancestor.namespace())) if filters is not None: if not isinstance(filters, Node): raise TypeError('filters must be a query Node or None; received %r' % (filters,)) if orders is not None: if not isinstance(orders, datastore_query.Order): raise TypeError('orders must be an Order instance or None; received %r' % (orders,)) if default_options is not None: if not isinstance(default_options, datastore_rpc.BaseConfiguration): raise TypeError('default_options must be a Configuration or None; ' 'received %r' % (default_options,)) if projection is not None: if default_options.projection is not None: raise TypeError('cannot use projection= and ' 'default_options.projection at the same time') if default_options.keys_only is not None: raise TypeError('cannot use projection= and ' 'default_options.keys_only at the same time') self.__kind = kind self.__ancestor = ancestor self.__filters = filters self.__orders = orders self.__app = app self.__namespace = namespace self.__default_options = default_options self.__projection = None if projection is not None: if not projection: raise TypeError('projection argument cannot be empty') if not isinstance(projection, (tuple, list)): raise TypeError( 'projection must be a tuple, list or None; received %r' % (projection,)) self._check_properties(self._to_property_names(projection)) self.__projection = tuple( six.ensure_binary(p) if isinstance(p, six.text_type) else p for p in projection) self.__group_by = None if group_by is not None: if not group_by: raise TypeError('group_by argument cannot be empty') if not isinstance(group_by, (tuple, list)): raise TypeError( 'group_by must be a tuple, list or None; received %r' % (group_by,)) self._check_properties(self._to_property_names(group_by)) self.__group_by = tuple( six.ensure_binary(g) if isinstance(g, six.text_type) else g for g in group_by) def __repr__(self): args = [] if self.app is not None: args.append('app=%r' % self.app) if (self.namespace is not None and self.namespace != namespace_manager.get_namespace()): args.append('namespace=%r' % self.namespace) if self.kind is not None: args.append('kind=%r' % self.kind) if self.ancestor is not None: args.append('ancestor=%r' % self.ancestor) if self.filters is not None: args.append('filters=%r' % self.filters) if self.orders is not None: args.append('orders=...') if self.projection: args.append('projection=%r' % (self._to_property_names(self.projection))) if self.group_by: args.append('group_by=%r' % (self._to_property_names(self.group_by))) if self.default_options is not None: args.append('default_options=%r' % self.default_options) return '%s(%s)' % (self.__class__.__name__, ', '.join(args)) def _fix_namespace(self): """Internal helper to fix the namespace. This is called to ensure that for queries without an explicit namespace, the namespace used by async calls is the one in effect at the time the async call is made, not the one in effect when the the request is actually generated. """ if self.namespace is not None: return self namespace = namespace_manager.get_namespace() return self.__class__(kind=self.kind, ancestor=self.ancestor, filters=self.filters, orders=self.orders, app=self.app, namespace=namespace, default_options=self.default_options, projection=self.projection, group_by=self.group_by) def _get_query(self, connection): self.bind() kind = self.kind ancestor = self.ancestor if ancestor is not None: ancestor = connection.adapter.key_to_pb(ancestor) filters = self.filters post_filters = None if filters is not None: post_filters = filters._post_filters() filters = filters._to_filter() group_by = None if self.group_by: group_by = self._to_property_names(self.group_by) dsquery = datastore_query.Query( app=self.app, namespace=self.namespace, kind=six.ensure_text(kind) if kind else None, ancestor=ancestor, filter_predicate=filters, order=self.orders, group_by=group_by) if post_filters is not None: dsquery = datastore_query._AugmentedQuery( dsquery, in_memory_filter=post_filters._to_filter(post=True)) return dsquery @tasklets.tasklet def run_to_queue(self, queue, conn, options=None, dsquery=None): """Run this query, putting entities into the given queue.""" try: multiquery = self._maybe_multi_query() if multiquery is not None: yield multiquery.run_to_queue(queue, conn, options=options) return if dsquery is None: dsquery = self._get_query(conn) rpc = dsquery.run_async(conn, options) while rpc is not None: batch = yield rpc if (batch.skipped_results and datastore_query.FetchOptions.offset(options)): offset = options.offset - batch.skipped_results options = datastore_query.FetchOptions(offset=offset, config=options) rpc = batch.next_batch_async(options) for i, result in enumerate(batch.results): queue.putq((batch, i, result)) queue.complete() except GeneratorExit: raise except Exception: if not queue.done(): _, e, tb = sys.exc_info() queue.set_exception(e, tb) raise @tasklets.tasklet def _run_to_list(self, results, options=None): ctx = tasklets.get_context() conn = ctx._conn dsquery = self._get_query(conn) rpc = dsquery.run_async(conn, options) while rpc is not None: batch = yield rpc if (batch.skipped_results and datastore_query.FetchOptions.offset(options)): offset = options.offset - batch.skipped_results options = datastore_query.FetchOptions(offset=offset, config=options) rpc = batch.next_batch_async(options) for result in batch.results: result = ctx._update_cache_from_query_result(result, options) if result is not None: results.append(result) raise tasklets.Return(results) def _needs_multi_query(self): filters = self.filters return filters is not None and isinstance(filters, DisjunctionNode) def _maybe_multi_query(self): if not self._needs_multi_query(): return None filters = self.filters subqueries = [] for subfilter in filters: subquery = self.__class__(kind=self.kind, ancestor=self.ancestor, filters=subfilter, orders=self.orders, app=self.app, namespace=self.namespace, default_options=self.default_options, projection=self.projection, group_by=self.group_by) subqueries.append(subquery) return _MultiQuery(subqueries) @property def kind(self): """Accessor for the kind (a string or None).""" return self.__kind @property def ancestor(self): """Accessor for the ancestor (a Key or None).""" return self.__ancestor @property def filters(self): """Accessor for the filters (a Node or None).""" return self.__filters @property def orders(self): """Accessor for the filters (a datastore_query.Order or None).""" return self.__orders @property def app(self): """Accessor for the app (a string or None).""" return self.__app @property def namespace(self): """Accessor for the namespace (a string or None).""" return self.__namespace @property def default_options(self): """Accessor for the default_options (a QueryOptions instance or None).""" return self.__default_options @property def group_by(self): """Accessor for the group by properties (a tuple instance or None).""" return self.__group_by @property def projection(self): """Accessor for the projected properties (a tuple instance or None).""" return self.__projection @property def is_distinct(self): """True if results are guaranteed to contain a unique set of property values. This happens when every property in the group_by is also in the projection. """ return bool(self.__group_by and set(self._to_property_names(self.__group_by)) <= set(self._to_property_names(self.__projection))) def filter(self, *args): """Return a new Query with additional filter(s) applied.""" if not args: return self preds = [] f = self.filters if f: preds.append(f) for arg in args: if not isinstance(arg, Node): raise TypeError('Cannot filter a non-Node argument; received %r' % arg) preds.append(arg) if not preds: pred = None elif len(preds) == 1: pred = preds[0] else: pred = ConjunctionNode(*preds) return self.__class__(kind=self.kind, ancestor=self.ancestor, filters=pred, orders=self.orders, app=self.app, namespace=self.namespace, default_options=self.default_options, projection=self.projection, group_by=self.group_by) def order(self, *args): """Return a new Query with additional sort order(s) applied.""" if not args: return self orders = [] o = self.orders if o: orders.append(o) for arg in args: if isinstance(arg, model.Property): orders.append(datastore_query.PropertyOrder(arg._name, _ASC)) elif isinstance(arg, datastore_query.Order): orders.append(arg) else: raise TypeError('order() expects a Property or query Order; ' 'received %r' % arg) if not orders: orders = None elif len(orders) == 1: orders = orders[0] else: orders = datastore_query.CompositeOrder(orders) return self.__class__(kind=self.kind, ancestor=self.ancestor, filters=self.filters, orders=orders, app=self.app, namespace=self.namespace, default_options=self.default_options, projection=self.projection, group_by=self.group_by) def iter(self, **q_options): """Construct an iterator over the query. Args: **q_options: All query options keyword arguments are supported. Returns: A QueryIterator object. """ self.bind() return QueryIterator(self, **q_options) __iter__ = iter @utils.positional(2) def map(self, callback, pass_batch_into_callback=None, merge_future=None, **q_options): """Map a callback function or tasklet over the query results. Args: callback: A function or tasklet to be applied to each result; see below. merge_future: Optional Future subclass; see below. **q_options: All query options keyword arguments are supported. Callback signature: The callback is normally called with an entity as argument. However if keys_only=True is given, it is called with a Key. Also, when pass_batch_into_callback is True, it is called with three arguments: the current batch, the index within the batch, and the entity or Key at that index. The callback can return whatever it wants. If the callback is None, a trivial callback is assumed that just returns the entity or key passed in (ignoring produce_cursors). Optional merge future: The merge_future is an advanced argument that can be used to override how the callback results are combined into the overall map() return value. By default a list of callback return values is produced. By substituting one of a small number of specialized alternatives you can arrange otherwise. See tasklets.MultiFuture for the default implementation and a description of the protocol the merge_future object must implement the default. Alternatives from the same module include QueueFuture, SerialQueueFuture and ReducingFuture. Returns: When the query has run to completion and all callbacks have returned, map() returns a list of the results of all callbacks. (But see 'optional merge future' above.) """ return self.map_async(callback, pass_batch_into_callback=pass_batch_into_callback, merge_future=merge_future, **q_options).get_result() @utils.positional(2) def map_async(self, callback, pass_batch_into_callback=None, merge_future=None, **q_options): """Map a callback function or tasklet over the query results. This is the asynchronous version of Query.map(). """ qry = self._fix_namespace() return tasklets.get_context().map_query( qry, callback, pass_batch_into_callback=pass_batch_into_callback, options=self._make_options(q_options), merge_future=merge_future) @utils.positional(2) def fetch(self, limit=None, **q_options): """Fetch a list of query results, up to a limit. Args: limit: How many results to retrieve at most. **q_options: All query options keyword arguments are supported. Returns: A list of results. """ return self.fetch_async(limit, **q_options).get_result() @utils.positional(2) def fetch_async(self, limit=None, **q_options): """Fetch a list of query results, up to a limit. This is the asynchronous version of Query.fetch(). """ if limit is None: default_options = self._make_options(q_options) if default_options is not None and default_options.limit is not None: limit = default_options.limit else: limit = _MAX_LIMIT q_options['limit'] = limit q_options.setdefault('batch_size', limit) if self._needs_multi_query(): return self.map_async(None, **q_options) options = self._make_options(q_options) qry = self._fix_namespace() return qry._run_to_list([], options=options) def get(self, **q_options): """Get the first query result, if any. This is similar to calling q.fetch(1) and returning the first item of the list of results, if any, otherwise None. Args: **q_options: All query options keyword arguments are supported. Returns: A single result, or None if there are no results. """ return self.get_async(**q_options).get_result() def get_async(self, **q_options): """Get the first query result, if any. This is the asynchronous version of Query.get(). """ qry = self._fix_namespace() return qry._get_async(**q_options) @tasklets.tasklet def _get_async(self, **q_options): """Internal version of get_async().""" res = yield self.fetch_async(1, **q_options) if not res: raise tasklets.Return(None) raise tasklets.Return(res[0]) @utils.positional(2) def count(self, limit=None, **q_options): """Count the number of query results, up to a limit. This returns the same result as len(q.fetch(limit)) but more efficiently. Note that you must pass a maximum value to limit the amount of work done by the query. Args: limit: How many results to count at most. **q_options: All query options keyword arguments are supported. Returns: """ return self.count_async(limit, **q_options).get_result() @utils.positional(2) def count_async(self, limit=None, **q_options): """Count the number of query results, up to a limit. This is the asynchronous version of Query.count(). """ qry = self._fix_namespace() return qry._count_async(limit=limit, **q_options) @tasklets.tasklet def _count_async(self, limit=None, **q_options): """Internal version of count_async().""" if 'offset' in q_options: raise NotImplementedError('.count() and .count_async() do not support ' 'offsets at present.') if 'limit' in q_options: raise TypeError('Cannot specify limit as a non-keyword argument and as a ' 'keyword argument simultaneously.') elif limit is None: limit = _MAX_LIMIT if self._needs_multi_query(): q_options.setdefault('batch_size', limit) q_options.setdefault('keys_only', True) results = yield self.fetch_async(limit, **q_options) raise tasklets.Return(len(results)) q_options['offset'] = limit q_options['limit'] = 0 options = self._make_options(q_options) conn = tasklets.get_context()._conn dsquery = self._get_query(conn) rpc = dsquery.run_async(conn, options) total = 0 while rpc is not None: batch = yield rpc options = QueryOptions(offset=options.offset - batch.skipped_results, config=options) rpc = batch.next_batch_async(options) total += batch.skipped_results raise tasklets.Return(total) @utils.positional(2) def fetch_page(self, page_size, **q_options): """Fetch a page of results. This is a specialized method for use by paging user interfaces. Args: page_size: The requested page size. At most this many results will be returned. In addition, any keyword argument supported by the QueryOptions class is supported. In particular, to fetch the next page, you pass the cursor returned by one call to the next call using start_cursor=<cursor>. A common idiom is to pass the cursor to the client using <cursor>.to_websafe_string() and to reconstruct that cursor on a subsequent request using Cursor.from_websafe_string(<string>). Returns: A tuple (results, cursor, more) where results is a list of query results, cursor is a cursor pointing just after the last result returned, and more is a bool indicating whether there are (likely) more results after that. """ return self.fetch_page_async(page_size, **q_options).get_result() @utils.positional(2) def fetch_page_async(self, page_size, **q_options): """Fetch a page of results. This is the asynchronous version of Query.fetch_page(). """ qry = self._fix_namespace() return qry._fetch_page_async(page_size, **q_options) @tasklets.tasklet def _fetch_page_async(self, page_size, **q_options): """Internal version of fetch_page_async().""" q_options.setdefault('batch_size', page_size) q_options.setdefault('produce_cursors', True) it = self.iter(limit=page_size + 1, **q_options) results = [] while (yield it.has_next_async()): results.append(next(it)) if len(results) >= page_size: break try: cursor = it.cursor_after() except datastore_errors.BadArgumentError: cursor = None raise tasklets.Return(results, cursor, it.probably_has_next()) def _make_options(self, q_options): """Helper to construct a QueryOptions object from keyword arguments. Args: q_options: a dict of keyword arguments. Note that either 'options' or 'config' can be used to pass another QueryOptions object, but not both. If another QueryOptions object is given it provides default values. If self.default_options is set, it is used to provide defaults, which have a lower precedence than options set in q_options. Returns: A QueryOptions object, or None if q_options is empty. """ if not (q_options or self.__projection): return self.default_options if 'options' in q_options: if 'config' in q_options: raise TypeError('You cannot use config= and options= at the same time') q_options['config'] = q_options.pop('options') if q_options.get('projection'): try: q_options['projection'] = self._to_property_names( q_options['projection']) except TypeError as e: raise datastore_errors.BadArgumentError(e) self._check_properties(q_options['projection']) options = QueryOptions(**q_options) if (options.keys_only is None and options.projection is None and self.__projection): options = QueryOptions( projection=self._to_property_names(self.__projection), config=options) if self.default_options is not None: options = self.default_options.merge(options) return options def _to_property_names(self, properties): if not isinstance(properties, (list, tuple)): properties = [properties] fixed = [] for proj in properties: if isinstance(proj, (six.text_type, six.binary_type)): fixed.append(proj) elif isinstance(proj, model.Property): fixed.append(proj._name) else: raise TypeError( 'Unexpected property (%r); should be string or Property' % (proj,)) return fixed def _check_properties(self, fixed, **kwargs): modelclass = model.Model._kind_map.get(self.__kind) if modelclass is not None: modelclass._check_properties(fixed, **kwargs) def analyze(self): """Return a list giving the parameters required by a query.""" class MockBindings(dict): def __contains__(self, key): self[key] = None return True bindings = MockBindings() used = {} ancestor = self.ancestor if isinstance(ancestor, ParameterizedThing): ancestor = ancestor.resolve(bindings, used) filters = self.filters if filters is not None: filters = filters.resolve(bindings, used) if six.PY2: return sorted(used) else: return sorted(used, key=functools.cmp_to_key(cmp_compat.cmp)) def bind(self, *args, **kwds): """Bind parameter values. Returns a new Query object.""" return self._bind(args, kwds) def _bind(self, args, kwds): """Bind parameter values. Returns a new Query object.""" bindings = dict(kwds) for i, arg in enumerate(args): bindings[i + 1] = arg used = {} ancestor = self.ancestor if isinstance(ancestor, ParameterizedThing): ancestor = ancestor.resolve(bindings, used) filters = self.filters if filters is not None: filters = filters.resolve(bindings, used) unused = [] for i in range(1, 1 + len(args)): if i not in used: unused.append(i) if unused: raise datastore_errors.BadArgumentError( 'Positional arguments %s were given but not used.' % ', '.join(str(i) for i in unused)) return self.__class__(kind=self.kind, ancestor=ancestor, filters=filters, orders=self.orders, app=self.app, namespace=self.namespace, default_options=self.default_options, projection=self.projection, group_by=self.group_by) def gql(query_string, *args, **kwds): """Parse a GQL query string. Args: query_string: Full GQL query, e.g. 'SELECT * FROM Kind WHERE prop = 1'. *args, **kwds: If present, used to call bind(). Returns: An instance of query_class. """ qry = _gql(query_string) if args or kwds: qry = qry._bind(args, kwds) return qry @utils.positional(1) def _gql(query_string, query_class=Query): """Parse a GQL query string (internal version). Args: query_string: Full GQL query, e.g. 'SELECT * FROM Kind WHERE prop = 1'. query_class: Optional class to use, default Query. Returns: An instance of query_class. """ from google.appengine.ext import gql gql_qry = gql.GQL(query_string) kind = gql_qry.kind() if kind is None: modelclass = model.Expando else: modelclass = model.Model._lookup_model( kind, tasklets.get_context()._conn.adapter.default_model) kind = modelclass._get_kind() ancestor = None flt = gql_qry.filters() filters = list(modelclass._default_filters()) for name_op in sorted(flt, key=functools.cmp_to_key(cmp_compat.cmp)): name, op = name_op values = flt[name_op] op = op.lower() if op == 'is' and name == gql.GQL._GQL__ANCESTOR: if len(values) != 1: raise ValueError('"is" requires exactly one value') [(func, args)] = values ancestor = _args_to_val(func, args) continue if op not in _OPS: raise NotImplementedError('Operation %r is not supported.' % op) for (func, args) in values: val = _args_to_val(func, args) prop = _get_prop_from_modelclass(modelclass, name) if six.ensure_text(prop._name) != name: raise RuntimeError('Whoa! _get_prop_from_modelclass(%s, %r) ' 'returned a property whose name is %r?!' % (modelclass.__name__, name, prop._name)) if isinstance(val, ParameterizedThing): node = ParameterNode(prop, op, val) elif op == 'in': node = prop._IN(val) else: node = prop._comparison(op, val) filters.append(node) if filters: filters = ConjunctionNode(*filters) else: filters = None orders = _orderings_to_orders(gql_qry.orderings(), modelclass) offset = gql_qry.offset() limit = gql_qry.limit() if limit < 0: limit = None keys_only = gql_qry._keys_only if not keys_only: keys_only = None options = QueryOptions(offset=offset, limit=limit, keys_only=keys_only) projection = gql_qry.projection() if gql_qry.is_distinct(): group_by = projection else: group_by = None qry = query_class(kind=kind, ancestor=ancestor, filters=filters, orders=orders, default_options=options, projection=projection, group_by=group_by) return qry class QueryIterator(six.Iterator): """This iterator works both for synchronous and async callers! For synchronous callers, just use: for entity in Account.query(): <use entity> Async callers use this idiom: it = iter(Account.query()) while (yield it.has_next_async()): entity = it.next() <use entity> You can also use q.iter([options]) instead of iter(q); this allows passing query options such as keys_only or produce_cursors. When keys_only is set, it.next() returns a key instead of an entity. When produce_cursors is set, the methods it.cursor_before() and it.cursor_after() return Cursor objects corresponding to the query position just before and after the item returned by it.next(). Before it.next() is called for the first time, both raise an exception. Once the loop is exhausted, both return the cursor after the last item returned. Calling it.has_next() does not affect the cursors; you must call it.next() before the cursors move. Note that sometimes requesting a cursor requires a Cloud Datastore roundtrip (but not if you happen to request a cursor corresponding to a batch boundary). If produce_cursors is not set, both methods always raise an exception. Note that queries requiring in-memory merging of multiple queries (i.e. queries using the IN, != or OR operators) do not support query options. """ _cursor_before = _cursor_after = ( datastore_errors.BadArgumentError('There is no cursor currently')) _index_list = None _more_results = None _exhausted = False @utils.positional(2) def __init__(self, query, **q_options): """Constructor. Takes a Query and query options. This is normally called by Query.iter() or Query.__iter__(). """ ctx = tasklets.get_context() options = query._make_options(q_options) callback = self._extended_callback self._iter = ctx.iter_query(query, callback=callback, pass_batch_into_callback=True, options=options) self._fut = None def _extended_callback(self, batch, index, ent): if self._exhausted: raise RuntimeError('QueryIterator is already exhausted') if batch is None: return (ent, None, None, None) if self._index_list is None: self._index_list = getattr(batch, 'index_list', None) more_results = index + 1 < len(batch.results) or batch.more_results before_cursor = None after_cursor = None try: before_cursor = batch.cursor(index) except BaseException as e: before_cursor = e try: after_cursor = batch.cursor(index + 1) except BaseException as e: after_cursor = e return (ent, before_cursor, after_cursor, more_results) def cursor_before(self): """Return the cursor before the current item. You must pass a QueryOptions object with produce_cursors=True for this to work. If there is no cursor or no current item, raise BadArgumentError. Before next() has returned there is no cursor. Once the loop is exhausted, this returns the cursor after the last item. """ if self._exhausted: return self.cursor_after() if isinstance(self._cursor_before, BaseException): raise self._cursor_before return self._cursor_before def cursor_after(self): """Return the cursor after the current item. You must pass a QueryOptions object with produce_cursors=True for this to work. If there is no cursor or no current item, raise BadArgumentError. Before next() has returned there is no cursor. Once the loop is exhausted, this returns the cursor after the last item. """ if isinstance(self._cursor_after, BaseException): raise self._cursor_after return self._cursor_after def index_list(self): """Return the list of indexes used for this query. This returns a list of index representations, where an index representation is the same as what is returned by get_indexes(). Before the first result, the information is unavailable, and then None is returned. This is not the same as an empty list -- the empty list means that no index was used to execute the query. (In the dev_appserver, an empty list may also mean that only built-in indexes were used; metadata queries also return an empty list here.) Proper use is as follows: q = <modelclass>.query(<filters>) i = q.iter() try: i.next() except Stopiteration: pass indexes = i.index_list() assert isinstance(indexes, list) Notes: - Forcing produce_cursors=False makes this always return None. - This always returns None for a multi-query. """ return self._index_list def __iter__(self): """Iterator protocol: get the iterator for this iterator, i.e. self.""" return self def probably_has_next(self): """Return whether a next item is (probably) available. This is not quite the same as has_next(), because when produce_cursors is set, some shortcuts are possible. However, in some cases (e.g. when the query has a post_filter) we can get a false positive (returns True but next() will raise StopIteration). There are no false negatives.""" if self._more_results: return True return self.has_next() def has_next(self): """Return whether a next item is available. See the module docstring for the usage pattern. """ return self.has_next_async().get_result() @tasklets.tasklet def has_next_async(self): """Return a Future whose result will say whether a next item is available. See the module docstring for the usage pattern. """ if self._fut is None: self._fut = self._iter.getq() flag = True try: yield self._fut except EOFError: flag = False raise tasklets.Return(flag) def __next__(self): """Iterator protocol: get next item or raise StopIteration.""" if self._fut is None: self._fut = self._iter.getq() try: try: (ent, self._cursor_before, self._cursor_after, self._more_results) = self._fut.get_result() return ent except EOFError: self._exhausted = True raise StopIteration finally: self._fut = None @cmp_compat.total_ordering_from_cmp class _SubQueryIteratorState(object): """Helper class for _MultiQuery.""" def __init__(self, batch_i_entity, iterator, dsquery, orders): batch, index, entity = batch_i_entity self.batch = batch self.index = index self.entity = entity self.iterator = iterator self.dsquery = dsquery self.orders = orders def __cmp__(self, other): if not isinstance(other, _SubQueryIteratorState): raise NotImplementedError('Can only compare _SubQueryIteratorState ' 'instances to other _SubQueryIteratorState ' 'instances; not %r' % other) if not self.orders == other.orders: raise NotImplementedError('Cannot compare _SubQueryIteratorStates with ' 'differing orders (%r != %r)' % (self.orders, other.orders)) lhs = self.entity._orig_pb rhs = other.entity._orig_pb lhs_filter = self.dsquery.filter_predicate rhs_filter = other.dsquery.filter_predicate names = self.orders._get_prop_names() if lhs_filter is not None: names |= lhs_filter._get_prop_names() if rhs_filter is not None: names |= rhs_filter._get_prop_names() lhs_value_map = datastore_query._make_key_value_map(lhs, names) rhs_value_map = datastore_query._make_key_value_map(rhs, names) if lhs_filter is not None: lhs_filter._prune(lhs_value_map) if rhs_filter is not None: rhs_filter._prune(rhs_value_map) return self.orders._cmp(lhs_value_map, rhs_value_map) class _MultiQuery(object): """Helper class to run queries involving !=, IN or OR operators.""" def __init__(self, subqueries): if not isinstance(subqueries, list): raise TypeError('subqueries must be a list; received %r' % subqueries) for subq in subqueries: if not isinstance(subq, Query): raise TypeError('Each subquery must be a Query instances; received %r' % subq) first_subquery = subqueries[0] kind = first_subquery.kind orders = first_subquery.orders if not kind: raise ValueError('Subquery kind cannot be missing') for subq in subqueries[1:]: if subq.kind != kind: raise ValueError('Subqueries must be for a common kind (%s != %s)' % (subq.kind, kind)) elif subq.orders != orders: raise ValueError('Subqueries must have the same order(s) (%s != %s)' % (subq.orders, orders)) self.__subqueries = subqueries self.__orders = orders self.ancestor = None def _make_options(self, q_options): return self.__subqueries[0].default_options @property def orders(self): return self.__orders @property def default_options(self): return self.__subqueries[0].default_options @tasklets.tasklet def run_to_queue(self, queue, conn, options=None): """Run this query, putting entities into the given queue.""" if options is None: offset = None limit = None keys_only = None else: offset = options.offset limit = options.limit keys_only = options.keys_only if (options.start_cursor or options.end_cursor or options.produce_cursors): names = set() if self.__orders is not None: names = self.__orders._get_prop_names() if '__key__' not in names: raise datastore_errors.BadArgumentError( '_MultiQuery with cursors requires __key__ order') modifiers = {} if offset: modifiers['offset'] = None if limit is not None: modifiers['limit'] = min(_MAX_LIMIT, offset + limit) if keys_only and self.__orders is not None: modifiers['keys_only'] = None if modifiers: options = QueryOptions(config=options, **modifiers) if offset is None: offset = 0 if limit is None: limit = _MAX_LIMIT if self.__orders is None: keys_seen = set() for subq in self.__subqueries: if limit <= 0: break subit = tasklets.SerialQueueFuture('_MultiQuery.run_to_queue[ser]') subq.run_to_queue(subit, conn, options=options) while limit > 0: try: batch, index, result = yield subit.getq() except EOFError: break if keys_only: key = result else: key = result._key if key not in keys_seen: keys_seen.add(key) if offset > 0: offset -= 1 else: limit -= 1 queue.putq((None, None, result)) queue.complete() return with conn.adapter: todo = [] for subq in self.__subqueries: dsquery = subq._get_query(conn) subit = tasklets.SerialQueueFuture('_MultiQuery.run_to_queue[par]') subq.run_to_queue(subit, conn, options=options, dsquery=dsquery) todo.append((subit, dsquery)) state = [] for subit, dsquery in todo: try: thing = yield subit.getq() except EOFError: continue else: state.append(_SubQueryIteratorState(thing, subit, dsquery, self.__orders)) heapq.heapify(state) keys_seen = set() while state and limit > 0: item = heapq.heappop(state) batch = item.batch index = item.index entity = item.entity key = entity._key if key not in keys_seen: keys_seen.add(key) if offset > 0: offset -= 1 else: limit -= 1 if keys_only: queue.putq((batch, index, key)) else: queue.putq((batch, index, entity)) subit = item.iterator try: batch, index, entity = yield subit.getq() except EOFError: pass else: item.batch = batch item.index = index item.entity = entity heapq.heappush(state, item) queue.complete() def iter(self, **q_options): return QueryIterator(self, **q_options) __iter__ = iter def _order_to_ordering(order): pb = order._to_pb() return pb.property, pb.direction def _orders_to_orderings(orders): if orders is None: return [] if isinstance(orders, datastore_query.PropertyOrder): return [_order_to_ordering(orders)] if isinstance(orders, datastore_query.CompositeOrder): return [(pb.property, pb.direction) for pb in orders._to_pbs()] raise ValueError('Bad order: %r' % (orders,)) def _ordering_to_order(ordering, modelclass): name, direction = ordering prop = _get_prop_from_modelclass(modelclass, name) if six.ensure_text(prop._name) != name: raise RuntimeError('Whoa! _get_prop_from_modelclass(%s, %r) ' 'returned a property whose name is %r?!' % (modelclass.__name__, name, prop._name)) return datastore_query.PropertyOrder(name, direction) def _orderings_to_orders(orderings, modelclass): orders = [_ordering_to_order(o, modelclass) for o in orderings] if not orders: return None if len(orders) == 1: return orders[0] return datastore_query.CompositeOrder(orders)