odps/mars_extension/legacy/dataframe/datastore.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
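"""Legacy Mars DataFrame operands that store a Mars DataFrame into a
MaxCompute (ODPS) table.

Inside a Mars cluster, data is written through the Cupid runtime and
committed via a tree of commit operands; outside a cluster, the operands
fall back to the ODPS table tunnel.
"""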
import os
import time
import uuid
import logging

from mars.dataframe.operands import DataFrameOperandMixin, DataFrameOperand
from mars.dataframe.utils import build_concatenated_rows_frame, parse_index
from mars.serialize import (
    StringField,
    SeriesField,
    BoolField,
    DictField,
    Int64Field,
    ListField,
    ValueType,
)

from ....utils import to_str
from ...utils import convert_pandas_object_to_string

logger = logging.getLogger(__name__)

# Newer Mars releases expose ``OutputType`` in ``mars.core`` and expect the
# ``_output_types`` keyword; older releases use ``ObjectType`` together with
# ``_object_type``. Probe for the newer interface first so both work.
try:
    from mars.core import OutputType

    _output_type_kw = dict(_output_types=[OutputType.dataframe])
except ImportError:
    from mars.dataframe.operands import ObjectType

    _output_type_kw = dict(_object_type=ObjectType.dataframe)


class DataFrameWriteTable(DataFrameOperand, DataFrameOperandMixin):
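    """
    Tileable operand that stores a Mars DataFrame into an ODPS table.

    Tiling picks the Cupid path when running inside a Mars cluster and the
    tunnel path otherwise; per-chunk writes are delegated to
    ``DataFrameWriteTableSplit``.
    """
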
_op_type_ = 123460
_dtypes = SeriesField("dtypes")
_odps_params = DictField("odps_params")
_table_name = StringField("table_name")
_partition_spec = StringField("partition_spec")
_partition_columns = ListField("partition_columns", ValueType.string)
_overwrite = BoolField("overwrite")
_write_batch_size = Int64Field("write_batch_size")
_unknown_as_string = BoolField("unknown_as_string")
def __init__(
self,
dtypes=None,
odps_params=None,
table_name=None,
partition_spec=None,
unknown_as_string=None,
overwrite=None,
write_batch_size=None,
**kw
):
kw.update(_output_type_kw)
super(DataFrameWriteTable, self).__init__(
_dtypes=dtypes,
_odps_params=odps_params,
_table_name=table_name,
_partition_spec=partition_spec,
_unknown_as_string=unknown_as_string,
_overwrite=overwrite,
_write_batch_size=write_batch_size,
**kw
)
@property
def retryable(self):
return False
@property
def dtypes(self):
return self._dtypes
@property
def unknown_as_string(self):
return self._unknown_as_string
@property
def odps_params(self):
return self._odps_params
@property
def table_name(self):
return self._table_name
@property
def partition_spec(self):
return self._partition_spec
@property
def overwrite(self):
return self._overwrite
@property
def write_batch_size(self):
return self._write_batch_size
@property
def partition_columns(self):
return self._partition_columns
def __call__(self, x):
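        # The result is an empty (zero-row, zero-column) frame; callers only
        # depend on it for completion of the write.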
shape = (0,) * len(x.shape)
index_value = parse_index(x.index_value.to_pandas()[:0], x.key, "index")
columns_value = parse_index(
x.columns_value.to_pandas()[:0], x.key, "columns", store_data=True
)
return self.new_dataframe(
[x],
shape=shape,
dtypes=x.dtypes[:0],
index_value=index_value,
columns_value=columns_value,
)
@classmethod
def _tile_cupid(cls, op):
from odps import ODPS
from odps.accounts import BearerTokenAccount
from cupid import CupidSession, context
from cupid.runtime import RuntimeContext
if not RuntimeContext.is_context_ready():
raise SystemError(
"No Mars cluster found, please create via `o.create_mars_cluster`."
)
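        # Inside the cluster, authenticate with a Cupid-issued bearer token
        # rather than plain account credentials.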
cupid_ctx = context()
bearer_token = cupid_ctx.get_bearer_token()
account = BearerTokenAccount(bearer_token)
project = os.environ.get("ODPS_PROJECT_NAME", None)
odps_params = op.odps_params.copy()
if project:
odps_params["project"] = project
endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
o = ODPS(
None,
None,
account=account,
project=odps_params["project"],
endpoint=endpoint,
)
cupid_session = CupidSession(o)
data_src = o.get_table(op.table_name)
logger.debug("Start creating upload session from cupid.")
upload_session = cupid_session.create_upload_session(data_src)
input_df = build_concatenated_rows_frame(op.inputs[0])
out_df = op.outputs[0]
out_chunks = []
out_chunk_shape = (0,) * len(input_df.shape)
blocks = {}
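        # One upload block per input chunk; every block id registered here is
        # committed together by the terminal commit operand below.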
for chunk in input_df.chunks:
block_id = str(int(time.time())) + "_" + str(uuid.uuid4()).replace("-", "")
chunk_op = DataFrameWriteTableSplit(
dtypes=op.dtypes,
table_name=op.table_name,
unknown_as_string=op.unknown_as_string,
partition_spec=op.partition_spec,
cupid_handle=to_str(upload_session.handle),
block_id=block_id,
write_batch_size=op.write_batch_size,
)
out_chunk = chunk_op.new_chunk(
[chunk],
shape=out_chunk_shape,
index=chunk.index,
index_value=out_df.index_value,
dtypes=chunk.dtypes,
)
out_chunks.append(out_chunk)
blocks[block_id] = op.partition_spec

        # Build a commit tree: merge chunks in groups of ``combine_size``
        # until fewer than ``combine_size`` remain, so that the terminal
        # commit chunk depends on a bounded number of predecessors.
        combine_size = 8
chunks = out_chunks
while len(chunks) >= combine_size:
new_chunks = []
for i in range(0, len(chunks), combine_size):
chks = chunks[i : i + combine_size]
if len(chks) == 1:
chk = chks[0]
else:
chk_op = DataFrameWriteTableCommit(
dtypes=op.dtypes, is_terminal=False
)
chk = chk_op.new_chunk(
chks,
shape=out_chunk_shape,
index_value=out_df.index_value,
dtypes=op.dtypes,
)
new_chunks.append(chk)
chunks = new_chunks
assert len(chunks) < combine_size
commit_table_op = DataFrameWriteTableCommit(
dtypes=op.dtypes,
table_name=op.table_name,
blocks=blocks,
cupid_handle=to_str(upload_session.handle),
overwrite=op.overwrite,
odps_params=op.odps_params,
is_terminal=True,
)
commit_table_chunk = commit_table_op.new_chunk(
chunks,
shape=out_chunk_shape,
dtypes=op.dtypes,
index_value=out_df.index_value,
)
new_op = op.copy()
return new_op.new_dataframes(
op.inputs,
shape=out_df.shape,
index_value=out_df.index_value,
dtypes=out_df.dtypes,
columns_value=out_df.columns_value,
chunks=[commit_table_chunk],
nsplits=((0,),) * len(out_chunk_shape),
)
@classmethod
def _tile_tunnel(cls, op):
from odps import ODPS
out_df = op.outputs[0]
in_df = build_concatenated_rows_frame(op.inputs[0])
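        # The tunnel path emulates ``overwrite`` by truncating the target
        # table (or the target partition) up front.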
if op.overwrite:
o = ODPS(
op.odps_params["access_id"],
op.odps_params["secret_access_key"],
project=op.odps_params["project"],
endpoint=op.odps_params["endpoint"],
)
data_target = o.get_table(op.table_name)
if op.partition_spec:
data_target = data_target.get_partition(op.partition_spec)
data_target.truncate()
out_chunks = []
for chunk in in_df.chunks:
chunk_op = DataFrameWriteTableSplit(
dtypes=op.dtypes,
table_name=op.table_name,
odps_params=op.odps_params,
partition_spec=op.partition_spec,
)
index_value = parse_index(chunk.index_value.to_pandas()[:0], chunk)
out_chunk = chunk_op.new_chunk(
[chunk],
shape=(0, 0),
index_value=index_value,
columns_value=out_df.columns_value,
dtypes=out_df.dtypes,
index=chunk.index,
)
out_chunks.append(out_chunk)
new_op = op.copy()
params = out_df.params.copy()
params.update(
dict(chunks=out_chunks, nsplits=((0,) * in_df.chunk_shape[0], (0,)))
)
return new_op.new_tileables([in_df], **params)
@classmethod
def tile(cls, op):
from cupid.runtime import RuntimeContext
if RuntimeContext.is_context_ready():
return cls._tile_cupid(op)
else:
            return cls._tile_tunnel(op)


class DataFrameWriteTableSplit(DataFrameOperand, DataFrameOperandMixin):
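    """
    Chunk operand that writes one pandas chunk to the target table, through a
    Cupid block writer when ``cupid_handle`` is set and through the Arrow
    table tunnel otherwise.
    """
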
_op_type_ = 123461
_dtypes = SeriesField("dtypes")
_table_name = StringField("table_name")
_partition_spec = StringField("partition_spec")
_cupid_handle = StringField("cupid_handle")
_block_id = StringField("block_id")
_write_batch_size = Int64Field("write_batch_size")
_unknown_as_string = BoolField("unknown_as_string")
# for tunnel
_odps_params = DictField("odps_params")
def __init__(
self,
dtypes=None,
table_name=None,
odps_params=None,
partition_spec=None,
cupid_handle=None,
unknown_as_string=None,
block_id=None,
write_batch_size=None,
**kw
):
kw.update(_output_type_kw)
super(DataFrameWriteTableSplit, self).__init__(
_dtypes=dtypes,
_table_name=table_name,
_odps_params=odps_params,
_partition_spec=partition_spec,
_unknown_as_string=unknown_as_string,
_cupid_handle=cupid_handle,
_block_id=block_id,
_write_batch_size=write_batch_size,
**kw
)
@property
def retryable(self):
return False
@property
def dtypes(self):
return self._dtypes
@property
def table_name(self):
return self._table_name
@property
def odps_params(self):
return self._odps_params
@property
def unknown_as_string(self):
return self._unknown_as_string
@property
def partition_spec(self):
return self._partition_spec
@property
def cupid_handle(self):
return self._cupid_handle
@property
def block_id(self):
return self._block_id
@property
def write_batch_size(self):
return self._write_batch_size
@classmethod
def _execute_in_cupid(cls, ctx, op):
import pyarrow as pa
import pandas as pd
from ....df.backends.pd.types import pd_to_df_schema
from cupid.io.table.core import BlockWriter
to_store_data = ctx[op.inputs[0].key]
odps_schema = pd_to_df_schema(
to_store_data, unknown_as_string=op.unknown_as_string
)
        project_name, table_name = op.table_name.split(".")
        # ``full_table_name`` may backquote the table part; strip the quotes
        # before handing the name to Cupid.
        if table_name.startswith("`") and table_name.endswith("`"):
            table_name = table_name[1:-1]
block_writer = BlockWriter(
_table_name=table_name,
_project_name=project_name,
_table_schema=odps_schema,
_partition_spec=op.partition_spec,
_block_id=op.block_id,
_handle=op.cupid_handle,
)
logger.debug("Start writing table block, block id: %s", op.block_id)
        with block_writer.open_arrow_writer() as cupid_writer:
            sink = pa.BufferOutputStream()
            # Stream the chunk in fixed-size batches so that a large chunk
            # does not have to be materialized as one Arrow record batch.
            batch_size = op.write_batch_size or 1024
            batch_idx = 0
            batch_data = convert_pandas_object_to_string(
                to_store_data[batch_size * batch_idx : batch_size * (batch_idx + 1)]
            )
            # Derive the Arrow schema from converted data so that it matches
            # the batches actually written below.
            schema = pa.RecordBatch.from_pandas(
                batch_data[:1], preserve_index=False
            ).schema
            arrow_writer = pa.RecordBatchStreamWriter(sink, schema)
            while len(batch_data) > 0:
                batch = pa.RecordBatch.from_pandas(batch_data, preserve_index=False)
                arrow_writer.write_batch(batch)
                batch_idx += 1
                # Apply the same object-to-string conversion to every batch,
                # not just the first, to keep the schema consistent.
                batch_data = convert_pandas_object_to_string(
                    to_store_data[
                        batch_size * batch_idx : batch_size * (batch_idx + 1)
                    ]
                )
            arrow_writer.close()
            cupid_writer.write(sink.getvalue())
logger.debug("Write table block finished, block id: %s", op.block_id)
block_writer.commit()
ctx[op.outputs[0].key] = pd.DataFrame()
@classmethod
def _execute_arrow_tunnel(cls, ctx, op):
from odps import ODPS
from odps.tunnel import TableTunnel
import pyarrow as pa
import pandas as pd
project = os.environ.get("ODPS_PROJECT_NAME", None)
odps_params = op.odps_params.copy()
if project:
odps_params["project"] = project
endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
o = ODPS(
odps_params["access_id"],
odps_params["secret_access_key"],
project=odps_params["project"],
endpoint=endpoint,
)
        t = o.get_table(op.table_name)
        # Resolve the schema (three-tier naming) once instead of calling
        # ``get_schema`` twice.
        table_schema = t.get_schema()
        schema_name = table_schema.name if table_schema is not None else None
        tunnel = TableTunnel(o, project=t.project)
        if op.partition_spec is not None:
            # Pass the schema here as well so that partitioned writes also
            # work on tables living in a non-default schema.
            upload_session = tunnel.create_upload_session(
                t.name, partition_spec=op.partition_spec, schema=schema_name
            )
        else:
            upload_session = tunnel.create_upload_session(t.name, schema=schema_name)
logger.debug(
"Start writing table %s split index: %s", op.table_name, op.inputs[0].index
)
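        # Each split opens its own upload session and writes a single Arrow
        # block (block id 0), committing it immediately.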
writer = upload_session.open_arrow_writer(0)
arrow_rb = pa.RecordBatch.from_pandas(ctx[op.inputs[0].key])
writer.write(arrow_rb)
writer.close()
upload_session.commit([0])
logger.debug(
"Finish writing table %s split index: %s", op.table_name, op.inputs[0].index
)
ctx[op.outputs[0].key] = pd.DataFrame()
@classmethod
def execute(cls, ctx, op):
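        # A Cupid handle is only set by the Cupid tiling path, so its
        # presence selects the in-cluster writer.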
if op.cupid_handle is not None:
cls._execute_in_cupid(ctx, op)
else:
            cls._execute_arrow_tunnel(ctx, op)


class DataFrameWriteTableCommit(DataFrameOperand, DataFrameOperandMixin):
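    """
    Operand that commits written Cupid blocks. Non-terminal instances merely
    bound the fan-in of the commit tree; the terminal instance commits the
    upload session, optionally overwriting existing data.
    """
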
_op_type_ = 123462
_dtypes = SeriesField("dtypes")
_odps_params = DictField("odps_params")
_table_name = StringField("table_name")
_overwrite = BoolField("overwrite")
_blocks = DictField("blocks")
_cupid_handle = StringField("cupid_handle")
_is_terminal = BoolField("is_terminal")
def __init__(
self,
dtypes=None,
odps_params=None,
table_name=None,
blocks=None,
cupid_handle=None,
overwrite=False,
is_terminal=None,
**kw
):
kw.update(_output_type_kw)
super(DataFrameWriteTableCommit, self).__init__(
_dtypes=dtypes,
_odps_params=odps_params,
_table_name=table_name,
_blocks=blocks,
_overwrite=overwrite,
_cupid_handle=cupid_handle,
_is_terminal=is_terminal,
**kw
)
@property
def dtypes(self):
return self._dtypes
@property
def table_name(self):
return self._table_name
@property
def blocks(self):
return self._blocks
@property
def overwrite(self):
return self._overwrite
@property
def cupid_handle(self):
return self._cupid_handle
@property
def odps_params(self):
return self._odps_params
@property
def is_terminal(self):
return self._is_terminal
@classmethod
def execute(cls, ctx, op):
import pandas as pd
from odps import ODPS
from odps.accounts import BearerTokenAccount
from cupid import CupidSession, context
from cupid.io.table import CupidTableUploadSession
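        # Non-terminal commit chunks are mere join points in the commit tree;
        # only the terminal chunk talks to the service.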
if op.is_terminal:
bearer_token = context().get_bearer_token()
account = BearerTokenAccount(bearer_token)
project = os.environ.get("ODPS_PROJECT_NAME", None)
odps_params = op.odps_params.copy()
if project:
odps_params["project"] = project
endpoint = (
os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
)
o = ODPS(
None,
None,
account=account,
project=odps_params["project"],
endpoint=endpoint,
)
cupid_session = CupidSession(o)
            project_name, table_name = op.table_name.split(".")
            # Strip backquotes possibly introduced by ``full_table_name``.
            if table_name.startswith("`") and table_name.endswith("`"):
                table_name = table_name[1:-1]
upload_session = CupidTableUploadSession(
session=cupid_session,
table_name=table_name,
project_name=project_name,
handle=op.cupid_handle,
blocks=op.blocks,
)
upload_session.commit(overwrite=op.overwrite)
        ctx[op.outputs[0].key] = pd.DataFrame()


def write_odps_table(
df,
table,
partition=None,
overwrite=False,
unknown_as_string=None,
odps_params=None,
write_batch_size=None,
):
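    """
    Create a ``DataFrameWriteTable`` tileable that stores the Mars DataFrame
    ``df`` into ``table``, an ODPS table object.

    ``partition`` is a partition spec string (e.g. ``"pt=20220101"``, an
    illustrative value), ``overwrite`` replaces existing data instead of
    appending, and ``odps_params`` supplies the credentials and endpoint used
    by the tunnel fallback. The returned tileable is an empty frame;
    executing it performs the write.
    """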
op = DataFrameWriteTable(
dtypes=df.dtypes,
odps_params=odps_params,
table_name=table.full_table_name,
unknown_as_string=unknown_as_string,
partition_spec=partition,
overwrite=overwrite,
write_batch_size=write_batch_size,
)
return op(df)
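

# A minimal usage sketch (assumes a Mars DataFrame ``df``; the credential
# variables and table name below are illustrative, not part of this module):
#
#     from odps import ODPS
#
#     o = ODPS(access_id, secret_access_key, project, endpoint=endpoint)
#     table = o.get_table("dst_table")
#     stored = write_odps_table(
#         df,
#         table,
#         overwrite=True,
#         odps_params=dict(
#             access_id=access_id,
#             secret_access_key=secret_access_key,
#             project=project,
#             endpoint=endpoint,
#         ),
#     )
#     stored.execute()  # the write happens when the tileable is executed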