def publish()

in ingestion-edge/ingestion_edge/util.py [0:0]


    def publish(self, message: PubsubMessage) -> Optional[asyncio.Task]:
        """Asynchronously publish message."""
        # check if batch is not full
        if not self.full.is_set():
            # check if batch can accept message
            index = len(self.messages)
            new_size = self.size + PublishRequest(messages=[message]).ByteSize()
            overflow = (
                new_size > self.settings.max_bytes
                or index + 1 > self.settings.max_messages
            )

            if overflow:
                # fix https://github.com/googleapis/google-cloud-python/issues/7107
                if not self.messages:
                    raise ValueError("Message exceeds max bytes")
                # batch is full because it could not accept message
                self.full.set()
            else:
                # Store message in the batch.
                self.messages.append(message)
                self.size = new_size

                # return a task to await for the message id
                return asyncio.create_task(self.message_id(index))
        return None  # the batch cannot accept a message