in plasma-stream/src/server/receiver.rs [124:147]
fn add_to_receiving(&self) -> Result<(), ObjectReceiveError> {
// ensure thread-safety by acquiring a lock to the set of objects being received;
// `unwrap()` is OK here because no thread will panic wile holding the lock.
let mut receiving = self.receiving.lock().unwrap();
// if any of the object IDs is already in the store, return an error
let mut duplicates = Vec::new();
for oid in self.object_ids.iter() {
if receiving.contains(oid) {
duplicates.push(*oid);
}
}
if !duplicates.is_empty() {
return Err(ObjectReceiveError::AlreadyReceiving(
self.peer_addr,
duplicates,
));
}
// add all object IDs to the set and return
receiving.extend(self.object_ids.iter());
Ok(())
}