in tzrec/datasets/odps_dataset.py [0:0]
def _init_session(self) -> None:
    """Create the table scan sessions for every configured input path.

    ``self._input_path`` is split on ``","`` and each entry is parsed with
    ``_parse_table_path`` into a table name and an optional partition spec.
    When ``self._is_orderby_partition`` is set and partitions were given,
    one session is requested per partition; otherwise a single session is
    requested with the partition spec passed through unchanged.

    Only the process whose ``RANK`` environment variable is ``0`` (or
    unset) actually opens read sessions; all other ranks record ``None``
    placeholders of the same length. If ``dist`` is initialized, the id list
    is then passed through ``dist.broadcast_object_list``.

    The resulting ``SessionRequest`` objects are stored in
    ``self._input_to_sess`` keyed by input path. On rank 0 a daemon thread
    running ``_refresh_sessions_daemon`` is started with the mapping of
    created session ids to their clients.
    """
    is_rank_zero = int(os.environ.get("RANK", 0)) == 0
    # session id -> client that created it; handed to the refresh thread.
    client_by_session = {}

    for input_path in self._input_path.split(","):
        _, table_name, partitions = _parse_table_path(input_path)
        client = self._table_to_cli[table_name]

        if not self._is_orderby_partition or partitions is None:
            # One session for the whole partition spec (possibly None).
            partition_groups = [partitions]
        else:
            # One session per partition, kept in the given order.
            partition_groups = [[part] for part in partitions]

        session_ids = []
        for partition_group in partition_groups:
            if not is_rank_zero:
                # Placeholder keeps the list length equal on every rank.
                session_ids.append(None)
                continue
            response = client.create_read_session(
                TableBatchScanRequest(
                    split_options=SplitOptions(split_mode="RowOffset"),
                    required_data_columns=self._ordered_cols,
                    required_partitions=partition_group,
                )
            )
            created_id = response.session_id
            session_ids.append(created_id)
            client_by_session[created_id] = client

        if dist.is_initialized():
            # NOTE(review): presumably torch.distributed, whose default
            # broadcast source is rank 0, so the placeholders get replaced
            # by the real ids in place -- confirm.
            dist.broadcast_object_list(session_ids)

        self._input_to_sess[input_path] = [
            SessionRequest(session_id=sid) for sid in session_ids
        ]

    # Only the rank that created the sessions runs the refresh daemon.
    if is_rank_zero:
        threading.Thread(
            target=_refresh_sessions_daemon,
            args=(client_by_session,),
            daemon=True,
        ).start()