def _tile_tunnel()

in odps/mars_extension/legacy/dataframe/datasource.py [0:0]


    def _tile_tunnel(cls, op):
        """Tile a table-read operand into ``DataFrameReadTableSplit`` chunks.

        An ODPS entry object is built from ``op.odps_params``; the project and
        endpoint are overridden by the ``ODPS_PROJECT_NAME`` and
        ``ODPS_RUNTIME_ENDPOINT`` environment variables when those are set.
        The data sources to read are then resolved (the table itself, a single
        partition, or a filtered list of partitions) and every data source is
        cut into row ranges, each of which becomes one output chunk.

        :param op: operand being tiled. Its ``odps_params``, ``table_name``,
            ``partition``, ``columns`` and the other reader options are
            forwarded to the chunk operands; ``op.outputs[0]`` supplies the
            output dtypes, shape and chunking hints (``extra_params``).
        :return: the result of ``new_op.new_dataframes(...)`` for a copy of
            ``op``, carrying the generated chunks and their ``nsplits``.
        """
        from odps import ODPS

        # Values from the runtime environment win over those stored in the op.
        odps_params = op.odps_params.copy()
        project = os.environ.get("ODPS_PROJECT_NAME")
        if project:
            odps_params["project"] = project
        endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
        o = ODPS(
            odps_params["access_id"],
            odps_params["secret_access_key"],
            project=odps_params["project"],
            endpoint=endpoint,
        )

        # Resolve what to read: the whole table when it is not partitioned,
        # a single partition when the requested one exists, otherwise the
        # (optionally filtered) list of all partitions.
        table_obj = o.get_table(op.table_name)
        if not table_obj.table_schema.partitions:
            data_srcs = [table_obj]
        elif op.partition is not None and check_partition_exist(
            table_obj, op.partition
        ):
            data_srcs = [table_obj.get_partition(op.partition)]
        else:
            data_srcs = list(table_obj.partitions)
            if op.partition is not None:
                data_srcs = filter_partitions(o, data_srcs, op.partition)

        out_chunks = []
        row_nsplits = []
        index_start = 0
        df = op.outputs[0]

        out_dtypes = df.dtypes
        out_shape = df.shape
        out_columns_value = df.columns_value
        if op.columns is not None:
            # Only a subset of columns is requested: narrow the metadata.
            out_dtypes = out_dtypes[op.columns]
            out_shape = (df.shape[0], len(op.columns))
            out_columns_value = parse_index(out_dtypes.index, store_data=True)

        total_rows = out_shape[0]
        # Loop invariant: an explicit row count per chunk, if one was given.
        configured_chunk_size = df.extra_params.chunk_size

        for data_src in data_srcs:
            data_store_size = data_src.size

            partition_spec = (
                str(data_src.partition_spec)
                if getattr(data_src, "partition_spec", None)
                else None
            )

            if configured_chunk_size is None:
                # Derive the number of chunks from the stored size in bytes.
                chunk_bytes = df.extra_params.chunk_bytes or READ_CHUNK_LIMIT
                chunk_count = data_store_size // chunk_bytes + (
                    data_store_size % chunk_bytes != 0
                )
                # An empty data source (size 0) would yield zero chunks and a
                # division by zero below; always emit at least one chunk.
                chunk_count = max(chunk_count, 1)
                chunk_size = ceildiv(total_rows, chunk_count)
                split_size = chunk_bytes
            else:
                chunk_size = configured_chunk_size
                # With zero rows the count would be 0 and the byte split size
                # below would divide by zero; always emit at least one chunk.
                chunk_count = max(ceildiv(total_rows, chunk_size), 1)
                split_size = data_store_size // chunk_count

            for i in range(chunk_count):
                start_index = chunk_size * i
                # The last range is clipped to the total number of rows.
                end_index = min(chunk_size * (i + 1), total_rows)
                row_size = end_index - start_index
                chunk_op = DataFrameReadTableSplit(
                    table_name=op.table_name,
                    partition_spec=partition_spec,
                    start_index=start_index,
                    end_index=end_index,
                    nrows=op.nrows,
                    odps_params=op.odps_params,
                    columns=op.columns,
                    add_offset=op.add_offset,
                    dtypes=out_dtypes,
                    sparse=op.sparse,
                    split_size=split_size,
                    use_arrow_dtype=op.use_arrow_dtype,
                    estimate_rows=row_size,
                    append_partitions=op.append_partitions,
                    memory_scale=op.memory_scale,
                    index_columns=op.index_columns,
                )
                index_value = parse_index(pd.RangeIndex(start_index, end_index))
                columns_value = parse_index(out_dtypes.index, store_data=True)
                out_chunk = chunk_op.new_chunk(
                    None,
                    shape=(row_size, out_shape[1]),
                    dtypes=out_dtypes,
                    index_value=index_value,
                    columns_value=columns_value,
                    # Chunks of later data sources continue the row-axis index.
                    index=(index_start + i, 0),
                )
                row_nsplits.append(row_size)
                out_chunks.append(out_chunk)

            index_start += chunk_count

        if op.add_offset:
            out_chunks = standardize_range_index(out_chunks)

        new_op = op.copy()
        nsplits = (tuple(row_nsplits), (out_shape[1],))
        # NOTE(review): dtypes is taken from op.dtypes while shape and
        # columns_value follow the (possibly column-pruned) out_dtypes --
        # confirm these agree when op.columns is set.
        return new_op.new_dataframes(
            None,
            shape=out_shape,
            dtypes=op.dtypes,
            index_value=df.index_value,
            columns_value=out_columns_value,
            chunks=out_chunks,
            nsplits=nsplits,
        )