in src/buffered.rs [375:421]
/// Accepts `buf` for writing, advancing the writer's internal state machine
/// as far as it can without blocking.
///
/// Behaviour per state:
/// * `Buffer` – `buf` is appended to the in-memory buffer as long as the
///   buffered length plus `buf.len()` stays below `self.capacity`. Otherwise
///   the buffered data is handed to a future that starts a multipart upload
///   and the state becomes `Prepare`.
/// * `Prepare` – the pending future is polled; once it resolves, the state
///   becomes `Write`.
/// * `Write(Some(_))` – waits for upload capacity (limited by
///   `self.max_concurrency`) and then forwards `buf` to the upload.
///
/// Returns `Poll::Ready(Ok(buf.len()))` once `buf` has been accepted,
/// `Poll::Pending` while the prepare future or the upload capacity is not
/// ready, and propagates any error produced along the way.
///
/// # Panics
///
/// Panics with "Already shut down" if the state is `Write(None)` or `Flush(_)`.
fn poll_write(
    mut self: Pin<&mut Self>,
    cx: &mut Context<'_>,
    buf: &[u8],
) -> Poll<Result<usize, Error>> {
    let threshold = self.capacity;
    let concurrency_limit = self.max_concurrency;

    // Every arm either returns or installs a new state; in the latter case the
    // loop re-dispatches on that new state with the same `buf`.
    loop {
        match &mut self.state {
            BufWriterState::Buffer(location, pending) => {
                // Fast path: still under the threshold, keep buffering in memory.
                let projected = pending.content_length().saturating_add(buf.len());
                if projected < threshold {
                    pending.extend_from_slice(buf);
                    return Poll::Ready(Ok(buf.len()));
                }

                // Threshold reached: move the buffered data and path out of the
                // state first, so the state borrow ends before other fields of
                // `self` are touched.
                let buffered = std::mem::take(pending);
                let location = std::mem::take(location);

                // The options are consumed here; each field is left as `None`.
                let attributes = self.attributes.take().unwrap_or_default();
                let tags = self.tags.take().unwrap_or_default();
                let extensions = self.extensions.take().unwrap_or_default();
                let opts = PutMultipartOpts {
                    attributes,
                    tags,
                    extensions,
                };

                let store = Arc::clone(&self.store);
                let prepare = Box::pin(async move {
                    let upload = store.put_multipart_opts(&location, opts).await?;
                    let mut multipart = WriteMultipart::new_with_chunk_size(upload, threshold);
                    // Replay everything buffered so far into the new upload.
                    for piece in buffered.freeze() {
                        multipart.put(piece);
                    }
                    Ok(multipart)
                });
                self.state = BufWriterState::Prepare(prepare);
            }
            BufWriterState::Prepare(fut) => {
                let started = ready!(fut.poll_unpin(cx)?);
                self.state = BufWriterState::Write(Some(started));
            }
            BufWriterState::Write(Some(upload)) => {
                ready!(upload.poll_for_capacity(cx, concurrency_limit))?;
                upload.write(buf);
                return Poll::Ready(Ok(buf.len()));
            }
            BufWriterState::Write(None) | BufWriterState::Flush(_) => {
                panic!("Already shut down")
            }
        }
    }
}