def Pull()

in ingestion-edge/pubsub_emulator.py [0:0]


    def Pull(self, request: pubsub_pb2.PullRequest, context: grpc.ServicerContext):
        """Move a batch of published messages to the pulled state and return them.

        Looks up ``request.subscription`` in ``self.subscriptions``; on a
        ``KeyError`` the call is aborted through ``context.abort`` with
        ``NOT_FOUND``. Otherwise up to ``request.max_messages`` messages (100
        when that field is falsy) are taken from the front of the
        subscription's ``published`` list, recorded in ``pulled`` keyed by
        ``message_id``, removed from ``published``, and returned wrapped in a
        ``PullResponse``. Each message's ``message_id`` doubles as its
        ``ack_id``.
        """
        self.logger.debug("Pull(%.100s)", LazyFormat(request))
        try:
            subscription = self.subscriptions[request.subscription]
        except KeyError:
            # NOTE(review): relies on abort() raising to stop the handler here.
            context.abort(grpc.StatusCode.NOT_FOUND, "Subscription not found")

        # A falsy max_messages (0 / unset) falls back to a batch of 100.
        batch_limit = request.max_messages or 100
        batch = subscription.published[:batch_limit]

        # Record the whole batch as pulled before touching the published list.
        newly_pulled = {msg.message_id: msg for msg in batch}
        subscription.pulled.update(newly_pulled)

        for msg in batch:
            try:
                subscription.published.remove(msg)
            except ValueError:
                # Message is no longer in the published list; nothing to remove.
                pass

        received_messages: List[pubsub_pb2.ReceivedMessage] = []
        for msg in batch:
            received_messages.append(
                pubsub_pb2.ReceivedMessage(ack_id=msg.message_id, message=msg)
            )
        return pubsub_pb2.PullResponse(received_messages=received_messages)