services/ui_backend_service/data/db/tables/flow.py (27 lines of code) (raw):
from typing import List, Optional, Tuple

from .base import AsyncPostgresTable
from ..models import FlowRow
from services.data.db_utils import DBResponse, DBPagination
# use schema constants from the .data module to keep things consistent
from services.data.postgres_async_db import AsyncFlowTablePostgres as MetadataFlowTable
class AsyncFlowTablePostgres(AsyncPostgresTable):
    """
    Flow table accessor for the UI backend.

    The schema attributes (table name, keys, primary keys, trigger keys) are
    copied from the `services.data` flow table class, imported here as
    `MetadataFlowTable`, so both table definitions stay in agreement.
    """
    # Schema constants reused from MetadataFlowTable.
    table_name = MetadataFlowTable.table_name
    keys = MetadataFlowTable.keys
    primary_keys = MetadataFlowTable.primary_keys
    trigger_keys = MetadataFlowTable.trigger_keys
    # Every key column is selected by default.
    select_columns = keys
    _row_type = FlowRow

    async def get_flow_ids(self, conditions: Optional[List[str]] = None,
                           values: Optional[List[str]] = None,
                           limit: int = 0, offset: int = 0) -> Tuple[DBResponse, DBPagination]:
        """
        Get a paginated set of distinct flow ids.

        Parameters
        ----------
        conditions : List[str] (optional) (default None, treated as no conditions)
            list of conditions to pass the sql execute, with %s placeholders for values.
            The conditions are joined with AND into a single WHERE clause.
        values : List[str] (optional) (default None, treated as no values)
            list of values to be passed for the sql execute.
        limit : int (optional) (default 0)
            limit for the number of results. A falsy value omits the LIMIT clause.
        offset : int (optional) (default 0)
            offset for the results. A falsy value omits the OFFSET clause.

        Returns
        -------
        (DBResponse, DBPagination)
            The DBResponse carries the response code returned by `execute_sql`
            and a body holding the first column (flow_id) of every returned row.
            The pagination object is passed through from `execute_sql` untouched.
        """
        # Build fresh lists per call; a mutable default would be shared between
        # invocations and handed on to execute_sql.
        conditions = [] if conditions is None else conditions
        values = [] if values is None else values

        where_clause = "WHERE {}".format(" AND ".join(conditions)) if conditions else ""
        limit_clause = "LIMIT {}".format(limit) if limit else ""
        offset_clause = "OFFSET {}".format(offset) if offset else ""

        # NOTE(review): DISTINCT combined with LIMIT/OFFSET and no ORDER BY does not
        # guarantee a stable row order between pages - confirm whether callers need
        # deterministic paging before relying on offsets.
        sql_template = """
        SELECT DISTINCT flow_id
        FROM {table_name}
        {conditions}
        {limit}
        {offset}
        """
        select_sql = sql_template.format(
            table_name=self.table_name,
            conditions=where_clause,
            limit=limit_clause,
            offset=offset_clause,
        )

        # serialize=False keeps the rows raw so the single selected column can be
        # picked out by index below.
        res, pag = await self.execute_sql(select_sql=select_sql, values=values, fetch_single=False,
                                          expanded=False,
                                          limit=limit, offset=offset, serialize=False)

        # process the unserialized DBResponse
        # NOTE(review): assumes res.body is an iterable of rows for every response
        # code - confirm what execute_sql puts in the body on failure.
        flow_ids = [row[0] for row in res.body]
        return DBResponse(res.response_code, flow_ids), pag