def _init_session()

in tzrec/datasets/odps_dataset.py [0:0]


    def _init_session(self) -> None:
        """Init table scan session."""
        sess_id_to_cli = {}
        for input_path in self._input_path.split(","):
            session_ids = []
            _, table_name, partitions = _parse_table_path(input_path)
            client = self._table_to_cli[table_name]
            if self._is_orderby_partition and partitions is not None:
                splited_partitions = [[x] for x in partitions]
            else:
                splited_partitions = [partitions]
            for partitions in splited_partitions:
                if int(os.environ.get("RANK", 0)) == 0:
                    scan_req = TableBatchScanRequest(
                        split_options=SplitOptions(split_mode="RowOffset"),
                        required_data_columns=self._ordered_cols,
                        required_partitions=partitions,
                    )
                    scan_resp = client.create_read_session(scan_req)
                    session_ids.append(scan_resp.session_id)
                    sess_id_to_cli[scan_resp.session_id] = client
                else:
                    session_ids.append(None)

            if dist.is_initialized():
                dist.broadcast_object_list(session_ids)
            self._input_to_sess[input_path] = [
                SessionRequest(session_id=x) for x in session_ids
            ]
        # refresh session
        if int(os.environ.get("RANK", 0)) == 0:
            t = threading.Thread(
                target=_refresh_sessions_daemon,
                args=(sess_id_to_cli,),
                daemon=True,
            )
            t.start()