def execute()

in odps/ipython/magics.py [0:0]


        def execute(self, line, cell=""):
            """Run the SQL given to the magic and return its result.

            ``line`` and ``cell`` are joined with a newline and split on ``;``.
            Statements starting with ``set `` are parsed as ``key=value`` pairs
            and passed as hints; the remaining non-empty statements are
            re-joined with ``;`` and submitted through ``self._odps.run_sql``.
            The instance is polled once per second, updating the progress UI,
            until it terminates.

            :param line: text following the magic on the same line
            :param cell: cell body; empty when used as a line magic
            :return: a ``ResultFrame`` built from the instance reader, or the
                reader's raw text when it cannot be turned into a frame;
                ``None`` when there is no statement left to run
            :raises ValueError: when a ``set`` statement contains no ``=``
            """
            self._set_odps()

            content = (line + "\n" + cell).strip()

            sql = None
            hints = dict()

            for s in content.split(";"):
                stripped = s.strip()
                if not stripped:
                    continue
                if stripped.lower().startswith("set "):
                    hint = stripped.split(" ", 1)[1]
                    k, v = hint.split("=", 1)
                    hints[k.strip()] = v.strip()
                elif sql is None:
                    sql = s
                else:
                    sql = "%s;%s" % (sql, s)

            if sql is None:
                # Only blanks and ``set`` statements were given: there is no
                # query to substitute parameters into or to submit.
                return None

            # replace user defined parameters
            sql = replace_sql_parameters(sql, self.shell.user_ns)
            if not sql:
                return None

            progress_ui = init_progress_ui()
            # Everything after the UI exists is covered by ``finally`` so that a
            # failed submission, a failed instance or an interrupt while polling
            # does not leave the progress UI open.
            try:
                group_id = create_instance_group("SQL Query")
                progress_ui.add_keys(group_id)

                instance = self._odps.run_sql(sql, hints=hints)
                # Check the level first so the logview address is only requested
                # when the message is going to be emitted.
                if logger.getEffectiveLevel() <= logging.INFO:
                    logger.info(
                        "Instance ID: %s\n  Log view: %s",
                        instance.id,
                        instance.get_logview_address(),
                    )
                reload_instance_status(self._odps, group_id, instance.id)
                progress_ui.status("Executing")

                percent = 0
                while not instance.is_terminated(retry=True):
                    last_percent = percent

                    reload_instance_status(self._odps, group_id, instance.id)
                    inst_progress = fetch_instance_group(group_id).instances.get(
                        instance.id
                    )

                    if inst_progress is not None and len(inst_progress.tasks) > 0:
                        # Overall progress is the mean of the per-task progress.
                        percent = sum(
                            self._get_task_percent(task)
                            for task in six.itervalues(inst_progress.tasks)
                        ) / len(inst_progress.tasks)
                    else:
                        percent = 0

                    # Never move the bar backwards and never past 100%.
                    percent = min(1, max(percent, last_percent))

                    progress_ui.update(percent)
                    progress_ui.update_group()

                    time.sleep(1)

                instance.wait_for_success()
                progress_ui.update(1)

                with instance.open_reader() as reader:
                    try:
                        import pandas as pd

                        # Resolve the CSV parse error class as a fallback chain:
                        # each location is only tried when the previous one is
                        # missing, so a successful import is never overwritten.
                        try:
                            from pandas.errors import ParserError as CParserError
                        except ImportError:
                            try:
                                from pandas.io.parsers import (
                                    ParserError as CParserError,
                                )
                            except ImportError:
                                try:
                                    from pandas.parser import CParserError  # noqa
                                except ImportError:
                                    CParserError = ValueError  # noqa

                        if not hasattr(reader, "raw"):
                            res = ResultFrame(
                                [rec.values for rec in reader],
                                schema=odps_schema_to_df_schema(reader._schema),
                            )
                        else:
                            try:
                                res = pd.read_csv(StringIO(reader.raw))
                                if len(res.values) > 0:
                                    schema = DataFrame(res).schema
                                else:
                                    # No rows to derive a schema from: declare
                                    # every column as string.
                                    cols = res.columns.tolist()
                                    schema = odps_schema_to_df_schema(
                                        TableSchema.from_lists(
                                            cols, ["string" for _ in cols]
                                        )
                                    )
                                res = ResultFrame(res.values, schema=schema)
                            except (ValueError, CParserError):
                                # Not parseable as CSV: hand back the raw text.
                                res = reader.raw
                    except (ImportError, ValueError):
                        # pandas is unavailable (or building the frame through
                        # it failed): build the result without pandas.
                        if not hasattr(reader, "raw"):
                            res = ResultFrame(
                                [rec.values for rec in reader],
                                schema=odps_schema_to_df_schema(reader._schema),
                            )
                        else:
                            try:
                                columns = [
                                    odps_types.Column(
                                        name=col.name,
                                        typo=odps_type_to_df_type(col.type),
                                    )
                                    for col in reader._columns
                                ]
                                res = ResultFrame(list(reader), columns=columns)
                            except TypeError:
                                res = reader.raw

                html_notify("SQL execution succeeded")
                return res
            finally:
                progress_ui.close()