in src/dispatch/plugins/dispatch_aws/plugin.py [0:0]
def consume(self, db_session: Session, project: Project) -> None:
client = boto3.client("sqs", region_name=self.configuration.region)
queue_url: str = client.get_queue_url(
QueueName=self.configuration.queue_name,
QueueOwnerAWSAccountId=self.configuration.queue_owner,
)["QueueUrl"]
while True:
response = client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=self.configuration.batch_size,
VisibilityTimeout=40,
WaitTimeSeconds=20,
)
if not response.get("Messages") or len(response["Messages"]) == 0:
log.info("No messages received from SQS.")
continue
entries: list[SqsEntries] = []
for message in response["Messages"]:
try:
message_body = json.loads(message["Body"])
message_body_message = message_body.get("Message")
message_attributes = message_body.get("MessageAttributes", {})
if message_attributes.get("compressed", {}).get("Value") == "zlib":
# Message is compressed, decompress it
message_body_message = decompress_json(message_body_message)
signal_data = json.loads(message_body_message)
except Exception as e:
log.exception(f"Unable to extract signal data from SQS message: {e}")
continue
try:
signal_instance_in = SignalInstanceCreate(
project=project, raw=signal_data, **signal_data
)
except ValidationError as e:
log.warning(
f"Received a signal instance that does not conform to the SignalInstanceCreate pydantic model. Skipping creation: {e}"
)
continue
# if the signal has an existing uuid we check if it already exists
if signal_instance_in.raw and signal_instance_in.raw.get("id"):
if signal_service.get_signal_instance(
db_session=db_session, signal_instance_id=signal_instance_in.raw["id"]
):
log.info(
f"Received a signal that already exists in the database. Skipping signal instance creation: {signal_instance_in.raw['id']}"
)
continue
try:
with db_session.begin_nested():
signal_instance = signal_service.create_signal_instance(
db_session=db_session,
signal_instance_in=signal_instance_in,
)
except IntegrityError as e:
if isinstance(e.orig, UniqueViolation):
log.info(
f"Received a signal that already exists in the database. Skipping signal instance creation: {e}"
)
else:
log.exception(
f"Encountered an integrity error when trying to create a signal instance: {e}"
)
continue
except ResourceClosedError as e:
log.warning(
f"Encountered an error when trying to create a signal instance. The plugin will retry again as the message hasn't been deleted from the SQS queue. Signal name/variant: {signal_instance_in.raw['name'] if signal_instance_in.raw and signal_instance_in.raw['name'] else signal_instance_in.raw['variant']}. Error: {e}"
)
db_session.rollback()
continue
except Exception as e:
log.exception(
f"Encountered an error when trying to create a signal instance. Signal name/variant: {signal_instance_in.raw['name'] if signal_instance_in.raw and signal_instance_in.raw['name'] else signal_instance_in.raw['variant']}. Error: {e}"
)
db_session.rollback()
continue
else:
metrics_provider.counter(
"aws-sqs-signal-consumer.signal.received",
tags={
"signalName": signal_instance.signal.name,
"externalId": signal_instance.signal.external_id,
},
)
log.debug(
f"Received a signal with name {signal_instance.signal.name} and id {signal_instance.signal.id}"
)
entries.append(
{"Id": message["MessageId"], "ReceiptHandle": message["ReceiptHandle"]}
)
if entries:
client.delete_message_batch(QueueUrl=queue_url, Entries=entries)