def _tile_cupid()

in odps/mars_extension/oscar/dataframe/datastore.py [0:0]


    def _tile_cupid(cls, op):
        """Tile a table-write operand into Cupid split-write and commit chunks.

        A table upload session is requested from ``CupidServiceClient`` for
        ``op.table_name``. Every chunk of the row-concatenated input gets one
        ``DataFrameWriteTableSplit`` chunk with a freshly generated block id.
        Those chunks are then merged, at most eight at a time, through
        non-terminal ``DataFrameWriteTableCommit`` chunks until fewer than
        eight remain, and a single terminal commit chunk that carries the
        block-id -> partition-spec mapping consumes what is left.

        :param op: operand being tiled; its first input is the data to write.
        :return: the value of ``new_dataframes`` called on a copy of ``op``,
            whose only chunk is the terminal commit chunk.
        """
        from mars.dataframe.utils import build_concatenated_rows_frame

        session_handle = CupidServiceClient().create_table_upload_session(
            op.odps_params, op.table_name
        )

        rows_frame = build_concatenated_rows_frame(op.inputs[0])
        result_df = op.outputs[0]

        # The handle is passed to every split chunk and to the final commit,
        # so it is converted to a string once up front.
        handle_text = to_str(session_handle)

        # All chunks produced here are given an all-zero shape.
        ndim = len(rows_frame.shape)
        empty_shape = (0,) * ndim

        split_chunks = []
        block_to_partition = {}
        for src_chunk in rows_frame.chunks:
            # Block id: current epoch seconds plus a dash-free random UUID.
            block_id = "{}_{}".format(int(time.time()), uuid.uuid4().hex)
            split_op = DataFrameWriteTableSplit(
                dtypes=op.dtypes,
                table_name=op.table_name,
                odps_params=op.odps_params,
                unknown_as_string=op.unknown_as_string,
                partition_spec=op.partition_spec,
                cupid_handle=handle_text,
                block_id=block_id,
                write_batch_size=op.write_batch_size,
            )
            split_chunks.append(
                split_op.new_chunk(
                    [src_chunk],
                    shape=empty_shape,
                    index=src_chunk.index,
                    index_value=result_df.index_value,
                    dtypes=src_chunk.dtypes,
                )
            )
            block_to_partition[block_id] = op.partition_spec

        # Build the commit tree: repeatedly collapse groups of ``fan_in``
        # chunks so the terminal commit depends on fewer than ``fan_in`` inputs.
        fan_in = 8

        def _merge_group(group):
            # A lone chunk is passed through unchanged instead of being wrapped.
            if len(group) == 1:
                return group[0]
            return DataFrameWriteTableCommit(
                dtypes=op.dtypes, is_terminal=False
            ).new_chunk(
                group,
                shape=empty_shape,
                index_value=result_df.index_value,
                dtypes=op.dtypes,
            )

        pending = split_chunks
        while len(pending) >= fan_in:
            pending = [
                _merge_group(pending[start : start + fan_in])
                for start in range(0, len(pending), fan_in)
            ]

        assert len(pending) < fan_in

        terminal_chunk = DataFrameWriteTableCommit(
            dtypes=op.dtypes,
            table_name=op.table_name,
            blocks=block_to_partition,
            cupid_handle=handle_text,
            overwrite=op.overwrite,
            odps_params=op.odps_params,
            is_terminal=True,
        ).new_chunk(
            pending,
            shape=empty_shape,
            dtypes=op.dtypes,
            index_value=result_df.index_value,
            index=(0,) * ndim,
        )

        return op.copy().new_dataframes(
            op.inputs,
            shape=result_df.shape,
            index_value=result_df.index_value,
            dtypes=result_df.dtypes,
            columns_value=result_df.columns_value,
            chunks=[terminal_chunk],
            nsplits=((0,),) * ndim,
        )