migrations/versions/5297e76ff06b_initial_data.py (78 lines of code) (raw):

"""initial data Revision ID: 5297e76ff06b Revises: e157ee29908d Create Date: 2019-12-13 12:50:34.230207 """ import os import subprocess from alembic import op import json from dateutil import parser as dateutil_parser import sqlalchemy as sa from sqlalchemy.sql import table, column # revision identifiers, used by Alembic. revision = '5297e76ff06b' down_revision = 'e157ee29908d' branch_labels = None depends_on = None customers_table = table('opbeans_flask_customer', column('id', sa.Integer), column("full_name", sa.String(1000)), column("company_name", sa.String(1000)), column("email", sa.String(1000)), column("address", sa.String(1000)), column("postal_code", sa.String(1000)), column("city", sa.String(1000)), column("country", sa.String(1000)), ) orders_table = table("opbeans_flask_order", column("id", sa.Integer), column("customer_id", sa.Integer), column("created_at", sa.DateTime), ) product_types_table = table("opbeans_flask_producttype", column("id", sa.Integer), column("name", sa.String(1000)), ) products_table = table("opbeans_flask_product", column("id", sa.Integer), column("sku", sa.String(1000)), column("name", sa.String(1000)), column("description", sa.Text), column("product_type_id", sa.Integer), column("stock", sa.Integer), column("cost", sa.Integer), column("selling_price", sa.Integer), ) order_lines_table = table("opbeans_flask_orderline", column("product_id", sa.Integer,), column("order_id", sa.Integer), column("amount", sa.Integer), ) def upgrade(): bulks = { "order": (orders_table, []), "customer": (customers_table, []), "product": (products_table, []), "producttype": (product_types_table, []), "orderline": (order_lines_table, []), } renames = { "order": "order_id", "customer": "customer_id", "product_type": "product_type_id", "product": "product_id", } file = os.path.join(os.path.dirname(os.path.abspath(__file__)), "initial_data.json") subprocess.check_call(["bunzip2", "-k", file + ".bz2"]) with open(file) as f: data = json.load(f) for item in data: key = item["model"].split(".")[1] fields = {"id": item["pk"]} fields.update(item["fields"]) for rename, to in renames.items(): if rename in fields: fields[to] = fields.pop(rename) if "created_at" in fields: fields["created_at"] = dateutil_parser.parse(fields["created_at"]) bulks[key][1].append(fields) for _, (t, items) in bulks.items(): op.bulk_insert(t, items) os.unlink(file) def downgrade(): pass