pathology/transformation_pipeline/ingestion_lib/polling_client.py
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Sync polls pub/sub messages and executes ingest pipeline."""
import dataclasses
import time
from typing import Mapping, Optional
from google.api_core import retry
from google.cloud import pubsub_v1
from pathology.shared_libs.logging_lib import cloud_logging_client
from pathology.transformation_pipeline import ingest_flags
from pathology.transformation_pipeline.ingestion_lib import abstract_polling_client
from pathology.transformation_pipeline.ingestion_lib import abstract_pubsub_message_handler
from pathology.transformation_pipeline.ingestion_lib import ack_timeout_monitor
from pathology.transformation_pipeline.ingestion_lib import cloud_storage_client
from pathology.transformation_pipeline.ingestion_lib import ingest_const
from pathology.transformation_pipeline.ingestion_lib.pubsub_msgs import abstract_pubsub_msg
from pathology.transformation_pipeline.ingestion_lib.pubsub_msgs import gcs_file_msg
# Minimum interval between "polling, no pub/sub msg received" log entries.
NO_PUBSUB_MSG_LOG_INTERVAL_SECONDS = 600
@dataclasses.dataclass(frozen=True)
class _SubscriptionAssets:
subscription_path: str
msg_handler: abstract_pubsub_message_handler.AbstractPubSubMsgHandler
ack_deadline_seconds: int
class PollingClient(abstract_polling_client.AbstractPollingClient):
"""Sync polls pub/sub messages and executes ingest pipeline."""
def __init__(
self,
project_id: str,
pubsub_subscription_to_handler: Mapping[
str, abstract_pubsub_message_handler.AbstractPubSubMsgHandler
],
):
cloud_logging_client.set_per_thread_log_signatures(False)
cloud_logging_client.set_log_trace_key(
ingest_const.LogKeywords.DPAS_INGESTION_TRACE_ID
)
cloud_logging_client.info('DPAS ingest pipeline starting')
if not project_id:
raise ValueError('Missing project id.')
if not pubsub_subscription_to_handler:
raise ValueError('Missing ingestion subscriptions.')
self._polling_client_running = True
self._no_pubsub_msg_log_timeout = time.time()
self._last_pubsub_msg_time = None
self._current_msg = None
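    # Ack/nack state of the current message: '' until resolved, then 'ack' or
    # 'nack'.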
self._message_ack_nack = ''
self._project_id = project_id
self._current_msg_start_time = 0.0
self._gcs_file_to_ingest_list = ingest_flags.GCS_FILE_INGEST_LIST_FLG.value
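    # When a GCS file ingest list is supplied via flag, the client runs in a
    # one-shot mode over that list instead of polling pub/sub subscriptions.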
try:
if self._is_ingesting_from_gcs_file_list():
self._initialize_polling_client_for_ingestion_list(
pubsub_subscription_to_handler
)
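        # File list ingestion has no real pub/sub subscription; create the ack
        # timeout monitor with an empty subscription path.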
self._ack_monitor = ack_timeout_monitor.PubSubAckTimeoutMonitor(
subscription_path=''
)
self._current_subscription = self._subscriptions[0]
else:
self._initialize_polling_client_for_pubsub_subscriptions(
project_id, pubsub_subscription_to_handler
)
self._current_subscription = self._subscriptions[0]
self._ack_monitor = ack_timeout_monitor.PubSubAckTimeoutMonitor(
subscription_path=self._current_subscription.subscription_path
)
self._clear_current_msg()
self._ack_monitor.start()
except ValueError as exp:
cloud_logging_client.critical('Failed to initialize polling client', exp)
raise
except Exception as exp:
cloud_logging_client.critical(
'An unexpected exception occurred in the GKE container', exp
)
raise
def _initialize_polling_client_for_ingestion_list(
self,
pubsub_subscription_to_handler: Mapping[
str, abstract_pubsub_message_handler.AbstractPubSubMsgHandler
],
):
"""Initializes subscription assets for fixed list of GCS files to ingest.
Args:
pubsub_subscription_to_handler: Map of Pub/Sub subscription id to message
handler.
Raises:
ValueError if more than one or missing subscription.
"""
if len(pubsub_subscription_to_handler) != 1:
raise ValueError(
'Unexpected subscriptions for GCS list ingestion: '
f'{pubsub_subscription_to_handler}'
)
self._pubsub_subscriber = None
    msg_handler = next(iter(pubsub_subscription_to_handler.values()))
subscription = _SubscriptionAssets( # pytype: disable=wrong-arg-types # py311-upgrade
subscription_path='',
msg_handler=msg_handler,
ack_deadline_seconds=ingest_const.MESSAGE_TTL_S,
)
self._subscriptions = [subscription]
cloud_logging_client.info(
'Ingesting list of files stored on GCS.',
{'list_of_files_to_ingest': self._gcs_file_to_ingest_list},
)
def _initialize_polling_client_for_pubsub_subscriptions(
self,
project_id: str,
pubsub_subscription_to_handler: Mapping[
str, abstract_pubsub_message_handler.AbstractPubSubMsgHandler
],
):
"""Initializes subscription assets based on subscription to msg handler map.
Args:
project_id: Project id to use for subscriptions.
pubsub_subscription_to_handler: Map of Pub/Sub subscription id to message
handler.
"""
self._pubsub_subscriber = pubsub_v1.SubscriberClient()
self._subscriptions = []
for pubsub_subscription, handler in pubsub_subscription_to_handler.items():
# Subscribe to pub/sub topic to poll for messages.
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = self._pubsub_subscriber.subscription_path(
project_id, pubsub_subscription
)
subscription_def = self._pubsub_subscriber.get_subscription(
subscription=subscription_path
)
ack_deadline_seconds = subscription_def.ack_deadline_seconds
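      # The ack deadline should cover the expected per-message processing TTL;
      # otherwise pub/sub may redeliver a message while it is still ingesting.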
if ack_deadline_seconds < ingest_const.MESSAGE_TTL_S:
cloud_logging_client.error(
'DPAS ingest pub/sub subscription ack deadline for '
f'{pubsub_subscription} is set to: {ack_deadline_seconds} seconds. '
'The subscription should have an ack deadline of at least '
f'{ingest_const.MESSAGE_TTL_S} seconds.'
)
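      # An expiration policy TTL of 0 means the subscription never expires.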
subscription_exp = subscription_def.expiration_policy.ttl.seconds
if subscription_exp != 0:
cloud_logging_client.error(
'DPAS ingest pub/sub subscription is set to '
f'expire after {subscription_exp} seconds. The subscription '
'should be set to NEVER expire.'
)
cloud_logging_client.info(
f'Running {handler.name} DICOM generator for {pubsub_subscription}',
{'DICOM_gen': handler.name},
)
subscription = _SubscriptionAssets(
subscription_path, handler, ack_deadline_seconds
)
self._subscriptions.append(subscription)
def _is_ingesting_from_gcs_file_list(self) -> bool:
return self._gcs_file_to_ingest_list is not None
def _has_files_in_ingestion_list(self) -> bool:
return bool(self._gcs_file_to_ingest_list)
def _is_polling_pubsub(self) -> bool:
return self._pubsub_subscriber is not None
def stop_polling_client(self):
self._polling_client_running = False
def _running(self) -> bool:
"""Returns True if polling client is running."""
if not self._polling_client_running:
return False
if self._is_polling_pubsub():
return True
return self._has_files_in_ingestion_list()
def _validate_current_msg_processed(self):
"""Validates that all messages received an ack or nack."""
if self._current_msg is not None and not self._message_ack_nack:
# Message was not acknowledged (acked or nacked);
# nack to redeliver the message and log.
cloud_logging_client.error('Message was not nack or acked')
self.nack()
@property
def current_msg(self) -> Optional[abstract_pubsub_msg.AbstractPubSubMsg]:
return self._current_msg
@current_msg.setter
def current_msg(self, msg: abstract_pubsub_msg.AbstractPubSubMsg):
self._current_msg = msg
self._message_ack_nack = ''
self._current_msg_start_time = time.time()
if msg is None:
self._ack_monitor.clear_pubsub_msg()
cloud_logging_client.clear_log_signature()
else:
trace_id = msg.ingestion_trace_id
cloud_logging_client.set_log_signature({
ingest_const.LogKeywords.PUBSUB_MESSAGE_ID: msg.message_id,
ingest_const.LogKeywords.DPAS_INGESTION_TRACE_ID: trace_id,
})
self._ack_monitor.set_pubsub_msg(msg, self._current_msg_start_time)
def _clear_current_msg(self):
self.current_msg = None
def _test_log_message_timeout(self, ack_time_extension: float):
"""Tests and logs if time msg ack in less time than pub/sub requirement.
Args:
ack_time_extension: number of seconds ack extended
"""
total_time_sec = time.time() - self._current_msg_start_time
elapsed_time_sec = total_time_sec - ack_time_extension
if not self._is_polling_pubsub():
cloud_logging_client.info(
'Ingest pipeline processing completed',
{ingest_const.LogKeywords.TOTAL_TIME_SEC: str(total_time_sec)},
)
elif elapsed_time_sec >= self._current_subscription.ack_deadline_seconds:
cloud_logging_client.error(
          'Ingest pipeline processing exceeded subscription ack timeout',
{
ingest_const.LogKeywords.ELAPSED_TIME_BEYOND_EXTENSION_SEC: (
elapsed_time_sec
),
ingest_const.LogKeywords.ACK_DEADLINE_SEC: (
self._current_subscription.ack_deadline_seconds
),
ingest_const.LogKeywords.TOTAL_TIME_SEC: total_time_sec,
},
)
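    # Warn once processing has consumed at least 80% of the ack deadline.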
elif (
elapsed_time_sec
>= 0.8 * self._current_subscription.ack_deadline_seconds
):
cloud_logging_client.warning(
'Ingest pipeline processing is approaching subscription ack timeout',
{
ingest_const.LogKeywords.ELAPSED_TIME_BEYOND_EXTENSION_SEC: (
elapsed_time_sec
),
ingest_const.LogKeywords.ACK_DEADLINE_SEC: (
self._current_subscription.ack_deadline_seconds
),
ingest_const.LogKeywords.TOTAL_TIME_SEC: total_time_sec,
},
)
else:
cloud_logging_client.info(
'Ingest pipeline processing completed within ack timeout',
{ingest_const.LogKeywords.TOTAL_TIME_SEC: str(total_time_sec)},
)
def __enter__(self):
"""Wraps pubsub_v1.SubscriberClient context managers' entries.
Returns:
Self
"""
if self._pubsub_subscriber:
self._pubsub_subscriber.__enter__()
return self
def __exit__(self, unused_type, unused_value, unused_traceback):
"""Wraps pubsub_v1.SubscriberClient context managers' exits.
Automatically closes subscribers' gRPC channels when block is done.
"""
cloud_logging_client.info('DPAS ingest pipeline shutting down')
if self._pubsub_subscriber:
self._pubsub_subscriber.__exit__(
unused_type, unused_value, unused_traceback
)
self._validate_current_msg_processed()
self._ack_monitor.shutdown()
def is_acked(self) -> bool:
"""Returns true if message was acked."""
return self._message_ack_nack == 'ack'
def is_nacked(self) -> bool:
"""Returns true if message was not acked."""
return self._message_ack_nack == 'nack'
def ack(self, log_ack: bool = True):
"""Call to indicate Acks message, indicates message was processed.
https://googleapis.dev/python/pubsub/latest/subscriber/index.html#pulling-a-subscription-synchronously
Args:
log_ack: Set to False to disable ack logging.
"""
if self._message_ack_nack:
cloud_logging_client.warning(
f'Ack ignored. Message previously {self._message_ack_nack}'
)
else:
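      # Stop ack deadline extension monitoring; the monitor returns the total
      # extension time granted for the message.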
ack_time_extension = self._ack_monitor.clear_pubsub_msg()
self._message_ack_nack = 'ack'
if self._pubsub_subscriber:
self._pubsub_subscriber.acknowledge(
request={
'subscription': self._current_subscription.subscription_path,
'ack_ids': [self.current_msg.ack_id],
}
)
if log_ack:
cloud_logging_client.info('Acknowledged msg')
if log_ack:
self._test_log_message_timeout(ack_time_extension)
def nack(self, retry_ttl: int = 0):
"""Call to indicate message was not handled and should be redelivered.
Args:
retry_ttl: time in seconds to wait before retrying. 0 == immediate
https://googleapis.dev/python/pubsub/latest/subscriber/index.html#pulling-a-subscription-synchronously
0 second deadline time out causes nack behavior and forces pub/sub msg
reposting.
"""
if self._message_ack_nack:
cloud_logging_client.warning(
f'Nack ignored. Message previously {self._message_ack_nack}'
)
else:
self._ack_monitor.clear_pubsub_msg()
self._message_ack_nack = 'nack'
if self._pubsub_subscriber:
self._pubsub_subscriber.modify_ack_deadline(
request={
'subscription': self._current_subscription.subscription_path,
'ack_ids': [self.current_msg.ack_id],
                'ack_deadline_seconds': retry_ttl,  # 0 == immediate redelivery.
}
)
cloud_logging_client.warning('Retrying msg')
def _decode_pubsub_msg(
self, msg: pubsub_v1.types.ReceivedMessage
) -> abstract_pubsub_msg.AbstractPubSubMsg:
"""Pass pubsub msg to decoder described in DICOM Gen.
Allows decoder to implement decoder specific pubsub msg decoding.
Args:
msg: pubsub msg
Returns:
implementation of AbstractPubSubMsg
"""
return self._current_subscription.msg_handler.decode_pubsub_msg(msg)
def _no_message_received(
self, response: pubsub_v1.types.PullResponse
) -> bool:
"""Test if no message was received.
Logs no message was received at 10 min intervals (avoid overloading
cloud ops) to provide a mechanism for pods to indicate that they are
polling pub/sub for messages.
Args:
response: received pub/sub message
Returns:
True if no message was received. False if message received.
"""
current_time = time.time()
if response.received_messages:
      # Received a message; update the timers.
self._no_pubsub_msg_log_timeout = current_time
self._last_pubsub_msg_time = current_time
return False
    # Nothing was received; this is not an error.
if (
current_time - self._no_pubsub_msg_log_timeout
>= NO_PUBSUB_MSG_LOG_INTERVAL_SECONDS
):
if self._last_pubsub_msg_time is None:
last_msg_time_str = 'never'
else:
last_msg_time_str = str(current_time - self._last_pubsub_msg_time)
cloud_logging_client.debug(
'Polling for pub/sub msgs.',
{'Received_Last_PubMsg(sec)': last_msg_time_str},
)
self._no_pubsub_msg_log_timeout = current_time
return True
def _set_current_subscription(self, subscription: _SubscriptionAssets):
"""Updates current subscription and any related config.
Args:
subscription: Subscription assets to update current subscription to.
"""
self._current_subscription = subscription
if self._is_ingesting_from_gcs_file_list():
self._ack_monitor.set_subscription('')
else:
self._ack_monitor.set_subscription(subscription.subscription_path)
def _sync_pull_one_msg(self) -> bool:
"""Sync pulls one pubsub message from subscription.
    The transformation pipeline converts WSI -> DICOM. The container has high
    storage & CPU requirements for each message, message processing time is
    long, and the number of messages is low. Messages are therefore processed
    synchronously and the subscriber pulls 1 message at a time.
Returns:
True if message received.
"""
if self._is_polling_pubsub():
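      # Block on the pull (return_immediately=False); transient pull errors
      # are retried by google.api_core for up to 1000 seconds.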
response = self._pubsub_subscriber.pull(
request={
'subscription': self._current_subscription.subscription_path,
'max_messages': (
1 # Limit the subscriber to handle 1 message at a time.
),
},
return_immediately=False,
retry=retry.Retry(deadline=1000),
)
if self._no_message_received(response):
# if nothing is received; not an error; nothing to do.
return False
mlen = len(response.received_messages)
if mlen > 1:
cloud_logging_client.error(
            'Received multiple messages from pub/sub subscription; expected 1.',
{ingest_const.LogKeywords.MESSAGE_COUNT: str(mlen)},
)
msg = self._decode_pubsub_msg(response.received_messages[0])
else:
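      # GCS file list mode: synthesize a pub/sub style message for the next
      # file in the ingestion list.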
if (
not self._is_ingesting_from_gcs_file_list()
or not self._gcs_file_to_ingest_list
):
self._clear_current_msg()
return False
gcs_file_path = self._gcs_file_to_ingest_list.pop()
gcs_msg = gcs_file_msg.GCSFileMsg(gcs_file_path)
if not gcs_msg.gcs_file_exists():
cloud_logging_client.info(
'Specified file does not exist', {'uri': gcs_file_path}
)
self.ack()
self._clear_current_msg()
return False
cloud_logging_client.info(
'Ingesting file specified in file list', {'uri': gcs_file_path}
)
msg = self._decode_pubsub_msg(gcs_msg.received_msg)
self.current_msg = msg
if msg.ignore:
cloud_logging_client.debug(
'Pub/sub msg acked and ignored.',
{ingest_const.LogKeywords.URI: self.current_msg.uri},
)
self.ack(log_ack=False)
self._clear_current_msg()
return False
return True
def _process_msg_through_ingest_pipeline(self):
"""Runs image transformation pipeline on an AbstractPubSubMsg."""
cloud_storage_client.reset_storage_client(self._project_id)
self._current_subscription.msg_handler.process_message(self)
self._validate_current_msg_processed()
self._clear_current_msg()
def run(self):
"""Runs polling client ingestion."""
try:
while True:
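        # Round-robin across subscriptions, pulling and processing at most
        # one message per subscription per pass.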
for subscription in self._subscriptions:
self._set_current_subscription(subscription)
if not self._running():
return
if self._sync_pull_one_msg():
self._process_msg_through_ingest_pipeline()
except Exception as exp:
cloud_logging_client.critical(
'An unexpected exception occurred in the GKE container',
exp,
{
ingest_const.LogKeywords.PUBSUB_SUBSCRIPTION: (
self._current_subscription.subscription_path
)
},
)
self.nack()
raise