in pathology/transformation_pipeline/ingestion_lib/polling_client.py [0:0]
def _sync_pull_one_msg(self) -> bool:
"""Sync pulls one pubsub message from subscription.
Transformation pipeline converts WSI->DICOM. The container will have
high storage & CPU requirements for each message. The message processing
time will be large and the number of messages will be low.
Processing messages synchronously.
The subscriber pulls 1 message.
Returns:
True if message received.
"""
if self._is_polling_pubsub():
response = self._pubsub_subscriber.pull(
request={
'subscription': self._current_subscription.subscription_path,
'max_messages': (
1 # Limit the subscriber to handle 1 message at a time.
),
},
return_immediately=False,
retry=retry.Retry(deadline=1000),
)
if self._no_message_received(response):
# if nothing is received; not an error; nothing to do.
return False
mlen = len(response.received_messages)
if mlen > 1:
cloud_logging_client.error(
'Received multiple messages from pub/sub subscription expected 1.',
{ingest_const.LogKeywords.MESSAGE_COUNT: str(mlen)},
)
msg = self._decode_pubsub_msg(response.received_messages[0])
else:
if (
not self._is_ingesting_from_gcs_file_list()
or not self._gcs_file_to_ingest_list
):
self._clear_current_msg()
return False
gcs_file_path = self._gcs_file_to_ingest_list.pop()
gcs_msg = gcs_file_msg.GCSFileMsg(gcs_file_path)
if not gcs_msg.gcs_file_exists():
cloud_logging_client.info(
'Specified file does not exist', {'uri': gcs_file_path}
)
self.ack()
self._clear_current_msg()
return False
cloud_logging_client.info(
'Ingesting file specified in file list', {'uri': gcs_file_path}
)
msg = self._decode_pubsub_msg(gcs_msg.received_msg)
self.current_msg = msg
if msg.ignore:
cloud_logging_client.debug(
'Pub/sub msg acked and ignored.',
{ingest_const.LogKeywords.URI: self.current_msg.uri},
)
self.ack(log_ack=False)
self._clear_current_msg()
return False
return True