in core/maxframe/dataframe/datasource/read_odps_query.py [0:0]
def _parse_full_explain(explain_string: str) -> OdpsSchema:
    """Derive the output schema of a query from its full EXPLAIN text.

    The text is split into sectors which are processed in order: a sector
    matching the job pattern becomes the jobs sector, a sector matching the
    tasks-header pattern is registered under the current jobs sector, and a
    sector matching the task pattern is registered under the most recent
    tasks sector.  The job DAG and each selected job's task DAG are then
    walked via ``iter_indep(reverse=True)`` and the schemas of the task
    sectors returned by ``_select_task_prefix`` are collected.

    Args:
        explain_string: Raw EXPLAIN output to parse.

    Returns:
        An ``OdpsSchema`` whose columns use the column alias when present
        (falling back to the column name) and the validated column type.

    Raises:
        ValueError: If no job sector is present, a selected task has no
            schema, a selected task's output target is not ``"Screen"``,
            or the selected tasks do not share exactly one schema signature.
        AssertionError: If a tasks-header sector appears before any job
            sector, or a task sector appears before any tasks-header sector.
    """
    jobs_sector = tasks_sector = None
    for sector in _split_explain_string(explain_string):
        if _EXPLAIN_JOB_REGEX.search(sector):
            jobs_sector = _resolve_jobs_sector(sector)
        elif _EXPLAIN_TASKS_HEADER_REGEX.search(sector):
            tasks_sector = _resolve_tasks_sector(sector)
            assert jobs_sector is not None
            jobs_sector.jobs[tasks_sector.job_name] = tasks_sector
        elif _EXPLAIN_TASK_REGEX.search(sector):
            assert tasks_sector is not None
            task_sector = _resolve_task_sector(tasks_sector.job_name, sector)
            tasks_sector.tasks[task_sector.task_name] = task_sector

    if jobs_sector is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise ValueError("Cannot find job definitions in explain output")

    job_dag = jobs_sector.build_dag()
    indep_job_names = list(job_dag.iter_indep(reverse=True))

    # Maps each distinct (name, type) signature to one schema carrying it.
    # The signature tuple itself is the key: keying by its hash() would let
    # two different schemas with colliding hashes be mistaken for one.
    schema_signatures = {}
    for job_name in indep_job_names:
        tasks_sector = jobs_sector.jobs[job_name]
        task_dag = tasks_sector.build_dag()
        indep_task_names = list(task_dag.iter_indep(reverse=True))
        for task_name in indep_task_names:
            for task_sector in _select_task_prefix(tasks_sector, task_name):
                if not task_sector.schema:  # pragma: no cover
                    raise ValueError("Cannot detect output schema")
                if task_sector.output_target != "Screen":
                    raise ValueError("The SQL statement should be an instant query")
                # Sorting makes the signature independent of column order.
                signature = tuple(
                    sorted(
                        (c.column_alias or c.column_name, c.column_type)
                        for c in task_sector.schema
                    )
                )
                schema_signatures[signature] = task_sector.schema

    if len(schema_signatures) != 1:
        raise ValueError("Only one final task is allowed in SQL statement")

    schema = next(iter(schema_signatures.values()))
    cols = [
        Column(c.column_alias or c.column_name, validate_data_type(c.column_type))
        for c in schema
    ]
    return OdpsSchema(cols)