def consume()

in src/dispatch/plugins/dispatch_aws/plugin.py [0:0]


    def consume(self, db_session: Session, project: Project) -> None:
        client = boto3.client("sqs", region_name=self.configuration.region)
        queue_url: str = client.get_queue_url(
            QueueName=self.configuration.queue_name,
            QueueOwnerAWSAccountId=self.configuration.queue_owner,
        )["QueueUrl"]

        while True:
            response = client.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=self.configuration.batch_size,
                VisibilityTimeout=40,
                WaitTimeSeconds=20,
            )
            if not response.get("Messages") or len(response["Messages"]) == 0:
                log.info("No messages received from SQS.")
                continue

            entries: list[SqsEntries] = []
            for message in response["Messages"]:
                try:
                    message_body = json.loads(message["Body"])
                    message_body_message = message_body.get("Message")
                    message_attributes = message_body.get("MessageAttributes", {})

                    if message_attributes.get("compressed", {}).get("Value") == "zlib":
                        # Message is compressed, decompress it
                        message_body_message = decompress_json(message_body_message)

                    signal_data = json.loads(message_body_message)
                except Exception as e:
                    log.exception(f"Unable to extract signal data from SQS message: {e}")
                    continue

                try:
                    signal_instance_in = SignalInstanceCreate(
                        project=project, raw=signal_data, **signal_data
                    )
                except ValidationError as e:
                    log.warning(
                        f"Received a signal instance that does not conform to the SignalInstanceCreate pydantic model. Skipping creation: {e}"
                    )
                    continue

                # if the signal has an existing uuid we check if it already exists
                if signal_instance_in.raw and signal_instance_in.raw.get("id"):
                    if signal_service.get_signal_instance(
                        db_session=db_session, signal_instance_id=signal_instance_in.raw["id"]
                    ):
                        log.info(
                            f"Received a signal that already exists in the database. Skipping signal instance creation: {signal_instance_in.raw['id']}"
                        )
                        continue

                try:
                    with db_session.begin_nested():
                        signal_instance = signal_service.create_signal_instance(
                            db_session=db_session,
                            signal_instance_in=signal_instance_in,
                        )
                except IntegrityError as e:
                    if isinstance(e.orig, UniqueViolation):
                        log.info(
                            f"Received a signal that already exists in the database. Skipping signal instance creation: {e}"
                        )
                    else:
                        log.exception(
                            f"Encountered an integrity error when trying to create a signal instance: {e}"
                        )
                    continue
                except ResourceClosedError as e:
                    log.warning(
                        f"Encountered an error when trying to create a signal instance. The plugin will retry again as the message hasn't been deleted from the SQS queue. Signal name/variant: {signal_instance_in.raw['name'] if signal_instance_in.raw and signal_instance_in.raw['name'] else signal_instance_in.raw['variant']}. Error: {e}"
                    )
                    db_session.rollback()
                    continue
                except Exception as e:
                    log.exception(
                        f"Encountered an error when trying to create a signal instance. Signal name/variant: {signal_instance_in.raw['name'] if signal_instance_in.raw and signal_instance_in.raw['name'] else signal_instance_in.raw['variant']}. Error: {e}"
                    )
                    db_session.rollback()
                    continue
                else:
                    metrics_provider.counter(
                        "aws-sqs-signal-consumer.signal.received",
                        tags={
                            "signalName": signal_instance.signal.name,
                            "externalId": signal_instance.signal.external_id,
                        },
                    )
                    log.debug(
                        f"Received a signal with name {signal_instance.signal.name} and id {signal_instance.signal.id}"
                    )
                    entries.append(
                        {"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]}
                    )

            if entries:
                client.delete_message_batch(QueueUrl=queue_url, Entries=entries)