pipelines/iot_analytics/scripts/create_data.py (41 lines of code) (raw):
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Pipeline of the IoT Analytics Dataflow Solution guide.
"""
import random
import datetime
import pandas as pd
import os
# Get Env variables
current_directory = os.getcwd()
VEHICLE_DATA_PATH = os.environ.get("VEHICLE_DATA_PATH")
MAINTENANCE_DATA_PATH = os.environ.get("MAINTENANCE_DATA_PATH")
# Function to generate random vehicle data
def generate_vehicle_data(vehicle_id):
return {
"vehicle_id": vehicle_id,
"timestamp": datetime.datetime.now().isoformat(timespec="seconds") + "Z",
"temperature": random.randint(65, 85),
"rpm": random.randint(1500, 3500),
"vibration": round(random.uniform(0.1, 0.5), 2),
"fuel_level": random.randint(50, 90),
"mileage": random.randint(40000, 60000)
}
# Function to generate random maintenance data
def generate_maintenance_data(vehicle_id):
return {
"vehicle_id":
vehicle_id,
"last_service_date": (datetime.datetime.now() - datetime.timedelta(
days=random.randint(30, 365))).strftime("%Y-%m-%d"),
"maintenance_type":
random.choice([
"oil_change", "tire_rotation", "brake_check", "filter_replacement"
]),
"make":
"Ford",
"model":
"F-150"
}
# Generate 10 unique vehicle IDs
vehicle_ids = [str(i) for i in range(1000, 1010)]
# Create vehicle data and maintenance data lists
vehicle_data = [generate_vehicle_data(vehicle_id) for vehicle_id in vehicle_ids]
maintenance_data = [
generate_maintenance_data(vehicle_id) for vehicle_id in vehicle_ids
]
# Convert lists to Pandas DataFrames
df_vehicle_data = pd.DataFrame(vehicle_data)
df_maintenance_data = pd.DataFrame(maintenance_data)
df_vehicle_data.to_json(VEHICLE_DATA_PATH, orient="records", lines=True)
df_maintenance_data.to_json(MAINTENANCE_DATA_PATH, orient="records", lines=True)