in odps/ipython/magics.py [0:0]
def execute(self, line, cell=""):
    """Run the SQL given in ``line`` and ``cell`` and return its result.

    ``line`` and ``cell`` are joined with a newline and split on ``;``.
    Fragments that start with ``set `` (case-insensitive) are parsed as
    ``key=value`` pairs and passed to ``run_sql`` as ``hints``; every other
    non-blank fragment is re-joined with ``;`` and submitted as one script.
    The split is purely textual, so a ``;`` inside a string literal also
    splits the statement.

    While the instance is running, its task progress is polled once per
    second and pushed to the progress UI.

    NOTE(review): presumably registered as an IPython line/cell magic; the
    decorator is outside the visible source -- confirm.

    :param line: first part of the statement text.
    :param cell: second part of the statement text, defaults to ``""``.
    :return: a ``ResultFrame`` when the result could be parsed, the
        reader's ``raw`` payload when parsing failed, or ``None`` when the
        input contains no SQL statement.
    :raises ValueError: if a ``set`` fragment contains no ``=``.

    Exceptions raised by ``run_sql`` / ``wait_for_success`` propagate to the
    caller; the progress UI is closed in every case.
    """
    self._set_odps()

    content = (line + "\n" + cell).strip()

    sql = None
    hints = dict()
    for s in content.split(";"):
        stripped = s.strip()
        if not stripped:
            continue
        if stripped.lower().startswith("set "):
            hint = stripped.split(" ", 1)[1]
            k, v = hint.split("=", 1)
            hints[k.strip()] = v.strip()
        elif sql is None:
            sql = s
        else:
            sql = "%s;%s" % (sql, s)

    if sql is None:
        # Blank input or hints only: nothing to substitute or submit.
        return None

    # replace user defined parameters
    sql = replace_sql_parameters(sql, self.shell.user_ns)
    if not sql:
        return None

    progress_ui = init_progress_ui()
    # The try starts right after the UI is created so that a failed
    # submission, a failed instance or an interrupt still closes it.
    try:
        group_id = create_instance_group("SQL Query")
        progress_ui.add_keys(group_id)

        instance = self._odps.run_sql(sql, hints=hints)
        # Guarded so the log view address is only requested when it will
        # actually be logged.
        if logger.getEffectiveLevel() <= logging.INFO:
            logger.info(
                "Instance ID: %s\n Log view: %s",
                instance.id,
                instance.get_logview_address(),
            )
        reload_instance_status(self._odps, group_id, instance.id)
        progress_ui.status("Executing")

        percent = 0
        while not instance.is_terminated(retry=True):
            last_percent = percent

            reload_instance_status(self._odps, group_id, instance.id)
            inst_progress = fetch_instance_group(group_id).instances.get(
                instance.id
            )

            if inst_progress is not None and len(inst_progress.tasks) > 0:
                # Overall progress is the mean of the per-task progress.
                percent = sum(
                    self._get_task_percent(task)
                    for task in six.itervalues(inst_progress.tasks)
                ) / len(inst_progress.tasks)
            else:
                percent = 0

            # Never move the bar backwards and never exceed 100%.
            percent = min(1, max(percent, last_percent))

            progress_ui.update(percent)
            progress_ui.update_group()

            time.sleep(1)

        instance.wait_for_success()
        progress_ui.update(1)

        with instance.open_reader() as reader:
            try:
                import pandas as pd

                # Resolve the parser error class; each fallback is tried
                # only if the previous location is unavailable, so a
                # successful import is never overwritten.
                try:
                    from pandas.io.parsers import (
                        ParserError as CParserError,
                    )
                except ImportError:
                    try:
                        from pandas.parser import CParserError  # noqa
                    except ImportError:
                        CParserError = ValueError  # noqa

                if not hasattr(reader, "raw"):
                    res = ResultFrame(
                        [rec.values for rec in reader],
                        schema=odps_schema_to_df_schema(reader._schema),
                    )
                else:
                    try:
                        res = pd.read_csv(StringIO(reader.raw))
                        if len(res.values) > 0:
                            schema = DataFrame(res).schema
                        else:
                            # No rows to derive types from: declare every
                            # column as string.
                            cols = res.columns.tolist()
                            schema = odps_schema_to_df_schema(
                                TableSchema.from_lists(
                                    cols, ["string" for _ in cols]
                                )
                            )
                        res = ResultFrame(res.values, schema=schema)
                    except (ValueError, CParserError):
                        # Not parseable as CSV: hand back the raw payload.
                        res = reader.raw
            except (ImportError, ValueError):
                # pandas is missing (or building the frame failed): build
                # the result without pandas.
                if not hasattr(reader, "raw"):
                    res = ResultFrame(
                        [rec.values for rec in reader],
                        schema=odps_schema_to_df_schema(reader._schema),
                    )
                else:
                    try:
                        columns = [
                            odps_types.Column(
                                name=col.name,
                                typo=odps_type_to_df_type(col.type),
                            )
                            for col in reader._columns
                        ]
                        res = ResultFrame(list(reader), columns=columns)
                    except TypeError:
                        res = reader.raw

        html_notify("SQL execution succeeded")
        return res
    finally:
        progress_ui.close()