def _prefetch_generator()

in alibabacloud_oss_v2/filelike.py [0:0]


    def _prefetch_generator(self, offset):
        if not self._executor:
            self._executor = ThreadPoolExecutor(self._prefetch_num)

        self._close_readers1()
        prefetch_num = max(1, self._prefetch_num)

        for start in range(offset, self._size_in_bytes, self._chunk_size):
            self._prefetch_readers.append(_PrefetchDelegate(self, start))
            if len(self._prefetch_readers) < prefetch_num:
                continue

            # read data from first reader
            reader = self._prefetch_readers[0]
            curr_iter = iter(reader)
            for d in curr_iter:
                if reader.failed:
                    raise ValueError("Meets error, fall back to read serially")
                yield d

            reader.close()
            del self._prefetch_readers[0]

        # remians
        for reader in self._prefetch_readers:
            curr_iter = iter(reader)
            for d in curr_iter:
                if reader.failed:
                    raise ValueError("Meets error, fall back to read serially")
                yield d
            reader.close()

        self._prefetch_readers = []