in ingestion-edge/ingestion_edge/util.py [0:0]
def publish(self, message: PubsubMessage) -> Optional[asyncio.Task]:
"""Asynchronously publish message."""
# check if batch is not full
if not self.full.is_set():
# check if batch can accept message
index = len(self.messages)
new_size = self.size + PublishRequest(messages=[message]).ByteSize()
overflow = (
new_size > self.settings.max_bytes
or index + 1 > self.settings.max_messages
)
if overflow:
# fix https://github.com/googleapis/google-cloud-python/issues/7107
if not self.messages:
raise ValueError("Message exceeds max bytes")
# batch is full because it could not accept message
self.full.set()
else:
# Store message in the batch.
self.messages.append(message)
self.size = new_size
# return a task to await for the message id
return asyncio.create_task(self.message_id(index))
return None # the batch cannot accept a message