services/migration_service/api/utils.py (67 lines of code) (raw):
from subprocess import Popen, PIPE
from ..data.postgres_async_db import PostgresUtils
from . import version_dict, latest, \
make_goose_migration_template, make_goose_template
from services.migration_service.migration_config import db_conf
import sys
class ApiUtils(object):
@staticmethod
def list_migrations():
migrations_list = list((version_dict.keys()))
migrations_list.sort(key=int)
return migrations_list[1:]
@staticmethod
def get_unapplied_migrations(current_version):
try:
migrations_list = ApiUtils.list_migrations()
index_version = migrations_list.index(current_version)
return migrations_list[index_version + 1:]
except:
return migrations_list
@staticmethod
async def get_goose_version():
# if tables exist but goose doesn't find version table then
goose_version_cmd = make_goose_template(db_conf.connection_string_url(), 'version')
p = Popen(goose_version_cmd, stdout=PIPE, stderr=PIPE, shell=True,
close_fds=True)
p.wait()
version = None
std_err = p.stderr.read()
lines_err = std_err.decode("utf-8").split("\n")
for line in lines_err:
if "goose: version" in line:
s = line.split("goose: version ")
version = s[1]
print(line)
break
if version:
return version
else:
raise Exception(
"unable to get db version via goose: " + std_err.decode("utf-8"))
@staticmethod
async def get_latest_compatible_version():
is_present = await PostgresUtils.is_present("flows_v3")
if is_present:
version = await ApiUtils.get_goose_version()
return version_dict[version]
else:
print("Running initial migration..", file=sys.stderr)
goose_version_cmd = make_goose_migration_template(db_conf.connection_string_url(), 'up')
p = Popen(goose_version_cmd, shell=True,
close_fds=True)
if p.wait() != 0:
raise Exception("Failed to run initial migration")
return latest
@staticmethod
async def is_migration_in_progress():
goose_version_cmd = make_goose_template(
db_conf.connection_string_url(), "status"
)
p = Popen(goose_version_cmd, stdout=PIPE, stderr=PIPE, shell=True,
close_fds=True)
p.wait()
std_err = p.stderr.read()
lines_err = std_err.decode("utf-8")
if "Pending" in lines_err:
return True
return False