in core/src/layers/complete.rs [208:262]
/// Opens a blocking reader on `path` and wraps it so the returned
/// [`CompleteReader`] makes up for whichever of seek / `next` support the
/// accessor's capability does not advertise.
///
/// The wrapper is chosen from `read_can_seek` and `read_can_next`:
///
/// - both set: the inner reader is returned untouched;
/// - only seek set: the reader is wrapped by `oio::into_streamable_read`;
/// - seek missing: the reader is wrapped by `oio::into_seekable_read_by_range`
///   and, if `next` is missing as well, additionally by
///   `oio::into_streamable_read`.
///
/// # Errors
///
/// Returns the capability-unsupported error for `Operation::BlockingRead`
/// when either the `read` or the `blocking` capability is absent. Errors from
/// the inner `blocking_read` and (for size-only ranges) the inner
/// `blocking_stat` are propagated unchanged.
fn complete_blocking_reader(
    &self,
    path: &str,
    args: OpRead,
) -> Result<(RpRead, CompleteReader<A, A::BlockingReader>)> {
    let cap = self.meta.capability();
    if !(cap.read && cap.blocking) {
        return new_capability_unsupported_error(Operation::BlockingRead);
    }

    // Grab the requested range first: `args` is moved into `blocking_read`.
    let requested = args.range();
    let (rp, reader) = self.inner.blocking_read(path, args)?;
    let content_length = rp.metadata().content_length();

    if cap.read_can_seek {
        if cap.read_can_next {
            return Ok((rp, CompleteReader::AlreadyComplete(reader)));
        }
        let streamable = oio::into_streamable_read(reader, 256 * 1024);
        return Ok((rp, CompleteReader::NeedStreamable(streamable)));
    }

    // From here on the accessor cannot seek, so work out the absolute
    // `(start, len)` window handed to the range-based seekable wrapper.
    let (start, len) = match (requested.offset(), requested.size()) {
        (None, Some(suffix)) => {
            // Size without offset: the window is the trailing `suffix` bytes,
            // which requires knowing the total length.
            //
            // TODO: we can read content range to calculate
            // the total content length.
            let total = self
                .inner
                .blocking_stat(path, OpStat::new())?
                .into_metadata()
                .content_length();
            // Clamp so a request larger than the object starts at 0.
            let len = suffix.min(total);
            (total - len, len)
        }
        (Some(start), _) => (start, content_length),
        (None, None) => (0, content_length),
    };

    let seekable = oio::into_seekable_read_by_range(self.inner.clone(), path, reader, start, len);
    if cap.read_can_next {
        return Ok((rp, CompleteReader::NeedSeekable(seekable)));
    }

    let both = oio::into_streamable_read(seekable, 256 * 1024);
    Ok((rp, CompleteReader::NeedBoth(both)))
}