data-analytics/data-dash/pi/run.py (56 lines of code) (raw):

#!/usr/bin/python
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Race-tracking loop for a Raspberry Pi.

Reads RFID tags at most once every ``RFID_READ_INTERVAL`` seconds. When a
tag is read that the ``SideController`` does not treat as a "side" tag, a new
race row is written to the Bigtable ``races`` table (column family ``cf``,
column ``car_id``) and every beam-break sensor is then polled with that row
key until the next race starts.

Requires the ``PROJECT_ID`` environment variable. GPIO state is always
released on exit via ``GPIO.cleanup()``.
"""

import os
import time

from google.cloud import bigtable
from mfrc522 import SimpleMFRC522
import RPi.GPIO as GPIO

from beam_break_sensor import BeamBreakSensor
from side_controller import SideController

# Pins as mapped to the Pis (19 through 26, interpreted in BCM numbering below)
PINS = list(range(19, 27))

# Subtract current_time from this value to get rows ordered last to first
MAX_VALUE = 9999999999

# Read RFID every N seconds
RFID_READ_INTERVAL = 3

# Load args
project_id = os.environ["PROJECT_ID"]
instance_id = "data-dash"
table_id = "races"
table_id_meta = "metadata"

# Bigtable objects
client = bigtable.Client(project=project_id)
instance = client.instance(instance_id)
table = instance.table(table_id)
meta_table = instance.table(table_id_meta)

try:
    # Initialize Pi connections to read as BCM
    GPIO.setmode(GPIO.BCM)

    # Initialize sensors; sensor numbers start at 1 and follow the order of PINS
    sensors = [
        BeamBreakSensor(i, pin, table, meta_table)
        for i, pin in enumerate(PINS, start=1)
    ]

    # Initialize RFID sensor
    reader = SimpleMFRC522()

    # SideController reports which side this Pi is currently assigned to.
    # NOTE(review): presumably backed by the metadata table - confirm in
    # side_controller.
    side_controller = SideController(meta_table)

    # Column mappings of side_controller
    car_side_map = {
        side_controller.LEFT_CONST: "car1",
        side_controller.RIGHT_CONST: "car2",
    }

    # Initialize the "side" of the webapp this Pi streams
    side = side_controller.get_side()
    tag_id = None
    rowkey = None  # Stays None until the first car tag is scanned
    last_scan = 0

    print(side)
    print(f"STARTING SIDE: {car_side_map[side]}")

    while True:
        now = time.time()

        # Only accept RFID reads after a certain period
        if now - last_scan > RFID_READ_INTERVAL:
            tag_id = reader.read_id_no_block()
            last_scan = now

            # is_side() is only consulted when a tag was actually read.
            # NOTE(review): per the original author's note, is_side() is also
            # where the side gets reset, which is why get_side() is re-read
            # before building the row key - confirm in side_controller.
            if tag_id and not side_controller.is_side(tag_id):
                side = side_controller.get_side()
                # time.time() is a float, so the key carries fractional seconds
                rowkey = f"track{side+1}#{MAX_VALUE - now}"
                print(f"starting race for {rowkey}")

                row = table.direct_row(rowkey)
                row.set_cell("cf", "car_id", str(tag_id))
                row.commit()

        # Poll sensors
        if rowkey:
            for sensor in sensors:
                try:
                    sensor.read(rowkey)
                except Exception as e:
                    # Best effort: one failing sensor must not stop the others,
                    # but report the cause so the failure can be diagnosed.
                    print(f"Error reading sensor {sensor.id}: {e!r}")
finally:
    GPIO.cleanup()