in bigquery_etl/routine/parse_routine.py [0:0]
def from_file(cls, path):
    """Create a RawRoutine instance from the SQL file at ``path``.

    The file is passed through ``render`` (with ``format=False``), comments
    are stripped, and the text is split into individual SQL statements.
    Each statement is then classified by its normalized (lower-cased,
    whitespace-collapsed) prefix:

    * ``CREATE OR REPLACE [AGGREGATE | TABLE] FUNCTION`` and
      ``CREATE TEMP FUNCTION`` statements are collected as definitions.
    * ``CREATE OR REPLACE PROCEDURE`` statements are collected as both a
      definition and a test, and mark the routine as a stored procedure.
    * Every other statement is collected as a test. Statements from one
      starting with ``BEGIN`` up to one ending with ``END;`` are re-joined
      with single spaces into one test entry.

    Args:
        path: Location of the routine file. The parent directory name is
            used as the routine name, the grandparent directory name as
            the dataset, and the directory above that is passed to
            ``get_routines`` as the project.

    Returns:
        An instance of ``cls`` built with the detected name (``None`` if no
        definition referenced the expected persistent or temporary name),
        the original ``path``, the definitions, the tests, the sorted
        de-duplicated dependencies, and the stored-procedure flag.
    """
    filepath = Path(path)
    text = render(
        filepath.name,
        template_folder=filepath.parent,
        format=False,
    )

    sql = sqlparse.format(text, strip_comments=True)
    statements = [s for s in sqlparse.split(sql) if s.strip()]

    # Directory layout is <project>/<dataset>/<name>/<file>.
    name = filepath.parent.name
    dataset = filepath.parent.parent.name
    project = filepath.parent.parent.parent

    # Match `dataset`.`name` with optional backticks around each part.
    # The separator must be a literal dot and the path components must be
    # matched literally; an unescaped "." would also accept e.g.
    # "dataset_name" or "datasetXname".
    # NOTE: statements are lower-cased before matching, so this only
    # matches when the directory names are lower-case as well.
    persistent_name_re = re.compile(
        rf"`?{re.escape(dataset)}`?\.`?{re.escape(name)}`?"
    )
    persistent_name = f"{dataset}.{name}"
    temp_name = f"{dataset}_{name}"

    persistent_function_prefixes = (
        "create or replace function",
        "create or replace aggregate function",
        "create or replace table function",
    )

    internal_name = None
    is_stored_procedure = False
    definitions = []
    tests = []
    # Index of the statement that opened the current BEGIN ... END; block,
    # or -1 when not inside such a block.
    procedure_start = -1

    for i, statement in enumerate(statements):
        normalized = " ".join(statement.lower().split())

        if normalized.startswith(persistent_function_prefixes):
            definitions.append(statement)
            if persistent_name_re.search(normalized):
                internal_name = persistent_name
        elif normalized.startswith("create temp function"):
            definitions.append(statement)
            if temp_name in normalized:
                internal_name = temp_name
        elif normalized.startswith("create or replace procedure"):
            is_stored_procedure = True
            definitions.append(statement)
            tests.append(statement)
            if persistent_name_re.search(normalized):
                internal_name = persistent_name
        else:
            if normalized.startswith("begin"):
                procedure_start = i

            if procedure_start == -1:
                tests.append(statement)
            elif normalized.endswith("end;"):
                # The splitter broke the block apart at its inner
                # semicolons; stitch it back together as a single test.
                tests.append(" ".join(statements[procedure_start : i + 1]))
                procedure_start = -1

    # get routines that could be referenced by the UDF
    routines = get_routines(project)
    # Loop-invariant: build the searchable definition text only once.
    definitions_sql = "\n".join(definitions)
    dependencies = set()
    for udf in routines:
        udf_re = re.compile(
            r"\b"
            + r"\.".join(
                f"`?{re.escape(part)}`?" for part in udf["name"].split(".")
            )
            + r"\("
        )
        if udf_re.search(definitions_sql):
            dependencies.add(udf["name"])

    dependencies.update(re.findall(TEMP_UDF_RE, definitions_sql))
    # A routine is never its own dependency.
    dependencies.discard(internal_name)

    return cls(
        name=internal_name,
        filepath=path,
        definitions=definitions,
        tests=tests,
        dependencies=sorted(dependencies),
        is_stored_procedure=is_stored_procedure,
    )