def process()

in pipelines/iot_analytics/iot_analytics_pipeline/trigger_inference.py [0:0]


  def process(self, element):
    df = pd.DataFrame([element])
    df["last_service_date"] = (
        pd.to_datetime(df["last_service_date"]) -
        pd.to_datetime(df["last_service_date"]).min()).dt.days
    prediction = self.model.predict(
        df[["max_temperature", "max_vibration", "last_service_date"]])
    results = beam.Row(
        vehicle_id=str(element["vehicle_id"]),
        max_temperature=float(element["max_temperature"]),
        max_vibration=float(element["max_vibration"]),
        latest_timestamp=element["latest_timestamp"],
        last_service_date=element["last_service_date"],
        maintenance_type=element["maintenance_type"],
        model=element["model"],
        needs_maintenance=prediction[0])
    yield results._asdict()