def _execute_arrow_tunnel()

in odps/mars_extension/oscar/dataframe/datasource.py [0:0]


    def _execute_arrow_tunnel(cls, ctx, op):
        """Read one split of an ODPS table through the Arrow tunnel.

        The split is converted to a pandas DataFrame and stored in ``ctx``
        under the key of ``op.outputs[0]``. If ``op.table_name`` is None, an
        empty DataFrame built by ``cls._build_empty_df`` is stored instead.

        :param ctx: mapping of chunk keys to results; the output is written
            into it.
        :param op: operand describing the table (``table_name``,
            ``partition_spec``), the row range (``start_index``,
            ``end_index``, ``nrows``), the columns to read (``columns``,
            ``index_columns``) and conversion options.
        :raises Exception: the last error from creating the download session
            or reading the data, once ``retry_times`` retries are exhausted.
        """
        from odps import ODPS
        from odps.tunnel import TableTunnel

        out = op.outputs[0]

        if op.table_name is None:
            # is empty table
            ctx[out.key] = cls._build_empty_df(out)
            return

        # Environment variables, when set, override the project and endpoint
        # carried in the operand's connection parameters.
        odps_params = op.odps_params.copy()
        project = os.environ.get("ODPS_PROJECT_NAME", None)
        if project:
            odps_params["project"] = project
        endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
        o = ODPS(
            odps_params["access_id"],
            odps_params["secret_access_key"],
            project=odps_params["project"],
            endpoint=endpoint,
        )

        t = o.get_table(op.table_name)
        odps_schema = t.get_schema()
        schema_name = odps_schema.name if odps_schema else None
        tunnel = TableTunnel(o, project=t.project, quota_name=op.tunnel_quota_name)
        retry_times = op.retry_times or options.retry_times
        init_sleep_secs = 1

        def _call_with_retry(func, failure_msg):
            # Call ``func`` until it succeeds. After the n-th failure, sleep
            # n * init_sleep_secs seconds; re-raise once retry_times retries
            # are used up. Only ``Exception`` subclasses are retried, so
            # KeyboardInterrupt / SystemExit propagate immediately.
            retries = 0
            while True:
                try:
                    return func()
                except Exception:
                    if retries >= retry_times:
                        raise
                    retries += 1
                    sleep_secs = retries * init_sleep_secs
                    # logger.exception already attaches the current traceback.
                    logger.exception(failure_msg, sleep_secs)
                    time.sleep(sleep_secs)

        logger.info(
            "Start creating download session for table %s(%s) start index %s end index %s retry_times %s.",
            op.table_name,
            op.partition_spec,
            op.start_index,
            op.end_index,
            retry_times,
        )
        # Pass the schema for both partitioned and unpartitioned tables so
        # that tables outside the default schema resolve correctly.
        session_kwargs = {"schema": schema_name, "async_mode": True}
        if op.partition_spec is not None:
            session_kwargs["partition_spec"] = op.partition_spec
        download_session = _call_with_retry(
            lambda: tunnel.create_download_session(t.name, **session_kwargs),
            "Create download session failed, sleep %s seconds and retry it",
        )

        logger.info(
            "Start reading table %s(%s) split from %s to %s",
            op.table_name,
            op.partition_spec,
            op.start_index,
            op.end_index,
        )
        if op.nrows is None:
            count = op.end_index - op.start_index
        else:
            count = op.nrows

        # When an explicit column list is given, index columns are read as
        # well so they can be turned into the DataFrame index below.
        # None means "read all columns".
        columns = op.columns
        if columns is not None:
            columns = list(op.index_columns or []) + list(columns)

        def _read_arrow_table():
            with download_session.open_arrow_reader(
                op.start_index, count, columns=columns
            ) as reader:
                return reader.read()

        table = _call_with_retry(
            _read_arrow_table,
            "Read table failed, sleep %s seconds and retry it",
        )

        table = cls._append_partition_values(table, op)
        if op.string_as_binary:
            table = cls._cast_string_to_binary(table)
        data = arrow_table_to_pandas_dataframe(
            table, use_arrow_dtype=op.use_arrow_dtype
        )
        if op.index_columns:
            data = data.set_index(op.index_columns)
        data = cls._align_output_data(op, data)
        logger.info(
            "Finish reading table %s(%s) split from %s to %s",
            op.table_name,
            op.partition_spec,
            op.start_index,
            op.end_index,
        )
        ctx[out.key] = data