in agora/cerebral_simulator/src/store_simulator.py [0:0]
def run(self):
try:
self.logger.info("Starting store simulator...")
self.load_products()
self.logger.info(f"Loaded {len(self.products_list)} products")
# Calculate start time for historical data
historical_start = datetime.now() + timedelta(days=self.HISTORICAL_DATA_DAYS)
# Start historical processing in a separate thread if enabled
historical_thread = None
if self.ENABLE_HISTORICAL:
self.logger.info("Starting historical data processing...")
historical_thread = Thread(
target=self.run_historical_data,
args=(historical_start,),
daemon=True
)
historical_thread.start()
last_sql_save = datetime.now()
# Generate live data
self.logger.info("Starting live data generation...")
while True:
current_time = datetime.now()
try:
# Generate and send data to MQTT continuously
self.simulate_order_data(current_time, destination="MQTT", save_to_sql=False)
# Every minute, save to SQL
if (current_time - last_sql_save).seconds >= 60:
self.logger.info(f'Saving batch to SQL at: {current_time}')
self.simulate_order_data(current_time, destination="MQTT", save_to_sql=True)
last_sql_save = current_time
# Small sleep to prevent overwhelming the system
time.sleep(random.uniform(8, 12))
except Exception as e:
self.logger.error(f"Error generating live data: {str(e)}")
time.sleep(5) # Wait a bit on error
except KeyboardInterrupt:
self.logger.info("Received keyboard interrupt, shutting down...")
except Exception as e:
self.logger.error(f"Error in store simulator: {str(e)}")
finally:
if historical_thread and historical_thread.is_alive():
self.logger.info("Waiting for historical processing to complete...")
historical_thread.join(timeout=5)
self.cleanup()