def write_table()

in odps/models/tableio.py [0:0]


    def write_table(cls, odps, name, *block_data, **kw):
        """
        Write records or pandas DataFrame into given table.

        :param odps: ODPS entry object used to look up or create the target table
        :param name: table or table name
        :type name: :class:`.models.table.Table` or str
        :param block_data: records / DataFrame, or block ids and records / DataFrame.
            If given records or DataFrame only, the block id will be 0 as default.
        :param str project: project name, if not provided, will be the default project
        :param str schema: schema name, if not provided, will be the default schema
        :param partition: the partition of this table to write into
        :param list partition_cols: columns representing dynamic partitions. A single
            column name can be passed as a string. ``partitions`` is accepted as an
            alias of this argument.
        :param bool append_missing_cols: Whether to append missing columns to the target
            table. False by default.
        :param bool overwrite: if True, will overwrite existing data
        :param bool create_table: if true, the table will be created if not exist
        :param dict table_kwargs: specify other kwargs for :meth:`~odps.ODPS.create_table`
        :param dict type_mapping: specify type mapping for columns when creating tables,
            can be dicts like ``{"column": "bigint"}``. If column does not exist in data,
            it will be added as an empty column.
        :param bool unknown_as_string: forwarded to the schema resolver when the schema
            is inferred from the first piece of data. False by default.
        :param table_schema_callback: a function to accept table schema resolved from data
            and return a new schema for table to create. Only works when target table does
            not exist and ``create_table`` is True.
        :param int lifecycle: specify table lifecycle when creating tables
        :param bool create_partition: if true, the partition will be created if not exist.
            ``create_partitions`` is accepted as an alias of this argument.
        :param compress_option: the compression algorithm, level and strategy
        :type compress_option: :class:`odps.tunnel.CompressOption`
        :param str endpoint:  tunnel service URL
        :param bool reopen: writing the table will reuse the session which opened last time,
            if set to True will open a new upload session, default as False. A new
            session is always requested when ``append_missing_cols`` is True.
        :param kw: any other keyword argument is forwarded to the table's
            ``open_writer`` method unchanged.
        :return: None
        :raises ValueError: if no data is given, or block ids and data are not
            passed in pairs
        :raises errors.NoSuchTable: if the target table does not exist and
            ``create_table`` is not set

        :Example:

        Write records into a specified table.

        >>> odps.write_table('test_table', data)

        Write records into multiple blocks.

        >>> odps.write_table('test_table', 0, records1, 1, records2)

        Write into a given partition.

        >>> odps.write_table('test_table', data, partition='pt=test')

        Write a pandas DataFrame. Create the table if it does not exist.

        >>> import pandas as pd
        >>> df = pd.DataFrame([
        >>>     [111, 'aaa', True],
        >>>     [222, 'bbb', False],
        >>>     [333, 'ccc', True],
        >>>     [444, '中文', False]
        >>> ], columns=['num_col', 'str_col', 'bool_col'])
        >>> o.write_table('test_table', df, partition='pt=test', create_table=True, create_partition=True)

        Passing more arguments when creating table.

        >>> import pandas as pd
        >>> df = pd.DataFrame([
        >>>     [111, 'aaa', True],
        >>>     [222, 'bbb', False],
        >>>     [333, 'ccc', True],
        >>>     [444, '中文', False]
        >>> ], columns=['num_col', 'str_col', 'bool_col'])
        >>> # this dict will be passed to `create_table` as kwargs.
        >>> table_kwargs = {"transactional": True, "primary_key": "num_col"}
        >>> o.write_table('test_table', df, partition='pt=test', create_table=True, create_partition=True,
        >>>               table_kwargs=table_kwargs)

        Write with dynamic partitioning.

        >>> import pandas as pd
        >>> df = pd.DataFrame([
        >>>     [111, 'aaa', True, 'p1'],
        >>>     [222, 'bbb', False, 'p1'],
        >>>     [333, 'ccc', True, 'p2'],
        >>>     [444, '中文', False, 'p2']
        >>> ], columns=['num_col', 'str_col', 'bool_col', 'pt'])
        >>> o.write_table('test_part_table', df, partition_cols=['pt'], create_partition=True)

        :Note:

        ``write_table`` treats object type of Pandas data as strings as it is often hard to determine their
        types when creating a new table for your data. To make sure the column type meet your need, you can
        specify `type_mapping` argument to specify the column types, for instance,
        ``type_mapping={"col1": "array<struct<id:string>>"}``.

        .. seealso:: :class:`odps.models.Record`
        """
        project = kw.pop("project", None)
        schema = kw.pop("schema", None)
        append_missing_cols = kw.pop("append_missing_cols", False)
        overwrite = kw.pop("overwrite", False)
        # ``reopen`` is handed to ``open_writer`` explicitly below, so it has to be
        # removed from ``kw``; otherwise the call fails with a duplicated keyword.
        reopen = kw.pop("reopen", False)

        single_block_types = (Iterable,)
        if pa is not None:
            single_block_types += (pa.RecordBatch, pa.Table)

        if len(block_data) == 1 and isinstance(block_data[0], single_block_types):
            # a single data argument: no explicit block id
            blocks = [None]
            data_list = list(block_data)
        else:
            # alternating (block_id, data) positional arguments
            blocks = list(block_data[::2])
            data_list = list(block_data[1::2])

            # an empty call would otherwise surface later as a bare IndexError
            if not data_list or len(blocks) != len(data_list):
                raise ValueError(
                    "Should invoke like odps.write_table(block_id, records, "
                    "block_id2, records2, ..., **kw)"
                )

        unknown_as_string = kw.pop("unknown_as_string", False)
        create_table = kw.pop("create_table", False)
        create_partition = kw.pop(
            "create_partition", kw.pop("create_partitions", False)
        )
        partition = kw.pop("partition", None)
        # pop the legacy alias unconditionally so that it never leaks into the
        # keyword arguments forwarded to ``open_writer``
        legacy_partition_cols = kw.pop("partitions", None)
        partition_cols = kw.pop("partition_cols", None) or legacy_partition_cols
        lifecycle = kw.pop("lifecycle", None)
        type_mapping = kw.pop("type_mapping", None)
        table_schema_callback = kw.pop("table_schema_callback", None)
        # copy to avoid mutating the dict owned by the caller
        table_kwargs = dict(kw.pop("table_kwargs", None) or {})
        if lifecycle:
            table_kwargs["lifecycle"] = lifecycle

        if isinstance(partition_cols, six.string_types):
            partition_cols = [partition_cols]

        try:
            data_sample = data_list[0]
            if isinstance(data_sample, GeneratorType):
                # peek the first item for schema resolution, then put it back
                # in front of the generator so that no data is lost
                data_gen = data_sample
                data_sample = [next(data_gen)]
                data_list[0] = utils.chain_generator([data_sample[0]], data_gen)
            table_schema = cls._resolve_schema(
                data_sample,
                unknown_as_string=unknown_as_string,
                partition=partition,
                partition_cols=partition_cols,
                type_mapping=type_mapping,
            )
        except TypeError:
            # schema cannot be resolved from this kind of data
            table_schema = None

        if not odps.exist_table(name, project=project, schema=schema):
            if not create_table:
                raise errors.NoSuchTable(
                    "Target table %s not exist. To create a new table "
                    "you can add an argument `create_table=True`." % name
                )
            if callable(table_schema_callback):
                table_schema = table_schema_callback(table_schema)
            target_table = odps.create_table(
                name, table_schema, project=project, schema=schema, **table_kwargs
            )
        else:
            target_table = cls._get_table_obj(
                odps, name, project=project, schema=schema
            )

        # use ``table_schema`` consistently with the other accesses below
        union_cols, diff_cols = cls._calc_schema_diff(
            table_schema, target_table.table_schema, partition_cols=partition_cols
        )
        if table_schema and not union_cols:
            warnings.warn(
                "No columns overlapped between source and target table. If result "
                "is not as expected, please check if your query provides correct "
                "column names."
            )
        if diff_cols:
            if append_missing_cols:
                target_table.add_columns(diff_cols)
            else:
                warnings.warn(
                    "Columns in source data %s are missing in target table %s. "
                    "Specify append_missing_cols=True to append missing columns "
                    "to the target table."
                    % (", ".join(c.name for c in diff_cols), target_table.name)
                )

        partition_cols = cls._check_partition_specified(
            name,
            target_table.table_schema,
            partition_cols=partition_cols,
            partition=partition,
        )

        # (is_arrow, partition name) -> block id -> list of data chunks
        data_lists = defaultdict(lambda: defaultdict(list))
        for block, data in zip(blocks, data_list):
            for key, parted_data in cls._split_block_data_in_partitions(
                target_table,
                data,
                partition_cols=partition_cols,
                partition=partition,
            ).items():
                data_lists[key][block].extend(parted_data)

        if partition is None or isinstance(partition, six.string_types):
            partition_str = partition
        else:
            partition_str = str(odps_types.PartitionSpec(partition))

        # fixme cover up for overwrite failure on table.format.version=2:
        #  only applicable for transactional table with partitions
        #  with generate expressions
        manual_truncate = (
            overwrite
            and target_table.is_transactional
            and any(
                pt_col.generate_expression
                for pt_col in target_table.table_schema.partitions
            )
        )

        for (is_arrow, pt_name), block_to_data in data_lists.items():
            if not block_to_data:
                continue

            # Keep every chunk paired with the block it belongs to. Pairing a
            # flattened chunk list with the block ids by position would drop or
            # misroute chunks once a block holds more than one chunk.
            writer_blocks = list(block_to_data)
            block_chunks = [
                (block, chunk)
                for block, chunks in block_to_data.items()
                for chunk in chunks
            ]

            if len(writer_blocks) == 1 and writer_blocks[0] is None:
                writer_blocks = None

            final_pt = ",".join(p for p in (pt_name, partition_str) if p is not None)
            # fixme cover up for overwrite failure on table.format.version=2
            if manual_truncate:
                if not final_pt or target_table.exist_partition(final_pt):
                    target_table.truncate(partition_spec=final_pt or None)
            with target_table.open_writer(
                partition=final_pt or None,
                blocks=writer_blocks,
                arrow=is_arrow,
                create_partition=create_partition,
                # a new session is requested when columns were appended, or
                # when the caller explicitly asked for one
                reopen=reopen or append_missing_cols,
                overwrite=overwrite,
                **kw
            ) as writer:
                for block, chunk in block_chunks:
                    if writer_blocks is None:
                        writer.write(chunk)
                    else:
                        writer.write(block, chunk)