def _tile_cupid()

in odps/mars_extension/oscar/dataframe/datasource.py [0:0]


    def _tile_cupid(cls, op):
        """Tile a table-read operand into one chunk per Cupid table split.

        For every partition reported by the Cupid service a download session
        is created, and each split of that session becomes one
        ``DataFrameReadTableSplit`` chunk.  A partition whose session yields
        no splits still contributes a single placeholder chunk.

        Args:
            cls: Owning class (not used by the body).
            op: Operand being tiled.  Its first output supplies the shape,
                dtypes, columns value and ``extra_params.chunk_bytes``; the
                operand itself supplies the table location and the read
                options that are forwarded to every chunk operand.

        Returns:
            The value of ``new_dataframes`` called on a copy of ``op`` with
            the generated chunks.  Chunk row counts are not known at tile
            time, so every row entry of ``nsplits`` is NaN.
        """
        import numpy as np
        import pandas as pd
        from mars.core.context import get_context

        df = op.outputs[0]
        split_size = df.extra_params.chunk_bytes or CHUNK_BYTES_LIMIT

        out_dtypes = df.dtypes
        out_shape = df.shape
        out_columns_value = df.columns_value
        if op.columns is not None:
            # Only the selected columns are produced, so narrow every piece of
            # column metadata together.
            out_dtypes = out_dtypes[op.columns]
            out_shape = (df.shape[0], len(op.columns))
            out_columns_value = parse_index(out_dtypes.index, store_data=True)

        # Columns requested from the service: index columns first, then the
        # selected data columns.  Identical for every partition, so it is
        # built once instead of once per loop iteration.
        columns = op.columns
        if columns is not None:
            columns = (op.index_columns or []) + columns

        mars_context = get_context()
        if mars_context is not None:
            worker_count = len(mars_context.get_worker_addresses())
        else:
            worker_count = None

        cupid_client = CupidServiceClient()
        try:
            parts = cupid_client.enum_table_partitions(
                op.odps_params, op.table_name, op.partition
            )
            if parts is None:
                # A single ``None`` spec makes the loop below run exactly once.
                parts = [None]

            out_chunks = []
            chunk_idx = 0

            for partition_spec in parts:
                # NOTE(review): the split size returned by the session replaces
                # the requested one and is therefore also passed to the next
                # partition's session -- confirm this carry-over is intended.
                splits, split_size = cupid_client.create_table_download_session(
                    op.odps_params,
                    op.table_name,
                    partition_spec,
                    columns,
                    worker_count,
                    split_size,
                    MAX_CHUNK_NUM,
                    op.with_split_meta_on_tile,
                )

                logger.info("%s table splits have been created.", len(splits))
                meta_chunk_rows = [split.meta_row_count for split in splits]
                if np.isnan(out_shape[0]):
                    # Total row count unknown: split metadata is the only
                    # source for the estimate (entries may be None).
                    est_chunk_rows = meta_chunk_rows
                else:
                    # Distribute the known total row count over the splits in
                    # proportion to their file sizes, and use that figure
                    # wherever the split carries no row count of its own.
                    sp_file_sizes = np.array(
                        [sp.split_file_end - sp.split_file_start for sp in splits]
                    )
                    total_size = sp_file_sizes.sum()
                    ratio_chunk_rows = (
                        sp_file_sizes * out_shape[0] // total_size
                    ).tolist()
                    est_chunk_rows = [
                        mr if mr is not None else rr
                        for mr, rr in zip(meta_chunk_rows, ratio_chunk_rows)
                    ]

                # NOTE(review): logged at warning level although the message
                # looks informational -- level kept as in the original.
                logger.warning("Estimated chunk rows: %r", est_chunk_rows)

                if len(splits) == 0:
                    logger.info("Table %s has no data", op.table_name)
                    # Placeholder chunk built from a default-constructed
                    # operand (no split information is available).
                    chunk_op = DataFrameReadTableSplit()
                    index_value = parse_index(pd.RangeIndex(0))
                    columns_value = parse_index(out_dtypes.index, store_data=True)
                    out_chunk = chunk_op.new_chunk(
                        None,
                        shape=(np.nan, out_shape[1]),
                        # Use the narrowed dtypes so they agree with the shape
                        # and columns_value given here (and with the chunks
                        # created for non-empty partitions).
                        dtypes=out_dtypes,
                        index_value=index_value,
                        columns_value=columns_value,
                        index=(chunk_idx, 0),
                    )
                    out_chunks.append(out_chunk)
                    chunk_idx += 1
                else:
                    for idx, split in enumerate(splits):
                        chunk_op = DataFrameReadTableSplit(
                            cupid_handle=to_str(split.handle),
                            split_index=split.split_index,
                            split_file_start=split.split_file_start,
                            split_file_end=split.split_file_end,
                            schema_file_start=split.schema_file_start,
                            schema_file_end=split.schema_file_end,
                            index_type=op.index_type,
                            dtypes=out_dtypes,
                            sparse=op.sparse,
                            split_size=split_size,
                            string_as_binary=op.string_as_binary,
                            use_arrow_dtype=op.use_arrow_dtype,
                            estimate_rows=est_chunk_rows[idx],
                            partition_spec=partition_spec,
                            append_partitions=op.append_partitions,
                            meta_raw_size=split.meta_raw_size,
                            # NOTE(review): ``or`` also replaces a row count of
                            # 0 with op.nrows -- confirm that is intended.
                            nrows=meta_chunk_rows[idx] or op.nrows,
                            memory_scale=op.memory_scale,
                            index_columns=op.index_columns,
                            extra_params=op.extra_params,
                        )
                        # the chunk shape is unknown
                        index_value = parse_index(pd.RangeIndex(0))
                        columns_value = parse_index(out_dtypes.index, store_data=True)
                        out_chunk = chunk_op.new_chunk(
                            None,
                            shape=(np.nan, out_shape[1]),
                            dtypes=out_dtypes,
                            index_value=index_value,
                            columns_value=columns_value,
                            index=(chunk_idx, 0),
                        )
                        chunk_idx += 1
                        out_chunks.append(out_chunk)
        finally:
            # Always release the service client, including on failure.
            cupid_client.close()

        if op.index_type == "incremental" and _NEED_STANDARDIZE:
            out_chunks = standardize_range_index(out_chunks)
        new_op = op.copy()
        nsplits = ((np.nan,) * len(out_chunks), (out_shape[1],))
        return new_op.new_dataframes(
            None,
            shape=out_shape,
            # Narrowed dtypes, matching ``shape`` and ``columns_value`` below.
            dtypes=out_dtypes,
            index_value=df.index_value,
            columns_value=out_columns_value,
            chunks=out_chunks,
            nsplits=nsplits,
        )