in src/pubsub/ipc_pubsub.py [0:0]
def init_topic_subscriber(self):
    """
    Subscribe to each IPC local topic in ``self.ipc_subscribe_topics``.

    A single thread pool is created up front and handed to the stream
    handler built for every topic. Topics are subscribed one at a time;
    an Exception raised while subscribing to a topic (timeout,
    authorization failure, or anything else) is logged and the loop moves
    on to the next topic.
    """
    # max_workers=None leaves the pool size to the executor's default
    # (min(32, os.cpu_count() + 4) since Python 3.8).
    worker_pool = ThreadPoolExecutor(max_workers=None)

    for topic in self.ipc_subscribe_topics:
        try:
            log.info('IPC subscribing to topic: {}'.format(topic))

            subscribe_request = SubscribeToTopicRequest()
            subscribe_request.topic = topic

            stream_handler = IpcPubSub.TopicSubscriber(self.message_callback, topic, worker_pool)
            subscribe_op = self.ipc_subscribe_client.new_subscribe_to_topic(stream_handler)

            # Wait at most ipc_timeout for the activation future to complete.
            subscribe_op.activate(subscribe_request).result(self.ipc_timeout)
        except concurrent.futures.TimeoutError as err:  # pragma: no cover
            log.error(
                'TIMEOUT_ERROR: Timeout occurred while subscribing to IPC topic. ERROR MESSAGE: {} - TOPIC {}'.format(err, topic)
            )
        except UnauthorizedError as err:  # pragma: no cover
            log.error(
                'UNATHORIZED_ERROR: Unauthorized error while subscribing to IPC topic. ERROR MESSAGE: {} - TOPIC {}'.format(err, topic)
            )
        except Exception as err:  # pragma: no cover
            log.error(
                'EXCEPTION: Exception while subscribing to IPC topic. ERROR MESSAGE: {} - TOPIC {}'.format(err, topic)
            )