def _tile_cupid()

in odps/mars_extension/legacy/dataframe/datasource.py [0:0]


    def _tile_cupid(cls, op):
        from odps import ODPS
        from odps.accounts import BearerTokenAccount
        from cupid import CupidSession, context
        from cupid.errors import CupidError
        from mars.context import get_context

        cupid_ctx = context()

        bearer_token = cupid_ctx.get_bearer_token()
        account = BearerTokenAccount(bearer_token)
        project = os.environ.get("ODPS_PROJECT_NAME", None)
        odps_params = op.odps_params.copy()
        if project:
            odps_params["project"] = project
        endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
        o = ODPS(
            None,
            None,
            account=account,
            project=odps_params["project"],
            endpoint=endpoint,
        )
        cupid_session = CupidSession(o)

        mars_context = get_context()

        df = op.outputs[0]
        split_size = df.extra_params.chunk_bytes or READ_CHUNK_LIMIT

        out_dtypes = df.dtypes
        out_shape = df.shape
        out_columns_value = df.columns_value
        if op.columns is not None:
            out_dtypes = out_dtypes[op.columns]
            out_shape = (df.shape[0], len(op.columns))
            out_columns_value = parse_index(out_dtypes.index, store_data=True)

        table_obj = o.get_table(op.table_name)
        if not table_obj.table_schema.partitions:
            data_srcs = [table_obj]
        elif op.partition is not None and check_partition_exist(
            table_obj, op.partition
        ):
            data_srcs = [table_obj.get_partition(op.partition)]
        else:
            data_srcs = list(table_obj.partitions)
            if op.partition is not None:
                data_srcs = filter_partitions(o, data_srcs, op.partition)

        out_chunks = []
        chunk_idx = 0

        for data_src in data_srcs:
            try:
                data_store_size = data_src.size
            except ODPSError:
                # fail to get data size, just ignore
                pass
            else:
                if data_store_size < split_size and mars_context is not None:
                    # get worker counts
                    worker_count = max(len(mars_context.get_worker_addresses()), 1)
                    # data is too small, split as many as number of cores
                    split_size = data_store_size // worker_count
                    # at least 1M
                    split_size = max(split_size, 1 * 1024**2)
                    logger.debug(
                        "Input data size is too small, split_size is %s", split_size
                    )

            logger.debug(
                "Start creating download session of table %s from cupid, "
                "columns: %s",
                op.table_name,
                op.columns,
            )
            while True:
                try:
                    download_session = cupid_session.create_download_session(
                        data_src,
                        split_size=split_size,
                        columns=op.columns,
                        with_split_meta=op.with_split_meta_on_tile,
                    )
                    break
                except CupidError:
                    logger.debug(
                        "The number of splits exceeds 100000, split_size is %s",
                        split_size,
                    )
                    if split_size >= MAX_CHUNK_SIZE:
                        raise
                    else:
                        split_size *= 2

            logger.debug(
                "%s table splits have been created.", str(len(download_session.splits))
            )

            meta_chunk_rows = [
                split.meta_row_count for split in download_session.splits
            ]
            if np.isnan(out_shape[0]):
                est_chunk_rows = meta_chunk_rows
            else:
                sp_file_sizes = np.array(
                    [
                        sp.split_file_end - sp.split_file_start
                        for sp in download_session.splits
                    ]
                )
                total_size = sp_file_sizes.sum()
                ratio_chunk_rows = (sp_file_sizes * out_shape[0] // total_size).tolist()
                est_chunk_rows = [
                    mr if mr is not None else rr
                    for mr, rr in zip(meta_chunk_rows, ratio_chunk_rows)
                ]

            partition_spec = (
                str(data_src.partition_spec)
                if getattr(data_src, "partition_spec", None)
                else None
            )

            logger.warning("Estimated chunk rows: %r", est_chunk_rows)

            if len(download_session.splits) == 0:
                logger.debug("Table %s has no data", op.table_name)
                chunk_op = DataFrameReadTableSplit()
                index_value = parse_index(pd.RangeIndex(0))
                columns_value = parse_index(out_dtypes.index, store_data=True)
                out_chunk = chunk_op.new_chunk(
                    None,
                    shape=(np.nan, out_shape[1]),
                    dtypes=op.dtypes,
                    index_value=index_value,
                    columns_value=columns_value,
                    index=(chunk_idx, 0),
                )
                out_chunks.append(out_chunk)
                chunk_idx += 1
            else:
                for idx, split in enumerate(download_session.splits):
                    chunk_op = DataFrameReadTableSplit(
                        cupid_handle=to_str(split.handle),
                        split_index=split.split_index,
                        split_file_start=split.split_file_start,
                        split_file_end=split.split_file_end,
                        schema_file_start=split.schema_file_start,
                        schema_file_end=split.schema_file_end,
                        add_offset=op.add_offset,
                        dtypes=out_dtypes,
                        sparse=op.sparse,
                        split_size=split_size,
                        string_as_binary=op.string_as_binary,
                        use_arrow_dtype=op.use_arrow_dtype,
                        estimate_rows=est_chunk_rows[idx],
                        partition_spec=partition_spec,
                        append_partitions=op.append_partitions,
                        meta_raw_size=split.meta_raw_size,
                        nrows=meta_chunk_rows[idx] or op.nrows,
                        memory_scale=op.memory_scale,
                        index_columns=op.index_columns,
                    )
                    # the chunk shape is unknown
                    index_value = parse_index(pd.RangeIndex(0))
                    columns_value = parse_index(out_dtypes.index, store_data=True)
                    out_chunk = chunk_op.new_chunk(
                        None,
                        shape=(np.nan, out_shape[1]),
                        dtypes=out_dtypes,
                        index_value=index_value,
                        columns_value=columns_value,
                        index=(chunk_idx, 0),
                    )
                    chunk_idx += 1
                    out_chunks.append(out_chunk)

        if op.add_offset:
            out_chunks = standardize_range_index(out_chunks)

        new_op = op.copy()
        nsplits = ((np.nan,) * len(out_chunks), (out_shape[1],))
        return new_op.new_dataframes(
            None,
            shape=out_shape,
            dtypes=op.dtypes,
            index_value=df.index_value,
            columns_value=out_columns_value,
            chunks=out_chunks,
            nsplits=nsplits,
        )