sql_implementation/__init__.py (133 lines of code) (raw):
from flask import Blueprint, abort, jsonify, current_app
from sqlalchemy.orm import joinedload
from sqlalchemy.sql import func
from .models import Product, ProductType, Customer, Order, OrderLine
# Blueprint on which every route in this module is registered via @sql.route.
sql = Blueprint('sql', __name__)
@sql.route('/api/products')
def products():
    """List every product, ordered by id, together with its type name.

    Returns:
        A JSON array of objects with the keys ``id``, ``sku``, ``name``,
        ``stock`` and ``type_name``.
    """
    # Eager-load the product type in the same SELECT; otherwise reading
    # ``p.product_type`` below may trigger one extra query per product.
    # Ordering by the mapped column (instead of the bare string "id") keeps
    # the ORDER BY unambiguous once the product type table is joined in.
    product_list = (
        Product.query
        .options(joinedload(Product.product_type))
        .order_by(Product.id)
        .all()
    )
    return jsonify([
        {
            'id': p.id,
            'sku': p.sku,
            'name': p.name,
            'stock': p.stock,
            'type_name': p.product_type.name,
        }
        for p in product_list
    ])
@sql.route('/api/products/top')
def top_products():
    """Return the three products with the highest total ordered amount.

    Products are outer-joined to their order lines, so products without any
    order line are still candidates; they are reported with ``sold`` equal
    to 0.

    Returns:
        A JSON array (at most three entries) of objects with the keys
        ``id``, ``sku``, ``name``, ``stock`` and ``sold``.
    """
    db = current_app.config["sqldb"]
    # SUM() over zero joined rows is NULL. Coalescing to 0 keeps unsold
    # products at the bottom of the descending sort no matter how the
    # database orders NULLs, and avoids emitting ``null`` for ``sold``.
    sold_amount = func.coalesce(func.sum(OrderLine.amount), 0).label('sold')
    product_list = (
        db.session.query(
            Product.id,
            Product.sku,
            Product.name,
            Product.stock,
            sold_amount,
        )
        .outerjoin(OrderLine)
        .group_by(Product.id)
        .order_by(sold_amount.desc())
        .limit(3)
    )
    return jsonify([
        {
            'id': p.id,
            'sku': p.sku,
            'name': p.name,
            'stock': p.stock,
            'sold': p.sold,
        }
        for p in product_list
    ])
@sql.route('/api/products/<int:pk>')
def product(pk):
    """Return the details of the product whose primary key is ``pk``.

    The product type is loaded together with the product so its name can be
    included in the response. Aborts with HTTP 404 when the lookup yields
    nothing.
    """
    with_type = joinedload(Product.product_type)
    record = Product.query.options(with_type).get(pk)
    if not record:
        abort(404)

    # Plain columns are copied over by name; the two type-related keys use
    # different names in the response than on the model, so they are added
    # explicitly afterwards.
    column_names = (
        "id",
        "sku",
        "name",
        "description",
        "stock",
        "cost",
        "selling_price",
    )
    payload = {column: getattr(record, column) for column in column_names}
    payload["type_id"] = record.product_type_id
    payload["type_name"] = record.product_type.name
    return jsonify(payload)
@sql.route("/api/products/<int:pk>/customers")
def product_customers(pk):
    """List the customers linked to product ``pk`` through their orders.

    Customers are joined to orders, order lines and products, filtered to
    the product with id ``pk`` and ordered by customer id. The response is
    an empty JSON array when nothing matches.
    """
    joined = Customer.query.join(Order).join(OrderLine).join(Product)
    matching = joined.filter(Product.id == pk).order_by(Customer.id).all()

    exposed = (
        "id",
        "full_name",
        "company_name",
        "email",
        "address",
        "postal_code",
        "city",
        "country",
    )
    return jsonify([
        {field: getattr(buyer, field) for field in exposed}
        for buyer in matching
    ])
@sql.route('/api/types')
def product_types():
    """Return all product types as a JSON array of ``id``/``name`` objects."""
    return jsonify([
        {'id': entry.id, 'name': entry.name}
        for entry in ProductType.query.all()
    ])
@sql.route('/api/types/<int:pk>')
def product_type(pk):
    """Return the product type with id ``pk`` and the products of that type.

    Returns:
        A JSON object with the keys ``id``, ``name`` and ``products``, where
        ``products`` is an array of ``id``/``name`` objects.

    Aborts with HTTP 404 when no product type has the given id.
    """
    # ``first()`` yields None for an unknown id; indexing the query with [0]
    # would raise IndexError instead and surface as an HTTP 500.
    type_obj = ProductType.query.filter_by(id=pk).first()
    if type_obj is None:
        abort(404)

    products = Product.query.filter_by(product_type=type_obj)
    return jsonify({
        "id": type_obj.id,
        "name": type_obj.name,
        "products": [
            {"id": item.id, "name": item.name}
            for item in products
        ],
    })
@sql.route("/api/customers")
def customers():
    """Return every customer as a JSON array of contact/address objects."""
    exposed = (
        "id",
        "full_name",
        "company_name",
        "email",
        "address",
        "postal_code",
        "city",
        "country",
    )
    return jsonify([
        {field: getattr(person, field) for field in exposed}
        for person in Customer.query.all()
    ])
@sql.route("/api/customers/<int:pk>")
def customer(pk):
    """Return the customer with id ``pk`` as a JSON object.

    When no customer matches, a warning (including the traceback of the
    failed lookup) is logged and the request is aborted with HTTP 404.
    """
    matches = Customer.query.filter_by(id=pk)
    try:
        # Indexing an empty result raises IndexError, handled just below.
        found = matches[0]
    except IndexError:
        current_app.logger.warning('Customer with ID %s not found', pk, exc_info=True)
        abort(404)

    exposed = (
        "id",
        "full_name",
        "company_name",
        "email",
        "address",
        "postal_code",
        "city",
        "country",
    )
    return jsonify({field: getattr(found, field) for field in exposed})
@sql.route("/api/stats")
def stats():
    """Return row counts for products, customers and orders.

    The ``numbers`` section (``revenue``, ``cost``, ``profit``) is currently
    hard-coded to 0 for every entry.
    """
    product_count = Product.query.count()
    customer_count = Customer.query.count()
    order_count = Order.query.count()

    # Placeholder figures: nothing is computed for these yet.
    figures = {"revenue": 0, "cost": 0, "profit": 0}

    return jsonify({
        "products": product_count,
        "customers": customer_count,
        "orders": order_count,
        "numbers": figures,
    })