data-analytics/data-dash/pi/beam_break_sensor.py (48 lines of code) (raw):

#!/usr/bin/python
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import RPi.GPIO as GPIO


class BeamBreakSensor:
    """Track one beam-break sensor on a GPIO pin and record its transitions.

    The pin is configured as an input with the internal pull-up enabled. A
    truthy reading is treated as "beam intact" (unbroken) and a falsy reading
    as "beam interrupted" (broken). Transitions are written as cells to
    ``table`` under the row key passed to :meth:`read`.

    ``table`` and ``meta_table`` must expose ``direct_row(key)`` returning an
    object with ``set_cell(family, column, value)`` and ``commit()``
    (presumably google-cloud-bigtable ``Table`` objects -- confirm against the
    caller). Either may be ``None``, in which case the corresponding writes
    are skipped.
    """

    # Per-instance state; these class-level values act as the initial defaults.
    broken_time = 0  # time of the last *uploaded* break start (epoch seconds)
    unbroken_time = 0  # time of the most recent unbroken reading
    is_broken = False  # last observed beam state
    rowkey = None  # row key supplied by the most recent read() call

    COLUMN_FAMILY = "cf"
    # Minimum number of seconds between two uploaded break-start events.
    SENSOR_READ_INTERVAL = 10

    def __init__(self, id, pin, table=None, meta_table=None):
        """Store the configuration and initialise the GPIO pin.

        Args:
            id: Sensor identifier, used in column names and log output.
            pin: GPIO pin number the sensor is wired to.
            table: Destination for break/unbreak timestamps, or ``None``.
            meta_table: Destination for the initial pin reading, or ``None``.
        """
        self.init_time = time.time()
        self.id = id
        self.pin = pin
        self.table = table
        self.meta_table = meta_table
        self.init_pin()

    def init_pin(self) -> None:
        """Configure the pin as a pulled-up input and record its first value.

        NOTE(review): RPi.GPIO normally requires ``GPIO.setmode(...)`` to be
        called before ``GPIO.setup``; that is assumed to happen in the caller.
        """
        GPIO.setup(self.pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)

        gpio_input = str(GPIO.input(self.pin))
        print(
            f"initializing sensor {self.id} (pin {self.pin}) with value {gpio_input}"
        )

        # meta_table defaults to None in __init__; without this guard the
        # constructor crashed with AttributeError for any caller relying on
        # that default.
        if self.meta_table is None:
            return

        # Write the initial sensor value to the meta table, keyed by the time
        # this sensor object was created.
        rowkey = f"sensors#{self.init_time}"
        row = self.meta_table.direct_row(rowkey)
        row.set_cell(self.COLUMN_FAMILY, str(self.pin), gpio_input)
        row.commit()

    def read(self, rowkey) -> None:
        """Sample the pin once and dispatch to the matching state handler.

        Args:
            rowkey: Row key under which any resulting event is uploaded.
        """
        self.rowkey = rowkey
        pin_value = GPIO.input(self.pin)
        now = time.time()
        if pin_value:
            self.unbroken(now)
        else:
            self.broken(now)

    def broken(self, cur_time: float) -> None:
        """Handle a "beam interrupted" reading taken at ``cur_time``.

        Only the transition from unbroken to broken is acted on, and the start
        time is uploaded only if more than ``SENSOR_READ_INTERVAL`` seconds
        have passed since the previously uploaded start.
        """
        # Return if state is already broken.
        if self.is_broken:
            return

        print(f"SENSOR {self.id}: BROKEN")
        self.is_broken = True

        # NOTE(review): the state flips to broken even when the upload below
        # is suppressed, so the following unbroken() still uploads an end
        # time. Looks intentional (debounce applies to starts only) -- confirm.
        if cur_time - self.broken_time > self.SENSOR_READ_INTERVAL:
            self.broken_time = cur_time
            self.upload(cur_time, True)

    def unbroken(self, cur_time: float) -> None:
        """Handle a "beam intact" reading taken at ``cur_time``.

        If the beam was previously broken, the end time of that break is
        uploaded and the state is reset.
        """
        self.unbroken_time = cur_time
        if self.is_broken:
            self.upload(cur_time, False)
            print(f"SENSOR {self.id}: UNBROKEN")
        self.is_broken = False

    def upload(self, cur_time: float, broken: bool) -> None:
        """Write ``cur_time`` to the current row as a start or end timestamp.

        Args:
            cur_time: Timestamp to store (written as its ``str`` form).
            broken: ``True`` writes column ``t<id>_s`` (break start);
                ``False`` writes column ``t<id>_e`` (break end).
        """
        # table defaults to None in __init__; skip persistence instead of
        # raising AttributeError from inside the polling loop.
        if self.table is None:
            return

        col = f"t{self.id}_s" if broken else f"t{self.id}_e"
        row = self.table.direct_row(self.rowkey)
        row.set_cell(self.COLUMN_FAMILY, col, str(cur_time))
        row.commit()