in alibabacloud_oss_v2/filelike.py [0:0]
def _prefetch_generator(self, offset):
if not self._executor:
self._executor = ThreadPoolExecutor(self._prefetch_num)
self._close_readers1()
prefetch_num = max(1, self._prefetch_num)
for start in range(offset, self._size_in_bytes, self._chunk_size):
self._prefetch_readers.append(_PrefetchDelegate(self, start))
if len(self._prefetch_readers) < prefetch_num:
continue
# read data from first reader
reader = self._prefetch_readers[0]
curr_iter = iter(reader)
for d in curr_iter:
if reader.failed:
raise ValueError("Meets error, fall back to read serially")
yield d
reader.close()
del self._prefetch_readers[0]
# remians
for reader in self._prefetch_readers:
curr_iter = iter(reader)
for d in curr_iter:
if reader.failed:
raise ValueError("Meets error, fall back to read serially")
yield d
reader.close()
self._prefetch_readers = []