in agora/cerebral_simulator/src/store_simulator.py [0:0]
def publish_data_to_mqtt(self, simulation_data, topic):
    """Publish one simulation event to MQTT and optionally persist it to SQL.

    The event is wrapped in an envelope with the keys ``source``
    (always ``"simulator"``), ``subject`` (the topic) and ``event_data``
    (the parsed ``simulation_data``), serialized to JSON and published on
    ``topic``. After a successful publish, when ``self.ENABLE_SQL`` and
    ``self.sql_conn`` are both truthy, events for ``topic/inventory`` and
    ``topic/sales`` are also handed to ``self.save_to_sql``.

    The method is best-effort: failures are logged through ``self.logger``
    and never raised to the caller.

    Args:
        simulation_data: JSON document (as accepted by ``json.loads``)
            describing the event.
        topic: MQTT topic to publish on; also stored as the envelope subject.

    Returns:
        None.
    """
    if self.mqtt_client is None:
        self.logger.error("MQTT client is None, attempting reconnection")
        try:
            self.init_mqtt()
        except Exception as e:
            self.logger.error(f"MQTT reconnection failed: {str(e)}")
            return
        # init_mqtt() may return without raising and still leave no client;
        # bail out here instead of failing on None.publish() further down.
        if self.mqtt_client is None:
            self.logger.error("MQTT client is still None after reconnection attempt")
            return

    # Build the payload outside the publish try-block: malformed input is
    # not a connection problem and must not trigger a broker reconnect.
    try:
        event_data = json.loads(simulation_data)
        staging_event = json.dumps({
            "source": "simulator",
            "subject": topic,
            "event_data": event_data,
        })
    except (TypeError, ValueError) as e:
        # json.JSONDecodeError is a ValueError subclass.
        self.logger.error(f"Error building MQTT payload for {topic}: {str(e)}")
        return

    try:
        result = self.mqtt_client.publish(topic, staging_event)
        if result.rc != 0:
            self.logger.error(f"Failed to publish to MQTT: {result.rc}")
            return

        if os.getenv("VERBOSE", "False").lower() == "true":
            self.logger.info(f"Data published to MQTT topic {topic}: {staging_event}")

        # Save to SQL Server (only inventory and sales events are stored).
        if self.ENABLE_SQL and self.sql_conn:
            try:
                if topic == "topic/inventory":
                    self.save_to_sql("inventory", event_data)
                elif topic == "topic/sales":
                    self.save_to_sql("sales", event_data)
            except Exception as e:
                # A SQL failure must not be reported as an MQTT failure.
                self.logger.error(f"Error saving to SQL: {str(e)}")
    except Exception as e:
        self.logger.error(f"Error publishing to MQTT: {str(e)}")
        # Try to reconnect so the next publish has a chance to succeed.
        try:
            self.init_mqtt()
        except Exception as reconnect_error:
            self.logger.error(f"MQTT reconnection failed: {str(reconnect_error)}")