sample_app/cerebral_genai/code/rag-on-edge-simulator/app.py (111 lines of code) (raw):

"""Factory telemetry simulator.

Generates one randomized assembly-line sample per interval and writes it to
InfluxDB as a single point. Connection settings are read from the
INFLUXDB_URL, INFLUXDB_TOKEN, INFLUXDB_ORG and INFLUXDB_BUCKET environment
variables; set VERBOSE to a truthy string to echo every written sample.
"""

import json
import os
import random
import time
from datetime import datetime, timezone

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.query_api import QueryOptions
from influxdb_client.client.write_api import SYNCHRONOUS

# InfluxDB Settings
INFLUXDB_URL = os.getenv("INFLUXDB_URL")
INFLUXDB_TOKEN = os.getenv("INFLUXDB_TOKEN")
INFLUXDB_ORG = os.getenv("INFLUXDB_ORG")
INFLUXDB_BUCKET = os.getenv("INFLUXDB_BUCKET")

# bool("False") is True, so the flag has to be parsed explicitly; only these
# spellings (case-insensitive) switch verbose output on.
_TRUTHY_STRINGS = {"1", "true", "yes", "on"}
VERBOSE = os.getenv("VERBOSE", "False").strip().lower() in _TRUTHY_STRINGS

# Seconds to wait between two consecutive samples.
WRITE_INTERVAL_SECONDS = 60

# Connect to InfluxDB
client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()


def generate_data() -> dict:
    """Return one randomized sample of assembly-line telemetry.

    Returns:
        A flat dict mapping field names to str, bool, int or float values.
        Every key is unique; values are drawn fresh on each call.
    """
    models = ["SUV", "Sedan", "Coupe"]
    colors = ["red", "blue", "black"]
    engine_types = ["electric", "hybrid", "gasoline"]
    assembly_statuses = ["InProgress", "Downtime"]
    shifts = ["morning", "afternoon", "night"]
    waste_reasons = ["defect", "rework", "spoilage"]
    lost_time_reasons = ["maintenance", "operator", "material"]

    data = {
        # Identification
        "plant": "Monterrey",
        "country": "Mexico",
        "assembly_line": "E-001",
        "car_id": "E-001",
        "model": random.choice(models),
        "color": random.choice(colors),
        "engine_type": random.choice(engine_types),
        "assembly_status": random.choice(assembly_statuses),
        "shift": random.choice(shifts),
        # Equipment state
        "Cooler_ON": random.choice([True, False]),
        "Fan001_On": random.choice([True, False]),
        "Heater_ON": random.choice([True, False]),
        "Pump1_Temperature_Flow": random.uniform(20, 30),
        "Pump2_Temperature_Flow": random.uniform(20, 30),
        "Pump3_Temperature_Flow": random.uniform(20, 30),
        "Pumps_Total_Flow": random.uniform(30, 35),
        "Pressure_Filter_Inlet": random.uniform(1, 2),
        "Pressure_Filter_Outlet": random.uniform(0.5, 1.5),
        "RobotPosition_J0": random.uniform(0, 5),
        "RobotPosition_J1": random.uniform(0, 5),
        "RobotPosition_J2": random.uniform(0, 5),
        "RobotPosition_J3": random.uniform(0, 5),
        "RobotPosition_J4": random.uniform(0, 5),
        "RobotPosition_J5": random.uniform(0, 5),
        "Tank_Level": random.uniform(60, 70),
        # Drives ("Drive1_Voltage" used to be listed twice with the same
        # range; a single entry is equivalent).
        "Drive1_Current": random.uniform(0.4, 0.6),
        "Drive1_Frequency": 30,
        "Drive1_Speed": 30,
        "Drive1_Voltage": random.uniform(190, 200),
        "Drive2_Current": random.uniform(0.4, 0.6),
        "Drive2_Frequency": 30,
        "Drive2_Speed": 30,
        "Drive2_Voltage": random.uniform(190, 200),
        # Electrical / environment
        "Current": random.uniform(0, 0.01),
        "Voltage": random.uniform(100, 110),
        "Humidity": random.uniform(40, 50),
        "VacuumAlert": random.choice([True, False]),
        "VacuumPressure": random.uniform(10, 20),
        "Oiltemperature": random.uniform(350, 390),
        "OiltemperatureTarget": 375,
        # Waste and lost time
        "Waste": random.uniform(1, 2),
        "WasteReason": random.choice(waste_reasons),
        "LostTime": "SPT",
        "LostTimeReason": random.choice(lost_time_reasons),
        "LostTimeTimeCount": random.randint(90, 100),
        # Batteries
        "ScheduledBatteries": random.randint(8, 12),
        "CompletedBatteries": random.randint(3, 7),
        "ScheduledBatteriesPerHour": random.randint(280, 320),
        # NOTE(review): "Temperature" was defined twice (70-80, then
        # 230-240). The later entry always won, so it is the one kept here;
        # confirm which range is actually intended.
        "Temperature": random.uniform(230, 240),
        "ImpactTest": random.uniform(780, 790),
        "VibrationTest": random.uniform(6, 7),
        "CellTest": random.uniform(400, 410),
        # Production KPIs
        "DownTime": random.randint(2, 6),
        "Thruput": random.randint(28, 32),
        "OverallEfficiency": random.randint(88, 92),
        "Availability": random.randint(93, 97),
        "Performance": random.randint(93, 97),
        "Quality": random.randint(93, 97),
        "PlannedProductionTime": random.randint(58, 62),
        "ActualRuntime": random.randint(960, 965),
        "UnplannedDowntime": random.randint(295, 300),
        "PlannedDowntime": 0,
        "PlannedQuantity": random.randint(290, 310),
        "ActualQuantity": random.randint(280, 290),
        "RejectedQuantity": random.randint(10, 20),
        "OEE_GoalbyPlant": random.uniform(78, 82),
        "OEE_Mexico": random.uniform(95, 96),
        "OEE_BatteryA": random.uniform(96, 97),
        "OEE_BatteryB": random.uniform(94, 95),
        "OEE_BatteryC": random.uniform(95, 96),
    }
    return data


def write_data_to_influxdb(measurement_name, timestamp) -> None:
    """Generate one sample and write it to InfluxDB as a single point.

    Args:
        measurement_name: Name of the InfluxDB measurement to write to.
        timestamp: Point time, handed to ``Point.time`` together with
            nanosecond write precision.
    """
    data = generate_data()
    point = Point(measurement_name).time(timestamp, WritePrecision.NS)
    # Every entry of the sample is stored as a field; no tags are set.
    for key, value in data.items():
        point.field(key, value)
    write_api.write(bucket=INFLUXDB_BUCKET, record=point)
    if VERBOSE:
        print(f"Written data to InfluxDB: {data}")


def main() -> None:
    """Write one "assemblyline" sample per interval, forever."""
    while True:
        # Timezone-aware UTC "now"; datetime.utcnow() is deprecated and
        # returns a naive value.
        write_data_to_influxdb("assemblyline", datetime.now(timezone.utc))
        time.sleep(WRITE_INTERVAL_SECONDS)


if __name__ == "__main__":
    main()