in odps/models/tableio.py [0:0]
def write_table(cls, odps, name, *block_data, **kw):
    """
    Write records or pandas DataFrame into given table.

    :param odps: ODPS entry object used to check, create and look up the target table
    :param name: table or table name
    :type name: :class:`.models.table.Table` or str
    :param block_data: records / DataFrame, or block ids and records / DataFrame.
        If given records or DataFrame only, the block id will be 0 as default.
    :param str project: project name, if not provided, will be the default project
    :param str schema: schema name, if not provided, will be the default schema
    :param partition: the partition of this table to write into
    :param list partition_cols: columns representing dynamic partitions. A single
        column name may be given as a string. ``partitions`` is accepted as an alias.
    :param bool append_missing_cols: Whether to append missing columns to the target
        table. False by default.
    :param bool overwrite: if True, will overwrite existing data
    :param bool create_table: if true, the table will be created if not exist
    :param dict table_kwargs: specify other kwargs for :meth:`~odps.ODPS.create_table`
    :param dict type_mapping: specify type mapping for columns when creating tables,
        can be dicts like ``{"column": "bigint"}``. If column does not exist in data,
        it will be added as an empty column.
    :param table_schema_callback: a function to accept table schema resolved from data
        and return a new schema for table to create. Only works when target table does
        not exist and ``create_table`` is True.
    :param int lifecycle: specify table lifecycle when creating tables
    :param bool create_partition: if true, the partition will be created if not exist.
        ``create_partitions`` is accepted as an alias.
    :param compress_option: the compression algorithm, level and strategy
    :type compress_option: :class:`odps.tunnel.CompressOption`
    :param str endpoint: tunnel service URL
    :param bool reopen: writing the table will reuse the session which opened last time,
        if set to True will open a new upload session, default as False. It is always
        passed as true to the writer when ``append_missing_cols`` is true.
    :return: None
    :raises ValueError: if block ids and data are not passed in pairs
    :raises odps.errors.NoSuchTable: if the target table does not exist and
        ``create_table`` is not set

    Keyword arguments not listed above are forwarded to the table's ``open_writer``.

    :Example:

    Write records into a specified table.

    >>> o.write_table('test_table', data)

    Write records into multiple blocks.

    >>> o.write_table('test_table', 0, records1, 1, records2)

    Write into a given partition.

    >>> o.write_table('test_table', data, partition='pt=test')

    Write a pandas DataFrame. Create the table if it does not exist.

    >>> import pandas as pd
    >>> df = pd.DataFrame([
    ...     [111, 'aaa', True],
    ...     [222, 'bbb', False],
    ...     [333, 'ccc', True],
    ...     [444, '中文', False]
    ... ], columns=['num_col', 'str_col', 'bool_col'])
    >>> o.write_table('test_table', df, partition='pt=test', create_table=True,
    ...               create_partition=True)

    Passing more arguments when creating table.

    >>> import pandas as pd
    >>> df = pd.DataFrame([
    ...     [111, 'aaa', True],
    ...     [222, 'bbb', False],
    ...     [333, 'ccc', True],
    ...     [444, '中文', False]
    ... ], columns=['num_col', 'str_col', 'bool_col'])
    >>> # this dict will be passed to `create_table` as kwargs.
    >>> table_kwargs = {"transactional": True, "primary_key": "num_col"}
    >>> o.write_table('test_table', df, partition='pt=test', create_table=True,
    ...               create_partition=True, table_kwargs=table_kwargs)

    Write with dynamic partitioning.

    >>> import pandas as pd
    >>> df = pd.DataFrame([
    ...     [111, 'aaa', True, 'p1'],
    ...     [222, 'bbb', False, 'p1'],
    ...     [333, 'ccc', True, 'p2'],
    ...     [444, '中文', False, 'p2']
    ... ], columns=['num_col', 'str_col', 'bool_col', 'pt'])
    >>> o.write_table('test_part_table', df, partition_cols=['pt'], create_partition=True)

    :Note:

    ``write_table`` treats object type of Pandas data as strings as it is often hard to
    determine their types when creating a new table for your data. To make sure the column
    type meet your need, you can specify `type_mapping` argument to specify the column
    types, for instance, ``type_mapping={"col1": "array<struct<id:string>>"}``.

    .. seealso:: :class:`odps.models.Record`
    """
    project = kw.pop("project", None)
    schema = kw.pop("schema", None)
    append_missing_cols = kw.pop("append_missing_cols", False)
    overwrite = kw.pop("overwrite", False)
    # Pop `reopen` here: it is passed explicitly to open_writer() below, so
    # leaving it in `kw` would raise "multiple values for keyword argument".
    reopen = kw.pop("reopen", False)

    single_block_types = (Iterable,)
    if pa is not None:
        single_block_types += (pa.RecordBatch, pa.Table)

    if len(block_data) == 1 and isinstance(block_data[0], single_block_types):
        # Data only: a single anonymous block.
        blocks = [None]
        data_list = list(block_data)
    else:
        # Alternating (block_id, data, block_id, data, ...) arguments.
        blocks = list(block_data[::2])
        data_list = list(block_data[1::2])

        if len(blocks) != len(data_list):
            raise ValueError(
                "Should invoke like odps.write_table(block_id, records, "
                "block_id2, records2, ..., **kw)"
            )

    unknown_as_string = kw.pop("unknown_as_string", False)
    create_table = kw.pop("create_table", False)
    create_partition = kw.pop(
        "create_partition", kw.pop("create_partitions", False)
    )
    partition = kw.pop("partition", None)
    # Pop both spellings unconditionally so that the alias never leaks into
    # the keyword arguments forwarded to open_writer().
    partition_cols = kw.pop("partition_cols", None)
    partition_cols_alias = kw.pop("partitions", None)
    partition_cols = partition_cols or partition_cols_alias
    lifecycle = kw.pop("lifecycle", None)
    type_mapping = kw.pop("type_mapping", None)
    table_schema_callback = kw.pop("table_schema_callback", None)
    table_kwargs = dict(kw.pop("table_kwargs", None) or {})
    if lifecycle:
        table_kwargs["lifecycle"] = lifecycle

    if isinstance(partition_cols, six.string_types):
        partition_cols = [partition_cols]

    try:
        data_sample = data_list[0]
        if isinstance(data_sample, GeneratorType):
            # Peek the first item to infer the schema, then put it back in
            # front of the generator so no data is lost.
            data_gen = data_sample
            data_sample = [next(data_gen)]
            data_list[0] = utils.chain_generator([data_sample[0]], data_gen)

        table_schema = cls._resolve_schema(
            data_sample,
            unknown_as_string=unknown_as_string,
            partition=partition,
            partition_cols=partition_cols,
            type_mapping=type_mapping,
        )
    except TypeError:
        # Schema cannot be resolved from this kind of data.
        table_schema = None

    if not odps.exist_table(name, project=project, schema=schema):
        if not create_table:
            raise errors.NoSuchTable(
                "Target table %s not exist. To create a new table "
                "you can add an argument `create_table=True`." % name
            )
        if callable(table_schema_callback):
            table_schema = table_schema_callback(table_schema)
        target_table = odps.create_table(
            name, table_schema, project=project, schema=schema, **table_kwargs
        )
    else:
        target_table = cls._get_table_obj(
            odps, name, project=project, schema=schema
        )
        # NOTE(review): the rest of this function reads `table_schema` from
        # the table object; confirm `schema` is the intended attribute here.
        union_cols, diff_cols = cls._calc_schema_diff(
            table_schema, target_table.schema, partition_cols=partition_cols
        )
        if table_schema and not union_cols:
            warnings.warn(
                "No columns overlapped between source and target table. If result "
                "is not as expected, please check if your query provides correct "
                "column names."
            )
        if diff_cols:
            if append_missing_cols:
                target_table.add_columns(diff_cols)
            else:
                warnings.warn(
                    "Columns in source data %s are missing in target table %s. "
                    "Specify append_missing_cols=True to append missing columns "
                    "to the target table."
                    % (", ".join(c.name for c in diff_cols), target_table.name)
                )

    partition_cols = cls._check_partition_specified(
        name,
        target_table.table_schema,
        partition_cols=partition_cols,
        partition=partition,
    )

    # (is_arrow, partition name) -> block id -> list of data chunks
    data_lists = defaultdict(lambda: defaultdict(list))
    for block, data in zip(blocks, data_list):
        for key, parted_data in cls._split_block_data_in_partitions(
            target_table,
            data,
            partition_cols=partition_cols,
            partition=partition,
        ).items():
            data_lists[key][block].extend(parted_data)

    if partition is None or isinstance(partition, six.string_types):
        partition_str = partition
    else:
        partition_str = str(odps_types.PartitionSpec(partition))

    # fixme cover up for overwrite failure on table.format.version=2:
    #  only applicable for transactional table with partitions
    #  with generate expressions
    manual_truncate = (
        overwrite
        and target_table.is_transactional
        and any(
            pt_col.generate_expression
            for pt_col in target_table.table_schema.partitions
        )
    )

    for (is_arrow, pt_name), block_to_data in data_lists.items():
        if not block_to_data:
            continue

        # Keep one (block id, chunk) pair per chunk. Flattening chunks into
        # a list parallel to the block ids would misalign them whenever a
        # block holds zero or several chunks.
        writer_blocks = list(block_to_data)
        write_pairs = []
        for block, chunks in block_to_data.items():
            for chunk in chunks:
                write_pairs.append((block, chunk))

        if len(writer_blocks) == 1 and writer_blocks[0] is None:
            writer_blocks = None

        final_pt = ",".join(p for p in (pt_name, partition_str) if p is not None)
        # fixme cover up for overwrite failure on table.format.version=2
        # (`manual_truncate` is only truthy when `overwrite` is)
        if manual_truncate and (
            not final_pt or target_table.exist_partition(final_pt)
        ):
            target_table.truncate(partition_spec=final_pt or None)

        with target_table.open_writer(
            partition=final_pt or None,
            blocks=writer_blocks,
            arrow=is_arrow,
            create_partition=create_partition,
            # Honor an explicit request; also reopen when columns may have
            # just been appended to the table.
            reopen=reopen or append_missing_cols,
            overwrite=overwrite,
            **kw
        ) as writer:
            if writer_blocks is None:
                for _, chunk in write_pairs:
                    writer.write(chunk)
            else:
                for block, chunk in write_pairs:
                    writer.write(block, chunk)