def run_to_queue()

in src/google/appengine/ext/ndb/query.py [0:0]


  def run_to_queue(self, queue, conn, options=None):
    """Run this query, putting entities into the given queue."""
    if options is None:

      offset = None
      limit = None
      keys_only = None
    else:

      offset = options.offset
      limit = options.limit
      keys_only = options.keys_only


      if (options.start_cursor or options.end_cursor or
          options.produce_cursors):
        names = set()
        if self.__orders is not None:
          names = self.__orders._get_prop_names()
        if '__key__' not in names:
          raise datastore_errors.BadArgumentError(
              '_MultiQuery with cursors requires __key__ order')








    modifiers = {}
    if offset:
      modifiers['offset'] = None
      if limit is not None:
        modifiers['limit'] = min(_MAX_LIMIT, offset + limit)
    if keys_only and self.__orders is not None:
      modifiers['keys_only'] = None
    if modifiers:
      options = QueryOptions(config=options, **modifiers)

    if offset is None:
      offset = 0

    if limit is None:
      limit = _MAX_LIMIT

    if self.__orders is None:

      keys_seen = set()
      for subq in self.__subqueries:
        if limit <= 0:
          break
        subit = tasklets.SerialQueueFuture('_MultiQuery.run_to_queue[ser]')
        subq.run_to_queue(subit, conn, options=options)
        while limit > 0:
          try:
            batch, index, result = yield subit.getq()
          except EOFError:
            break
          if keys_only:
            key = result
          else:
            key = result._key
          if key not in keys_seen:
            keys_seen.add(key)
            if offset > 0:
              offset -= 1
            else:
              limit -= 1
              queue.putq((None, None, result))
      queue.complete()
      return




    with conn.adapter:

      todo = []
      for subq in self.__subqueries:
        dsquery = subq._get_query(conn)
        subit = tasklets.SerialQueueFuture('_MultiQuery.run_to_queue[par]')
        subq.run_to_queue(subit, conn, options=options, dsquery=dsquery)
        todo.append((subit, dsquery))


      state = []
      for subit, dsquery in todo:
        try:
          thing = yield subit.getq()
        except EOFError:
          continue
        else:
          state.append(_SubQueryIteratorState(thing, subit, dsquery,
                                              self.__orders))




      heapq.heapify(state)










      keys_seen = set()
      while state and limit > 0:
        item = heapq.heappop(state)
        batch = item.batch
        index = item.index
        entity = item.entity
        key = entity._key
        if key not in keys_seen:
          keys_seen.add(key)
          if offset > 0:
            offset -= 1
          else:
            limit -= 1
            if keys_only:
              queue.putq((batch, index, key))
            else:
              queue.putq((batch, index, entity))
        subit = item.iterator
        try:
          batch, index, entity = yield subit.getq()
        except EOFError:
          pass
        else:
          item.batch = batch
          item.index = index
          item.entity = entity
          heapq.heappush(state, item)
      queue.complete()