def _tile_tunnel(cls, op)

in odps/mars_extension/oscar/dataframe/datasource.py


    @classmethod
    def _tile_tunnel(cls, op):
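        """Tile the read-table operand over the table tunnel: probe each
        data source for its record count, then cut it into row-range
        chunks handled by ``DataFrameReadTableSplit``."""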
        from odps import ODPS

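        # The runtime environment may override the project and endpoint
        # carried in the operand's odps_params.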
        project = os.environ.get("ODPS_PROJECT_NAME", None)
        odps_params = op.odps_params.copy()
        if project:
            odps_params["project"] = project
        endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
        o = ODPS(
            odps_params["access_id"],
            odps_params["secret_access_key"],
            project=odps_params["project"],
            endpoint=endpoint,
        )

        table_obj = o.get_table(op.table_name)
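        # Decide what to scan: the whole table when it is non-partitioned,
        # a single partition when it exists, otherwise every partition
        # matching the partition filter.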
        if not table_obj.table_schema.partitions:
            data_srcs = [table_obj]
        elif op.partition is not None and check_partition_exist(
            table_obj, op.partition
        ):
            data_srcs = [table_obj.get_partition(op.partition)]
        else:
            data_srcs = list(table_obj.partitions)
            if op.partition is not None:
                data_srcs = filter_partitions(o, data_srcs, op.partition)

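        # Accumulators for the tiled graph: output chunks and the row count
        # of each chunk (the row dimension of ``nsplits``).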
        out_chunks = []
        row_nsplits = []
        index_start = 0
        df = op.outputs[0]

        out_dtypes = df.dtypes
        out_shape = df.shape
        out_columns_value = df.columns_value
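        # Narrow dtypes, shape and columns index when a column projection
        # is requested on the operand.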
        if op.columns is not None:
            out_dtypes = out_dtypes[op.columns]
            out_shape = (df.shape[0], len(op.columns))
            out_columns_value = parse_index(out_dtypes.index, store_data=True)

        if len(data_srcs) == 0:
            # no partitions are selected
            chunk_op = DataFrameReadTableSplit()
            index_value = parse_index(pd.RangeIndex(0))
            columns_value = parse_index(out_dtypes.index, store_data=True)
            out_chunk = chunk_op.new_chunk(
                None,
                shape=(0, out_shape[1]),
                dtypes=out_dtypes,
                index_value=index_value,
                columns_value=columns_value,
                index=(index_start, 0),
            )
            out_chunks.append(out_chunk)
            row_nsplits.append(0)  # keep nsplits aligned with chunks
        else:
            retry_times = op.retry_times or options.retry_times
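            # One pass per data source: the table itself, or one partition.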
            for data_src in data_srcs:
                data_store_size = data_src.size

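                # Opening a reader just to fetch the record count may hit
                # transient tunnel errors; retry with a short sleep, up to
                # ``retry_times`` attempts.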
                retries = 0
                while True:
                    try:
                        with data_src.open_reader() as reader:
                            record_count = reader.count
                        break
                    except Exception:
                        if retries >= retry_times:
                            raise
                        retries += 1
                        time.sleep(1)
                if data_store_size == 0:
                    # empty table
                    chunk_op = DataFrameReadTableSplit()
                    index_value = parse_index(pd.RangeIndex(0))
                    columns_value = parse_index(out_dtypes.index, store_data=True)
                    out_chunk = chunk_op.new_chunk(
                        None,
                        shape=(0, out_shape[1]),
                        dtypes=out_dtypes,
                        index_value=index_value,
                        columns_value=columns_value,
                        index=(index_start, 0),
                    )
                    out_chunks.append(out_chunk)
                    row_nsplits.append(0)  # keep nsplits aligned with chunks
                    index_start += 1
                    continue
                chunk_size = df.extra_params.chunk_size

                partition_spec = (
                    str(data_src.partition_spec)
                    if getattr(data_src, "partition_spec", None)
                    else None
                )

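                # Two sizing modes: derive rows-per-chunk from a byte budget
                # (``chunk_bytes``), or honor an explicit row ``chunk_size``
                # and derive the byte split size from it.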
                if chunk_size is None:
                    chunk_bytes = df.extra_params.chunk_bytes or CHUNK_BYTES_LIMIT
                    chunk_count = ceildiv(data_store_size, chunk_bytes)
                    chunk_size = ceildiv(record_count, chunk_count)
                    split_size = chunk_bytes
                else:
                    chunk_count = ceildiv(record_count, chunk_size)
                    split_size = data_store_size // chunk_count

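                # Emit one split per half-open row range
                # [start_index, end_index).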
                for i in range(chunk_count):
                    start_index = chunk_size * i
                    end_index = min(chunk_size * (i + 1), record_count)
                    row_size = end_index - start_index
                    chunk_op = DataFrameReadTableSplit(
                        table_name=op.table_name,
                        partition_spec=partition_spec,
                        start_index=start_index,
                        end_index=end_index,
                        nrows=op.nrows,
                        odps_params=op.odps_params,
                        columns=op.columns,
                        index_type=op.index_type,
                        dtypes=out_dtypes,
                        sparse=op.sparse,
                        split_size=split_size,
                        use_arrow_dtype=op.use_arrow_dtype,
                        estimate_rows=row_size,
                        append_partitions=op.append_partitions,
                        memory_scale=op.memory_scale,
                        retry_times=op.retry_times,
                        tunnel_quota_name=op.tunnel_quota_name,
                        index_columns=op.index_columns,
                        extra_params=op.extra_params,
                    )
                    index_value = parse_index(pd.RangeIndex(start_index, end_index))
                    columns_value = parse_index(out_dtypes.index, store_data=True)
                    out_chunk = chunk_op.new_chunk(
                        None,
                        shape=(row_size, out_shape[1]),
                        dtypes=out_dtypes,
                        index_value=index_value,
                        columns_value=columns_value,
                        index=(index_start + i, 0),
                    )
                    row_nsplits.append(row_size)
                    out_chunks.append(out_chunk)

                index_start += chunk_count

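        # Each chunk builds its incremental index from 0; rewrite them into
        # one consistent RangeIndex across chunks when required.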
        if op.index_type == "incremental" and _NEED_STANDARDIZE:
            out_chunks = standardize_range_index(out_chunks)

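        # Assemble the tiled dataframe with the collected chunks and splits.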
        new_op = op.copy()
        nsplits = (tuple(row_nsplits), (out_shape[1],))
        return new_op.new_dataframes(
            None,
            shape=out_shape,
            dtypes=out_dtypes,
            index_value=df.index_value,
            columns_value=out_columns_value,
            chunks=out_chunks,
            nsplits=nsplits,
        )
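
For reference, a minimal sketch of the byte-budget sizing arithmetic used
above. It assumes ``ceildiv`` is the usual ceiling-division helper from the
same module and uses made-up sizes; it is an illustration, not part of the
datasource module.

    def ceildiv(a, b):
        # Ceiling division without floats: ceildiv(10, 3) == 4.
        return -(-a // b)

    record_count = 1_000_000          # rows reported by the tunnel reader
    data_store_size = 512 * 2 ** 20   # 512 MiB on disk (made-up figure)
    chunk_bytes = 64 * 2 ** 20        # byte budget per chunk (assumed limit)

    chunk_count = ceildiv(data_store_size, chunk_bytes)  # -> 8 chunks
    chunk_size = ceildiv(record_count, chunk_count)      # -> 125000 rows/chunk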