in ingestion-edge/pubsub_emulator.py [0:0]
def Pull(self, request: pubsub_pb2.PullRequest, context: grpc.ServicerContext):
"""Pull implementation."""
self.logger.debug("Pull(%.100s)", LazyFormat(request))
received_messages: List[pubsub_pb2.ReceivedMessage] = []
try:
subscription = self.subscriptions[request.subscription]
except KeyError:
context.abort(grpc.StatusCode.NOT_FOUND, "Subscription not found")
messages = subscription.published[: request.max_messages or 100]
subscription.pulled.update(
{message.message_id: message for message in messages}
)
for message in messages:
try:
subscription.published.remove(message)
except ValueError:
pass
received_messages = [
pubsub_pb2.ReceivedMessage(ack_id=message.message_id, message=message)
for message in messages
]
return pubsub_pb2.PullResponse(received_messages=received_messages)