# pipelines/iot_analytics/scripts/create_and_populate_bigtable.py (69 lines of code) (raw):

# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Pipeline of the IoT Analytics Dataflow Solution guide.

Creates a Bigtable table (when it does not exist yet) and populates it with
the vehicle maintenance records read from a JSON Lines file.

Configuration is read from environment variables:
    PROJECT_ID             project that owns the Bigtable instance
    BIGTABLE_INSTANCE_ID   Bigtable instance to use (required)
    BIGTABLE_TABLE_ID      table to create and populate (required)
    MAINTENANCE_DATA_PATH  path of the JSON Lines file to load
"""

import json
import os
from datetime import datetime

from google.cloud.bigtable import Client
from google.cloud.bigtable import column_family
from google.cloud.bigtable import row

# Read the configuration for the Bigtable maintenance data load.
PROJECT_ID = os.environ.get("PROJECT_ID")
INSTANCE_ID = os.environ.get("BIGTABLE_INSTANCE_ID")
TABLE_ID = os.environ.get("BIGTABLE_TABLE_ID")
MAINTENANCE_DATA_PATH = os.environ.get("MAINTENANCE_DATA_PATH")

# Fail fast with a readable message: without these two values the client
# would build resource names containing "None" and only fail later, inside
# an API call, with a much less obvious error.
# PROJECT_ID is left optional on purpose; per the client documentation the
# project is then determined from the environment.
_missing_settings = [
    env_name
    for env_name, env_value in (
        ("BIGTABLE_INSTANCE_ID", INSTANCE_ID),
        ("BIGTABLE_TABLE_ID", TABLE_ID),
    )
    if not env_value
]
if _missing_settings:
    raise SystemExit(
        "Missing required environment variable(s): "
        + ", ".join(_missing_settings)
    )

# Create a Bigtable client. Admin mode is needed for the table management
# calls below (`exists`, `create`, column family handling).
client = Client(project=PROJECT_ID, admin=True)
instance = client.instance(INSTANCE_ID)

# Describe the column family: keep at most two versions of every cell.
column_family_id = "maintenance"
max_versions_rule = column_family.MaxVersionsGCRule(2)
column_families = {column_family_id: max_versions_rule}

# Create the table.
table = instance.table(TABLE_ID)

# You need admin access to use `.exists()`. If you don't have the admin access, then
# comment out the if-else block.
if not table.exists():
    table.create(column_families=column_families)
else:
    print(f"Table {TABLE_ID} already exists in {PROJECT_ID}:{INSTANCE_ID}")
    # A pre-existing table may lack the column family that every write of
    # this script targets; add it so the load does not fail row by row.
    if column_family_id not in table.list_column_families():
        table.column_family(column_family_id, gc_rule=max_versions_rule).create()
        print(
            f"Created missing column family {column_family_id} "
            f"in table {TABLE_ID}"
        )

# Define column names for the table.
# `timezone` is imported here so this section is self-contained; it is used
# to build timezone-aware timestamps (`datetime.utcnow()` is deprecated).
from datetime import timezone

# Each name is used twice: as the key read from an input record and as the
# column qualifier written to Bigtable.
vehicle_id = "vehicle_id"
last_service_date = "last_service_date"
maintenance_type = "maintenance_type"
make = "make"
model = "model"
column_names = (vehicle_id, last_service_date, maintenance_type, make, model)

# Load the sample maintenance data: one JSON object per line.
maintenance_data = []
if not MAINTENANCE_DATA_PATH:
    # `open(None)` would raise a TypeError; report the real cause instead.
    print("MAINTENANCE_DATA_PATH is not set; no maintenance data was loaded.")
else:
    try:
        with open(MAINTENANCE_DATA_PATH, "r", encoding="utf-8") as f:
            for line_number, line in enumerate(f, start=1):
                # Blank lines are not records; skip them silently.
                if not line.strip():
                    continue
                try:
                    data = json.loads(line)
                except json.JSONDecodeError as e:
                    # A malformed line is reported and skipped so that the
                    # remaining records are still loaded.
                    print(f"Error decoding JSON from line: {line.strip()}")
                    print(f"Error message: {e}")
                    continue
                # Reject records now rather than failing with a KeyError or
                # TypeError halfway through populating the table.
                if not isinstance(data, dict):
                    print(
                        f"Skipping line {line_number}: expected a JSON "
                        f"object, got {type(data).__name__}"
                    )
                    continue
                missing_fields = [
                    name for name in column_names if name not in data
                ]
                if missing_fields:
                    print(
                        f"Skipping line {line_number}: missing field(s) "
                        + ", ".join(missing_fields)
                    )
                    continue
                maintenance_data.append(data)
    except FileNotFoundError:
        print(f"File not found: {MAINTENANCE_DATA_PATH}")

# Populate Bigtable: one row per record, keyed by the vehicle id.
inserted_count = 0
failed_count = 0
for record in maintenance_data:
    key_text = str(record[vehicle_id])
    # Named `direct_row` so the imported `row` module is not shadowed.
    direct_row = table.direct_row(key_text.encode())
    # One timestamp per row keeps all cells of a record on the same version.
    write_time = datetime.now(timezone.utc)
    for name in column_names:
        direct_row.set_cell(
            column_family_id,
            name.encode(),
            str(record[name]),
            timestamp=write_time,
        )
    # `commit()` reports a failed mutation through the returned status and
    # does not raise, so the status has to be inspected. `getattr` keeps
    # this working should a client version return None.
    status = direct_row.commit()
    status_code = getattr(status, "code", 0)
    if status_code != 0:
        failed_count += 1
        print(
            f"Failed to insert row for key: {key_text} "
            f"(code {status_code}: {getattr(status, 'message', '')})"
        )
        continue
    inserted_count += 1
    print(f"Inserted row for key: {record[vehicle_id]}")

print(f"Bigtable populated with {inserted_count} sample maintenance record(s).")
if failed_count:
    print(f"{failed_count} record(s) could not be written.")