courses/understanding_spanner/cloud-run/main.py (123 lines of code) (raw):
# Copyright (C) 2023 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from google.cloud import spanner
from flask import Flask, request
from flask_restful import Resource, Api
from flask import jsonify
import uuid
import os
# Get the Instance ID and database ID from environment variables
# Use the emulator if the env variables are not set.
# Requires the start-spanner-emulator.sh script be run.
if "INSTANCE_ID" in os.environ:
instance_id = os.environ["INSTANCE_ID"]
else:
instance_id = 'emulator-instance'
if "DATABASE_ID" in os.environ:
database_id = os.environ["DATABASE_ID"]
else:
database_id = 'pets-db'
client = spanner.Client()
instance = client.instance(instance_id)
database = instance.database(database_id)
app = Flask(__name__)
api = Api(app)
class PetsList(Resource):
def get(self):
query = """SELECT Owners.OwnerID, OwnerName,
PetID, PetName, PetType, Breed
FROM Owners
JOIN Pets ON Owners.OwnerID = Pets.OwnerID;"""
pets = []
with database.snapshot() as snapshot:
results = snapshot.execute_sql(query)
for row in results:
pet = {'OwnerID': row[0],
'OwnerName': row[1],
'PetID': row[2],
'PetName': row[3],
'PetType': row[4],
'Breed': row[5], }
pets.append(pet)
return pets, 200
def post(self):
# Check to see if the Owner already exists
data = request.get_json(True)
with database.snapshot() as snapshot:
results = snapshot.execute_sql("""
SELECT OwnerID FROM OWNERS
WHERE OwnerName = @owner_name""",
params={"owner_name": data["OwnerName"]},
param_types={"owner_name": spanner.param_types.STRING})
row = results.one_or_none()
if row != None:
owner_exists = True
owner_id = row[0]
else:
owner_exists = False
owner_id = str(uuid.uuid4())
# Need a UUID for the new pet
pet_id = str(uuid.uuid4())
def insert_owner_pet(transaction, data, owner_exists):
try:
row_ct = 0
params = {"owner_id": owner_id,
"owner_name": data["OwnerName"],
"pet_id": pet_id,
"pet_name": data["PetName"],
"pet_type": data["PetType"],
"breed": data["Breed"],
}
param_types = { "owner_id": spanner.param_types.STRING,
"owner_name": spanner.param_types.STRING,
"pet_id": spanner.param_types.STRING,
"pet_name": spanner.param_types.STRING,
"pet_type": spanner.param_types.STRING,
"breed": spanner.param_types.STRING,
}
# Only add the Owner if they don't exist already
if not owner_exists:
row_ct = transaction.execute_update(
"""INSERT Owners (OwnerID, OwnerName)
VALUES (@owner_id, @owner_name)""",
params=params,
param_types=param_types,)
# Add the pet
row_ct += transaction.execute_update(
"""INSERT Pets (PetID, OwnerID, PetName, PetType, Breed)
VALUES (@pet_id, @owner_id, @pet_name, @pet_type, @breed)""",
params=params,
param_types=param_types,)
except:
row_ct = 0
return row_ct
row_ct = database.run_in_transaction(insert_owner_pet, data, owner_exists)
return "{} record(s) inserted.".format(row_ct), 201
def delete(self):
# This delete all the Owners and Pets
# Uses Cascading Delete on interleaved Pets table
def delete_owners(transaction):
row_ct = transaction.execute_update(
"DELETE FROM Owners WHERE true = true")
return row_ct
row_ct = database.run_in_transaction(delete_owners)
return "{} record(s) deleted.".format(row_ct), 201
class Pet(Resource):
def get(self, pet_id):
params = {"pet_id": pet_id}
param_types = {
"pet_id": spanner.param_types.STRING,
}
query = """SELECT Owners.OwnerID, OwnerName,
PetID, PetName, PetType, Breed
FROM Owners
JOIN Pets ON Owners.OwnerID = Pets.OwnerID
WHERE PetID = @pet_id; """
with database.snapshot() as snapshot:
results = snapshot.execute_sql(
query, params=params, param_types=param_types,)
pet = None
for row in results:
pet = {'OwnerID': row[0],
'OwnerName': row[1],
'PetID': row[2],
'PetName': row[3],
'PetType': row[4],
'Breed': row[5], }
if pet:
return pet, 200
else:
return "Not found", 404
def patch(self, pet_id):
# This woud be for an Update
return "Not Implemented", 500
def delete(self, pet_id):
# This woud be for a Delete
return "Not Implemented", 500
api.add_resource(PetsList, '/pets')
api.add_resource(Pet, '/pets/<pet_id>')
@app.errorhandler(404)
def not_found(error):
return jsonify({'error': 'Not found'}), 404
@app.errorhandler(500)
def server_error(e):
return 'An internal error occurred.', 500
if __name__ == "__main__":
app.run(host='0.0.0.0', port=8080, debug=True)