odps/mars_extension/legacy/core.py [394:502]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    runtime_endpoint=None,
    **kw
):
    """
    Write Mars DataFrame to table.

    :param df: Mars DataFrame.
    :param table_name: name of the table to write to.
    :param overwrite: whether to overwrite existing data. False by default.
    :param partition: partition spec.
    :param write_batch_size: batch size of records to write. 1024 by default.
    :param unknown_as_string: set a column to string type if its dtype is object.
    :param as_type: specify column dtypes. {'a': 'string'} will set column `a` as string type.
    :param drop_table: drop the table if it exists. False by default.
    :param create_table: create the table first if it does not exist. True by default.
    :param drop_partition: drop the partition if it exists. False by default.
    :param create_partition: create the partition if it does not exist. None by default.
    :param lifecycle: table lifecycle. If absent, `options.lifecycle` will be used.

    :return: None
    """
    from .dataframe import write_odps_table
    from odps.tunnel import TableTunnel

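    # map pandas dtypes to ODPS column types, honoring overrides supplied via as_type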
    dtypes = df.dtypes
    odps_types = []
    names = []
    for name, t in dtypes.items():
        names.append(name)
        if as_type and name in as_type:
            odps_types.append(as_type[name])
        else:
            odps_types.append(
                pd_type_to_odps_type(
                    t, name, unknown_as_string=unknown_as_string, project=odps.get_project()
                )
            )
    if partition:
        p = PartitionSpec(partition)
        schema = TableSchema.from_lists(names, odps_types, p.keys, ["string"] * len(p))
    else:
        schema = TableSchema.from_lists(names, odps_types)

    if drop_table:
        odps.delete_table(table_name, if_exists=True)

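    # create the target table if needed, or adjust its partitions, according to the drop/create flags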
    if partition is None:
        # the non-partitioned table
        if drop_partition:
            raise ValueError("Cannot drop partition for non-partition table")
        if create_partition:
            raise ValueError("Cannot create partition for non-partition table")

        if create_table or (not odps.exist_table(table_name)):
            odps.create_table(
                table_name,
                schema,
                if_not_exists=True,
                stored_as="aliorc",
                lifecycle=lifecycle,
            )
    else:
        if odps.exist_table(table_name) or not create_table:
            t = odps.get_table(table_name)
            table_partition = t.get_partition(partition)
            if drop_partition:
                t.delete_partition(table_partition, if_exists=True)
            if create_partition:
                t.create_partition(table_partition, if_not_exists=True)

        else:
            odps.create_table(
                table_name, schema, stored_as="aliorc", lifecycle=lifecycle
            )

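    # fetch the target table and check that its columns match the DataFrame-derived schema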
    table = odps.get_table(table_name)

    if len(table.table_schema.simple_columns) != len(schema.simple_columns):
        raise TypeError(
            "Table has %s columns while the input DataFrame has %s columns"
            % (len(table.table_schema.simple_columns), len(schema.simple_columns))
        )

    for c_left, c_right in zip(table.table_schema.simple_columns, schema.simple_columns):
        if c_left.name.lower() != c_right.name.lower() or c_left.type != c_right.type:
            raise TypeError(
                "Column types of the provided DataFrame and the target table"
                " do not agree. DataFrame column %s has type %s,"
                " while target table column %s has type %s"
                % (c_right.name, c_right.type, c_left.name, c_left.type)
            )

    if partition:
        table.create_partition(partition, if_not_exists=True)
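    # resolve the runtime endpoint: explicit argument, then the Cupid internal
    # endpoint, then Cupid options, then the default ODPS endpoint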
    runtime_endpoint = (
        runtime_endpoint
        or kw.pop("cupid_internal_endpoint", None)
        or cupid_options.cupid.runtime.endpoint
        or odps.endpoint
    )
    odps_params = dict(project=odps.project, endpoint=runtime_endpoint)
    if isinstance(odps.account, AliyunAccount):
        odps_params.update(
            dict(
                access_id=odps.account.access_id,
                secret_access_key=odps.account.secret_access_key,
            )
        )
    if isinstance(df, pd.DataFrame):
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
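
The docstring above describes the parameters of PyODPS's Mars write path. For orientation, here is a minimal usage sketch, assuming the function is exposed on the ODPS entry object as `persist_mars_dataframe` (the name and exact keyword set may vary between PyODPS releases); `o`, `df`, the table name and the partition spec are placeholders.

    # hedged sketch: assumed entry point, adjust to your PyODPS release
    o.persist_mars_dataframe(
        df,
        "target_table",                 # table to write
        partition="pt=20240101",        # optional partition spec
        unknown_as_string=True,         # map object-typed columns to string
        as_type={"a": "string"},        # per-column dtype overrides
        drop_partition=False,
        create_partition=True,
        lifecycle=7,                    # falls back to options.lifecycle when omitted
    )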



odps/mars_extension/oscar/core.py [394:503]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    runtime_endpoint=None,
    **kw
):
    """
    Write Mars DataFrame to table.

    :param df: Mars DataFrame.
    :param table_name: name of the table to write to.
    :param overwrite: whether to overwrite existing data. False by default.
    :param partition: partition spec.
    :param write_batch_size: batch size of records to write. 1024 by default.
    :param unknown_as_string: set a column to string type if its dtype is object.
    :param as_type: specify column dtypes. {'a': 'string'} will set column `a` as string type.
    :param drop_table: drop the table if it exists. False by default.
    :param create_table: create the table first if it does not exist. True by default.
    :param drop_partition: drop the partition if it exists. False by default.
    :param create_partition: create the partition if it does not exist. None by default.
    :param lifecycle: table lifecycle. If absent, `options.lifecycle` will be used.
    :param tunnel_quota_name: name of the tunnel quota to use.

    :return: None
    """
    import pandas as pd
    from cupid.config import options as cupid_options
    from .dataframe import write_odps_table

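    # map pandas dtypes to ODPS column types, honoring overrides supplied via as_type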
    dtypes = df.dtypes
    odps_types = []
    names = []
    for name, t in dtypes.items():
        names.append(name)
        if as_type and name in as_type:
            odps_types.append(as_type[name])
        else:
            odps_types.append(
                pd_type_to_odps_type(
                    t, name, unknown_as_string=unknown_as_string, project=odps.get_project()
                )
            )
    if partition:
        p = PartitionSpec(partition)
        schema = TableSchema.from_lists(names, odps_types, p.keys, ["string"] * len(p))
    else:
        schema = TableSchema.from_lists(names, odps_types)

    if drop_table:
        odps.delete_table(table_name, if_exists=True)

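    # create the target table if needed, or adjust its partitions, according to the drop/create flags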
    if partition is None:
        # the non-partitioned table
        if drop_partition:
            raise ValueError("Cannot drop partition for non-partition table")
        if create_partition:
            raise ValueError("Cannot create partition for non-partition table")

        if create_table or (not odps.exist_table(table_name)):
            odps.create_table(
                table_name,
                schema,
                if_not_exists=True,
                stored_as="aliorc",
                lifecycle=lifecycle,
            )
    else:
        if odps.exist_table(table_name) or not create_table:
            t = odps.get_table(table_name)
            table_partition = t.get_partition(partition)
            if drop_partition:
                t.delete_partition(table_partition, if_exists=True)
            if create_partition:
                t.create_partition(table_partition, if_not_exists=True)

        else:
            odps.create_table(
                table_name, schema, stored_as="aliorc", lifecycle=lifecycle
            )

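    # fetch the target table and check that its columns match the DataFrame-derived schema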
    table = odps.get_table(table_name)

    if len(table.table_schema.simple_columns) != len(schema.simple_columns):
        raise TypeError(
            "Table has %s columns while the input DataFrame has %s columns"
            % (len(table.table_schema.simple_columns), len(schema.simple_columns))
        )

    for c_left, c_right in zip(table.table_schema.simple_columns, schema.simple_columns):
        if c_left.name.lower() != c_right.name.lower() or c_left.type != c_right.type:
            raise TypeError(
                "Column types of the provided DataFrame and the target table"
                " do not agree. DataFrame column %s has type %s,"
                " while target table column %s has type %s"
                % (c_right.name, c_right.type, c_left.name, c_left.type)
            )
    if partition:
        table.create_partition(partition, if_not_exists=True)
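    # resolve the runtime endpoint: explicit argument, then the Cupid internal
    # endpoint, then Cupid options, then the default ODPS endpoint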
    runtime_endpoint = (
        runtime_endpoint
        or kw.pop("cupid_internal_endpoint", None)
        or cupid_options.cupid.runtime.endpoint
        or odps.endpoint
    )
    odps_params = dict(project=odps.project, endpoint=runtime_endpoint)
    if isinstance(odps.account, AliyunAccount):
        odps_params.update(
            dict(
                access_id=odps.account.access_id,
                secret_access_key=odps.account.secret_access_key,
            )
        )
    if isinstance(df, pd.DataFrame):
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
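
Both copies above share the same schema-construction step. As a standalone illustration, the sketch below maps pandas dtypes to ODPS types by hand (the real code delegates this to pd_type_to_odps_type, whose mapping is richer) and builds the same kind of TableSchema, with partition columns always declared as string. The _PD_TO_ODPS dict and build_schema helper are illustrative names only, and the TableSchema/PartitionSpec import paths may differ between PyODPS releases.

    import pandas as pd

    from odps.models import TableSchema
    from odps.types import PartitionSpec

    # simplified stand-in for pd_type_to_odps_type
    _PD_TO_ODPS = {
        "int64": "bigint",
        "float64": "double",
        "bool": "boolean",
        "object": "string",
    }

    def build_schema(df, partition=None, as_type=None):
        names, odps_types = [], []
        for name, dtype in df.dtypes.items():
            names.append(name)
            if as_type and name in as_type:
                odps_types.append(as_type[name])  # caller-specified override
            else:
                odps_types.append(_PD_TO_ODPS.get(str(dtype), "string"))
        if partition:
            p = PartitionSpec(partition)  # e.g. "pt=20240101"
            # partition columns are always typed as string, as in the code above
            return TableSchema.from_lists(names, odps_types, p.keys, ["string"] * len(p))
        return TableSchema.from_lists(names, odps_types)

    schema = build_schema(pd.DataFrame({"a": [1], "b": ["x"]}), partition="pt=20240101")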



