def _sync_pull_one_msg()

in pathology/transformation_pipeline/ingestion_lib/polling_client.py [0:0]


  def _sync_pull_one_msg(self) -> bool:
    """Sync pulls one pubsub message from subscription.

    Transformation pipeline converts WSI->DICOM. The container will have
    high storage & CPU requirements for each message. The message processing
    time will be large and the number of messages will be low.
    Processing messages synchronously.

    The subscriber pulls 1 message.

    Returns:
      True if message received.
    """
    if self._is_polling_pubsub():
      response = self._pubsub_subscriber.pull(
          request={
              'subscription': self._current_subscription.subscription_path,
              'max_messages': (
                  1  # Limit the subscriber to handle 1 message at a time.
              ),
          },
          return_immediately=False,
          retry=retry.Retry(deadline=1000),
      )

      if self._no_message_received(response):
        # if nothing is received; not an error; nothing to do.
        return False

      mlen = len(response.received_messages)
      if mlen > 1:
        cloud_logging_client.error(
            'Received multiple messages from pub/sub subscription expected 1.',
            {ingest_const.LogKeywords.MESSAGE_COUNT: str(mlen)},
        )
      msg = self._decode_pubsub_msg(response.received_messages[0])
    else:
      if (
          not self._is_ingesting_from_gcs_file_list()
          or not self._gcs_file_to_ingest_list
      ):
        self._clear_current_msg()
        return False
      gcs_file_path = self._gcs_file_to_ingest_list.pop()
      gcs_msg = gcs_file_msg.GCSFileMsg(gcs_file_path)
      if not gcs_msg.gcs_file_exists():
        cloud_logging_client.info(
            'Specified file does not exist', {'uri': gcs_file_path}
        )
        self.ack()
        self._clear_current_msg()
        return False
      cloud_logging_client.info(
          'Ingesting file specified in file list', {'uri': gcs_file_path}
      )
      msg = self._decode_pubsub_msg(gcs_msg.received_msg)
    self.current_msg = msg
    if msg.ignore:
      cloud_logging_client.debug(
          'Pub/sub msg acked and ignored.',
          {ingest_const.LogKeywords.URI: self.current_msg.uri},
      )
      self.ack(log_ack=False)
      self._clear_current_msg()
      return False
    return True