other/train-to-cloud-city/app/functions/cargo_trigger/main.py (53 lines of code) (raw):
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from cloudevents.http import CloudEvent
import functions_framework
from google.cloud import firestore
from google.events.cloud import firestore as firestoredata
import train_types
client = firestore.Client()
# Update session state (Firestore docs) based on changes to train cargo
# and user selected pattern.
# This function is triggered on any change to "global*/cargo" docs.
@functions_framework.cloud_event
def cargo_trigger(cloud_event: CloudEvent) -> None:
# print(cloud_event)
firestore_payload = firestoredata.DocumentEventData()
firestore_payload._pb.ParseFromString(cloud_event.data)
print(firestoredata.DocumentEventData.to_json(firestore_payload).replace("\n", ""))
# extract the actual data from protobuf nonsense
old_cargo = firestore_payload.old_value.fields["actual_cargo"].array_value
# print(old_cargo)
old_cargo = [item.string_value for item in old_cargo.values]
# print(f"Old value: {repr(old_cargo)}")
new_cargo = firestore_payload.value.fields["actual_cargo"].array_value
# print(new_cargo)
new_cargo = [item.string_value for item in new_cargo.values]
print(f"cargo/actual_cargo: {repr(old_cargo)} --> {repr(new_cargo)}\n")
if old_cargo == new_cargo:
print("No change in cargo, exiting")
return
path_parts = firestore_payload.value.name.split("/")
separator_idx = path_parts.index("documents")
collection_path = path_parts[separator_idx + 1]
document_path = "proposal"
print(f"Collection path: {collection_path}, Document path: {document_path}\n")
# print(f"Document path: {document_path}")
collection = client.collection(collection_path)
proposal_doc = collection.document(document_path)
if old_cargo != new_cargo:
# print("Updating proposal_result based on cargo change")
pattern_slug = proposal_doc.get().to_dict()["pattern_slug"]
if not pattern_slug:
proposal_doc.update({"proposal_result" : None})
print("No proposal/pattern_slug found, clearing proposal_result, finished.")
return
# TODO: Consider getting the patterns from firestore instead?
pattern = train_types.PATTERNS[pattern_slug]
pr = train_types.ProposalResult(pattern = pattern, checkpoint_results=[])
pr.validate(service_slugs=new_cargo)
proposal_result = {"proposal_result" : pr.model_dump(mode='json')}
print(json.dumps(proposal_result))
proposal_doc.update(proposal_result)
# Update signals (stop/go lights) based on pattern_result
signaldoc = collection.document("signals")
# Iterate over the first four checkpoints (we have exactly four signals) and
# update each signal in order.
for index, cp_result in enumerate(pr.checkpoint_results[:3]):
match cp_result.clear:
case True:
target_state = train_types.SIGNAL_STATE["CLEAR"]
case _:
target_state = train_types.SIGNAL_STATE["STOP"]
signal_slug = train_types.SIGNALS[index].slug
key = f"{signal_slug}.target_state"
signaldoc.update({key: target_state})
# If all checkpoints clear, tell the train to go!
if pr.clear:
collection.document("train_mailbox").update({"input" : "do_victory_lap"})
else:
# Don't perform a second write (which can trigger an infinite loop)
print("No change in cargo, exiting")