granule_ingester/granule_ingester/consumer/MessageConsumer.py (87 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

import aio_pika

from granule_ingester.exceptions import PipelineBuildingError, PipelineRunningError, RabbitMQLostConnectionError, \
    RabbitMQFailedHealthCheckError, LostConnectionError
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.pipeline import Pipeline

logger = logging.getLogger(__name__)


class MessageConsumer(HealthCheck):
    """Consume pipeline configurations from a RabbitMQ queue and run them.

    Each message body is decoded as UTF-8 and handed to
    ``Pipeline.from_string``; the resulting pipeline is run and the message
    is acknowledged, rejected, or re-queued depending on the outcome.

    The consumer is meant to be used as an async context manager: entering
    it opens the RabbitMQ connection that ``start_consuming`` relies on, and
    leaving it closes that connection.
    """

    def __init__(self,
                 rabbitmq_host,
                 rabbitmq_username,
                 rabbitmq_password,
                 rabbitmq_queue,
                 data_store_factory,
                 metadata_store_factory,
                 log_level=logging.INFO):
        """Store the connection settings; no connection is opened here.

        :param rabbitmq_host: host part of the AMQP URL.
        :param rabbitmq_username: user name part of the AMQP URL.
        :param rabbitmq_password: password part of the AMQP URL.
        :param rabbitmq_queue: name of the queue to declare and consume.
        :param data_store_factory: passed through to ``Pipeline.from_string``.
        :param metadata_store_factory: passed through to
            ``Pipeline.from_string``.
        :param log_level: level applied to every pipeline via
            ``Pipeline.set_log_level``.
        """
        self._rabbitmq_queue = rabbitmq_queue
        self._data_store_factory = data_store_factory
        self._metadata_store_factory = metadata_store_factory

        connection_template = "amqp://{username}:{password}@{host}/"
        self._connection_string = connection_template.format(username=rabbitmq_username,
                                                             password=rabbitmq_password,
                                                             host=rabbitmq_host)
        # Same URL with the password masked: this is the only form that may
        # appear in logs or exception messages.
        self._redacted_connection_string = connection_template.format(username=rabbitmq_username,
                                                                      password="****",
                                                                      host=rabbitmq_host)

        # Set by __aenter__; stays None until the context manager is entered.
        self._connection: aio_pika.Connection = None
        self._level = log_level

    async def health_check(self) -> bool:
        """Verify that a RabbitMQ connection can be opened and closed.

        :return: ``True`` when the connection attempt succeeds.
        :raises RabbitMQFailedHealthCheckError: on any failure; the original
            exception is attached as the cause. The message contains the
            connection URL with the password masked.
        """
        try:
            connection = await self._get_connection()
            await connection.close()
            return True
        except Exception as e:
            # Report the redacted URL so the RabbitMQ password never ends up
            # in logs or tracebacks.
            raise RabbitMQFailedHealthCheckError(f"Cannot connect to RabbitMQ! "
                                                 f"Connection string was "
                                                 f"{self._redacted_connection_string}") from e

    async def _get_connection(self) -> aio_pika.Connection:
        """Open a new connection with ``aio_pika.connect_robust``."""
        return await aio_pika.connect_robust(self._connection_string)

    async def __aenter__(self):
        """Open the RabbitMQ connection and return this consumer."""
        self._connection = await self._get_connection()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the RabbitMQ connection if one was opened."""
        if self._connection:
            await self._connection.close()

    @staticmethod
    async def _received_message(message: aio_pika.IncomingMessage,
                                data_store_factory,
                                metadata_store_factory,
                                pipeline_max_concurrency: int,
                                log_level=logging.INFO):
        """Build and run the pipeline described by one queue message.

        Outcome handling:

        * success -> the message is acknowledged;
        * ``PipelineBuildingError`` / ``PipelineRunningError`` -> the message
          is rejected without re-queueing and the error is logged;
        * ``LostConnectionError`` -> re-raised untouched for the caller;
        * any other exception -> the message is rejected with
          ``requeue=True`` and the error is logged.

        :param message: incoming message whose body is the UTF-8 encoded
            pipeline configuration.
        :param data_store_factory: passed to ``Pipeline.from_string``.
        :param metadata_store_factory: passed to ``Pipeline.from_string``.
        :param pipeline_max_concurrency: passed to ``Pipeline.from_string``
            as ``max_concurrency``.
        :param log_level: level applied to the pipeline before it runs.
        """
        logger.info("Received a job from the queue. Starting pipeline.")
        try:
            config_str = message.body.decode("utf-8")
            logger.debug(config_str)
            pipeline = Pipeline.from_string(config_str=config_str,
                                            data_store_factory=data_store_factory,
                                            metadata_store_factory=metadata_store_factory,
                                            max_concurrency=pipeline_max_concurrency)
            pipeline.set_log_level(log_level)
            await pipeline.run()
            await message.ack()
        except PipelineBuildingError as e:
            await message.reject()
            logger.exception(f"Failed to build the granule-processing pipeline. This message will be dropped "
                             f"from RabbitMQ. The exception was:\n{e}")
        except PipelineRunningError as e:
            await message.reject()
            logger.exception(f"Processing the granule failed. It will not be retried. The exception was:\n{e}")
        except LostConnectionError:
            # Let main() handle this
            raise
        except Exception as e:
            await message.reject(requeue=True)
            logger.exception(f"Processing message failed. Message will be re-queued. The exception was:\n{e}")

    async def start_consuming(self, pipeline_max_concurrency=16):
        """Declare the queue and process its messages one at a time.

        Requires the connection opened by ``__aenter__``. The channel is
        configured with ``prefetch_count=1`` and the queue is declared
        durable with ``x-max-priority`` set to 10.

        :param pipeline_max_concurrency: forwarded to every pipeline as its
            maximum concurrency.
        :raises RabbitMQLostConnectionError: when handling a message raises
            ``aio_pika.exceptions.MessageProcessError``.
        :raises Exception: any other error from message handling is re-raised
            after the queue iterator and the channel have been closed.
        """
        channel = await self._connection.channel()
        await channel.set_qos(prefetch_count=1)
        queue = await channel.declare_queue(self._rabbitmq_queue,
                                            durable=True,
                                            arguments={'x-max-priority': 10})
        queue_iter = queue.iterator()
        async for message in queue_iter:
            try:
                await self._received_message(message,
                                             self._data_store_factory,
                                             self._metadata_store_factory,
                                             pipeline_max_concurrency,
                                             self._level)
            except aio_pika.exceptions.MessageProcessError as e:
                # Do not try to close() the queue iterator! If we get here, that means the RabbitMQ
                # connection has died, and attempting to close the queue will only raise another exception.
                raise RabbitMQLostConnectionError("Lost connection to RabbitMQ while processing a granule.") from e
            except Exception:
                await queue_iter.close()
                await channel.close()
                # Bare raise keeps the original exception and traceback intact.
                raise