in src/ab/plugins/db/db_conn_pool.py [0:0]
def create_task_table_and_mapper(engine):
from sqlalchemy import MetaData, Table, Column, Integer, String, DateTime, Text, text
from sqlalchemy.sql import func
meta = MetaData()
server_now = func.now()
text_class = Text
if engine.name == 'sqlite':
server_now = text("(DATETIME(CURRENT_TIMESTAMP,'LOCALTIME'))")
elif engine.name == 'mysql':
from sqlalchemy.dialects.mysql import LONGTEXT
text_class = LONGTEXT
task_table = Table(
'task', meta,
# Background on SQLite’s autoincrement is at: http://sqlite.org/autoinc.html
# if want non-dup pk, use "sqlite_autoincrement=True"
Column('id', Integer, primary_key=True),
Column('task_id', String(255), unique=True),
Column('app_name', String(255)),
Column('algorithm_name', String(255)),
Column('code', Integer),
Column('args', text_class),
Column('status', text_class),
Column('data', text_class),
Column('spark_app_id', String(255)),
Column('log', text_class),
Column('gmt_create', DateTime(timezone=True), server_default=server_now),
Column('gmt_modified', DateTime(timezone=True)),
)
# checkfirst=True by default, will skip create the table if already exists
meta.create_all(engine)
# TODO: isolate create_mapper
from ab.plugins.db.dao import Mapper
from ab.plugins.db import db_master
mapper = Mapper('task', json_columns=['args', 'status', 'data'], primary_key='task_id')
db_master.mappers['_task'] = mapper