def _execute_arrow_tunnel()

in odps/mars_extension/oscar/dataframe/datastore.py
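
Runs the final write of a DataFrame chunk into a MaxCompute (ODPS) table
through the Arrow tunnel: it resolves connection parameters from the runtime
environment, creates an upload session with retries, writes the chunk as a
single Arrow record batch, and coordinates the commit through a shared
recorder so that, under speculative execution, only one copy of the task
commits.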


    @classmethod
    def _execute_arrow_tunnel(cls, ctx, op):
        from odps import ODPS
        from odps.tunnel import TableTunnel
        import pyarrow as pa
        import pandas as pd
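        # NOTE: os, time, logger, options and ActorNotExist are used below but
        # not imported here; in the full source file they come from
        # module-level imports (e.g. logger = logging.getLogger(__name__)).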

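        # Environment variables injected by the runtime take precedence over
        # the odps_params serialized with the operand.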
        project = os.environ.get("ODPS_PROJECT_NAME", None)
        odps_params = op.odps_params.copy()
        if project:
            odps_params["project"] = project
        endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
        o = ODPS(
            odps_params["access_id"],
            odps_params["secret_access_key"],
            project=odps_params["project"],
            endpoint=endpoint,
        )

        t = o.get_table(op.table_name)
        tunnel = TableTunnel(o, project=t.project, quota_name=op.tunnel_quota_name)
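        # Linear backoff: the n-th retry sleeps n * init_sleep_secs seconds.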
        retry_times = options.retry_times
        init_sleep_secs = 1
        split_index = op.inputs[0].index
        logger.info(
            "Start creating upload session for table %s split index %s retry_times %s.",
            op.table_name,
            split_index,
            retry_times,
        )
        retries = 0
        while True:
            try:
                if op.partition_spec is not None:
                    upload_session = tunnel.create_upload_session(
                        t.name, partition_spec=op.partition_spec
                    )
                else:
                    upload_session = tunnel.create_upload_session(t.name)
                break
            except Exception:
                if retries >= retry_times:
                    raise
                retries += 1
                sleep_secs = retries * init_sleep_secs
                logger.exception(
                    "Create upload session failed, sleeping %s seconds before retrying",
                    sleep_secs,
                )
                time.sleep(sleep_secs)
        logger.info(
            "Start writing table %s. split_index: %s tunnel_session: %s",
            op.table_name,
            split_index,
            upload_session.id,
        )
        retries = 0
        while True:
            try:
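                # Each task writes its whole chunk as one Arrow record batch
                # into block 0 of its own upload session.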
                writer = upload_session.open_arrow_writer(0)
                arrow_rb = pa.RecordBatch.from_pandas(ctx[op.inputs[0].key])
                writer.write(arrow_rb)
                writer.close()
                break
            except Exception:
                if retries >= retry_times:
                    raise
                retries += 1
                sleep_secs = retries * init_sleep_secs
                logger.exception(
                    "Write data failed, sleeping %s seconds before retrying",
                    sleep_secs,
                )
                time.sleep(sleep_secs)
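        # The commit recorder is a shared remote object that arbitrates among
        # speculative copies of this task: try_commit grants the commit to a
        # single copy (can_commit) and signals when the recorder itself may be
        # destroyed (can_destroy). If the recorder is already gone, this copy
        # lost the race and just waits for the scheduler to cancel it.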
        recorder_name = op.commit_recorder_name
        try:
            recorder = ctx.get_remote_object(recorder_name)
        except ActorNotExist:
            while True:
                logger.info(
                    "Writing table %s has been finished, waiting to be canceled by speculative scheduler",
                    op.table_name,
                )
                time.sleep(3)
        can_commit, can_destroy = recorder.try_commit(split_index)
        if can_commit:
            # FIXME If this commit fails or the process crashes, the whole write
            # still raises an error. But this situation is very rare, so we skip
            # the error handling.
            logger.info(
                "Committing to table %s with upload session %s", op.table_name, upload_session.id
            )
            upload_session.commit([0])
            logger.info(
                "Finish writing table %s. split_index: %s tunnel_session: %s",
                op.table_name,
                split_index,
                upload_session.id,
            )
        else:
            logger.info(
                "Skip writing table %s. split_index: %s", op.table_name, split_index
            )
        if can_destroy:
            try:
                ctx.destroy_remote_object(recorder_name)
                logger.info("Delete remote object %s", recorder_name)
            except ActorNotExist:
                pass
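        # The datastore op carries no output data; an empty DataFrame marks
        # this chunk's write as done.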
        ctx[op.outputs[0].key] = pd.DataFrame()
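
Both retry loops above follow the same pattern: attempt the call, and on
failure sleep retries * init_sleep_secs seconds before trying again,
re-raising once retry_times attempts are exhausted. A minimal standalone
sketch of that pattern, assuming plain Python with a module-level logger (the
helper name retry_with_linear_backoff is hypothetical, not part of the
source):

    import time
    import logging

    logger = logging.getLogger(__name__)

    def retry_with_linear_backoff(func, retry_times, init_sleep_secs=1):
        # Keep calling func() until it succeeds; the n-th retry sleeps
        # n * init_sleep_secs seconds, mirroring the loops above.
        retries = 0
        while True:
            try:
                return func()
            except Exception:
                if retries >= retry_times:
                    raise
                retries += 1
                sleep_secs = retries * init_sleep_secs
                logger.exception(
                    "Call failed, sleeping %s seconds before retrying", sleep_secs
                )
                time.sleep(sleep_secs)

Factored this way, the session-creation and write steps could share one code
path, e.g. retry_with_linear_backoff(lambda: tunnel.create_upload_session(t.name), retry_times).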