in pipelines/iot_analytics/iot_analytics_pipeline/trigger_inference.py [0:0]
def process(self, element):
    """Score one vehicle record and yield it together with the prediction.

    Args:
        element: Mapping providing the keys ``vehicle_id``,
            ``max_temperature``, ``max_vibration``, ``latest_timestamp``,
            ``last_service_date``, ``maintenance_type`` and ``model``.

    Yields:
        A single dict with those seven keys (``vehicle_id`` coerced to
        ``str``, both readings coerced to ``float``, the remaining values
        passed through unchanged) plus ``needs_maintenance``: the first value
        returned by ``self.model.predict``, converted to a plain Python
        scalar when it is a NumPy scalar.
    """
    df = pd.DataFrame([element])

    # Parse the date column once and reuse it for both operands.
    # NOTE(review): the frame holds exactly one row, so min() is that row's
    # own date and this feature is always 0 (NaN if the date is missing or
    # unparseable). If training measured days from a dataset-wide minimum,
    # inference is feeding the model a different feature -- confirm against
    # the training code before changing it.
    service_dates = pd.to_datetime(df["last_service_date"])
    df["last_service_date"] = (service_dates - service_dates.min()).dt.days

    prediction = self.model.predict(
        df[["max_temperature", "max_vibration", "last_service_date"]])

    # Every other computed field is emitted as a native Python type; do the
    # same for the prediction so no NumPy scalar leaks into the output dict.
    # The 0-d check leaves non-scalar predictions untouched instead of
    # raising from .item().
    needs_maintenance = prediction[0]
    if getattr(needs_maintenance, "ndim", None) == 0:
        needs_maintenance = needs_maintenance.item()

    # Built directly as a dict: same keys, same order as the former
    # beam.Row(...)._asdict() round-trip, without the throwaway Row object.
    yield {
        "vehicle_id": str(element["vehicle_id"]),
        "max_temperature": float(element["max_temperature"]),
        "max_vibration": float(element["max_vibration"]),
        "latest_timestamp": element["latest_timestamp"],
        "last_service_date": element["last_service_date"],
        "maintenance_type": element["maintenance_type"],
        "model": element["model"],
        "needs_maintenance": needs_maintenance,
    }