in agora/cerebral_simulator/src/store_simulator.py [0:0]
def simulate_order_data(self, current_time, destination="MQTT", save_to_sql=False):
    """
    Simulate order data for the given time and publish it.

    The number of orders is obtained from ``self.calculate_random_orders``.
    For each one, a store is picked at random from ``self.store_details`` and
    ``self.generate_order`` is asked for its order records. Every record is
    registered through ``self.add_order`` and then sent, JSON-encoded, to the
    selected destination.

    Args:
        current_time: DateTime to generate orders for (must support
            ``strftime``).
        destination: Where to send the data. "EventHub" routes through
            ``self.send_orders_to_event_hub``; any other value (default
            "MQTT") publishes to the MQTT topic "topic/sales".
        save_to_sql: Whether to also save data to SQL (default: False).
            Only honoured when ``self.ENABLE_SQL`` is truthy, and only a
            random sample of at most five generated orders is saved.

    Returns:
        None. Any exception raised while generating, saving or sending is
        caught and logged together with its traceback; it is not re-raised,
        and the remaining orders of that run are not sent.
    """
    try:
        # Calculate number of orders based on day and time
        day_name = current_time.strftime('%A')
        order_count = self.calculate_random_orders(current_time, day_name)

        # Generate all orders; generate_order may yield nothing for a store,
        # in which case that iteration contributes no records.
        all_orders = []
        for _ in range(order_count):
            store = random.choice(self.store_details)
            order_items = self.generate_order(current_time, store, self.products_list)
            if order_items:
                all_orders.extend(order_items)

        # Store in SQL if enabled and requested (sampled, not the full batch)
        if save_to_sql and self.ENABLE_SQL and all_orders:
            sql_orders = random.sample(all_orders, min(5, len(all_orders)))
            for order in sql_orders:
                self.save_to_sql("sales", order)
            self.logger.info(f"Saved {len(sql_orders)} orders to SQL")

        # Send all orders to MQTT/EventHub
        for order in all_orders:
            # Add to recent orders memory before sending
            self.add_order({
                "timestamp": order['sale_date'],
                "order_data": order,
            })

            # Send to appropriate destination
            simulation_data = json.dumps(order)
            if destination == "EventHub":
                self.send_orders_to_event_hub(simulation_data)
            else:
                self.logger.info(f"Publishing order to MQTT topic/sales: {order['sale_id']}")
                self.publish_data_to_mqtt(simulation_data, "topic/sales")
    except Exception as e:
        # Best-effort simulation step: keep swallowing the error, but log it
        # with the traceback so the failing call can be located.
        self.logger.exception(f"Error in simulate_order_data: {e}")