in torchdata/datapipes/iter/util/combining.py [0:0]
def __iter__(self) -> Iterator:
    """Zip each source item with the reference item that shares its key.

    For every element of ``self.source_datapipe``, reference elements are
    consumed until one with a matching key (per ``self.ref_key_fn``) is
    found; non-matching reference elements are held in a FIFO buffer so
    later source items can match them. When ``self.buffer_size`` is set,
    the buffer is capped at that many entries, evicting the oldest first
    (a one-time warning is emitted on the first eviction).

    Yields:
        ``self.merge_fn(data, ref_data)`` if ``self.merge_fn`` is set,
        otherwise the tuple ``(data, ref_data)``; wrapped as
        ``(key, result)`` when ``self.keep_key`` is true.

    Raises:
        BufferError: the reference DataPipe is exhausted before a match
            is found for a source item.
        ValueError: the reference DataPipe produces a duplicate key.
    """
    buffer: OrderedDict = OrderedDict()
    ref_it = iter(self.ref_datapipe)
    warn_once_flag = True  # warn about eviction only on the first occurrence
    for data in self.source_datapipe:
        key = self.key_fn(data)
        # Pull from the reference pipe until the matching key is buffered.
        while key not in buffer:
            try:
                ref_data = next(ref_it)
            except StopIteration:
                raise BufferError(
                    f"No matching key can be found from reference DataPipe for the data {data}. "
                    "Please consider increasing the buffer size."
                )
            ref_key = self.ref_key_fn(ref_data)
            if ref_key in buffer:
                raise ValueError("Duplicate key is found in reference DataPipe")
            # Evict the oldest entry *before* inserting, so the buffer never
            # exceeds `buffer_size` entries. (The previous `>` check allowed
            # it to reach buffer_size + 1.) The `0 <` guard keeps `popitem`
            # from firing on an empty buffer if buffer_size were ever 0 —
            # presumably __init__ validates it as positive; TODO confirm.
            if self.buffer_size is not None and 0 < self.buffer_size <= len(buffer):
                if warn_once_flag:
                    warn_once_flag = False
                    warnings.warn(
                        "Buffer reaches the upper limit, so reference key-data pair begins to "
                        "be removed from buffer in FIFO order. Please consider increasing the buffer size."
                    )
                buffer.popitem(last=False)
            buffer[ref_key] = ref_data
        # Single pop instead of duplicating it in both conditional branches.
        ref_data = buffer.pop(key)
        res = self.merge_fn(data, ref_data) if self.merge_fn else (data, ref_data)
        if self.keep_key:
            yield key, res
        else:
            yield res