core/src/layers/retry.rs (115 lines of code) (raw):

// Licensed to the Apache Software Foundation (ASF) under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; use std::time::Duration; use backon::BlockingRetryable; use backon::ExponentialBuilder; use backon::Retryable; use log::warn; use crate::raw::*; use crate::*; /// Add retry for temporary failed operations. /// /// # Notes /// /// This layer will retry failed operations when [`Error::is_temporary`] /// returns true. If operation still failed, this layer will set error to /// `Persistent` which means error has been retried. /// /// # Panics /// /// While retrying `Reader` or `Writer` operations, please make sure either: /// /// - All futures generated by `Reader::read` or `Writer::close` are resolved to `Ready`. /// - Or, won't call any `Reader` or `Writer` methods after retry returns final error. /// /// Otherwise, `RetryLayer` could panic while hitting in bad states. /// /// For example, while composing `RetryLayer` with `TimeoutLayer`. The order of layer is sensitive. /// /// ```no_run /// # use std::time::Duration; /// /// # use opendal::layers::RetryLayer; /// # use opendal::layers::TimeoutLayer; /// # use opendal::services; /// # use opendal::Operator; /// # use opendal::Result; /// /// # fn main() -> Result<()> { /// let op = Operator::new(services::Memory::default())? /// // This is fine, since timeout happen during retry. /// .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1))) /// .layer(RetryLayer::new()) /// // This is wrong. Since timeout layer will drop future, leaving retry layer in a bad state. /// .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1))) /// .finish(); /// Ok(()) /// # } /// ``` /// /// # Examples /// /// ```no_run /// # use opendal::layers::RetryLayer; /// # use opendal::services; /// # use opendal::Operator; /// # use opendal::Result; /// # use opendal::Scheme; /// /// # fn main() -> Result<()> { /// let _ = Operator::new(services::Memory::default())? /// .layer(RetryLayer::new()) /// .finish(); /// Ok(()) /// # } /// ``` /// /// ## Customize retry interceptor /// /// RetryLayer accepts [`RetryInterceptor`] to allow users to customize /// their own retry interceptor logic. /// /// ```no_run /// # use std::time::Duration; /// /// # use opendal::layers::RetryInterceptor; /// # use opendal::layers::RetryLayer; /// # use opendal::services; /// # use opendal::Error; /// # use opendal::Operator; /// # use opendal::Result; /// # use opendal::Scheme; /// /// struct MyRetryInterceptor; /// /// impl RetryInterceptor for MyRetryInterceptor { /// fn intercept(&self, err: &Error, dur: Duration) { /// // do something /// } /// } /// /// # fn main() -> Result<()> { /// let _ = Operator::new(services::Memory::default())? /// .layer(RetryLayer::new().with_notify(MyRetryInterceptor)) /// .finish(); /// Ok(()) /// # } /// ``` pub struct RetryLayer<I: RetryInterceptor = DefaultRetryInterceptor> { builder: ExponentialBuilder, notify: Arc<I>, } impl<I: RetryInterceptor> Clone for RetryLayer<I> { fn clone(&self) -> Self { Self { builder: self.builder, notify: self.notify.clone(), } } } impl Default for RetryLayer { fn default() -> Self { Self { builder: ExponentialBuilder::default(), notify: Arc::new(DefaultRetryInterceptor), } } } impl RetryLayer { /// Create a new retry layer. /// # Examples /// /// ```no_run /// use anyhow::Result; /// use opendal::layers::RetryLayer; /// use opendal::services; /// use opendal::Operator; /// use opendal::Scheme; /// /// let _ = Operator::new(services::Memory::default()) /// .expect("must init") /// .layer(RetryLayer::new()); /// ``` pub fn new() -> RetryLayer { Self::default() } } impl<I: RetryInterceptor> RetryLayer<I> { /// Set the retry interceptor as new notify. /// /// ```no_run /// use opendal::layers::RetryLayer; /// use opendal::services; /// use opendal::Operator; /// /// fn notify(_err: &opendal::Error, _dur: std::time::Duration) {} /// /// let _ = Operator::new(services::Memory::default()) /// .expect("must init") /// .layer(RetryLayer::new().with_notify(notify)) /// .finish(); /// ``` pub fn with_notify<NI: RetryInterceptor>(self, notify: NI) -> RetryLayer<NI> { RetryLayer { builder: self.builder, notify: Arc::new(notify), } } /// Set jitter of current backoff. /// /// If jitter is enabled, ExponentialBackoff will add a random jitter in `[0, min_delay) /// to current delay. pub fn with_jitter(mut self) -> Self { self.builder = self.builder.with_jitter(); self } /// Set factor of current backoff. /// /// # Panics /// /// This function will panic if input factor smaller than `1.0`. pub fn with_factor(mut self, factor: f32) -> Self { self.builder = self.builder.with_factor(factor); self } /// Set min_delay of current backoff. pub fn with_min_delay(mut self, min_delay: Duration) -> Self { self.builder = self.builder.with_min_delay(min_delay); self } /// Set max_delay of current backoff. /// /// Delay will not increase if current delay is larger than max_delay. pub fn with_max_delay(mut self, max_delay: Duration) -> Self { self.builder = self.builder.with_max_delay(max_delay); self } /// Set max_times of current backoff. /// /// Backoff will return `None` if max times is reaching. pub fn with_max_times(mut self, max_times: usize) -> Self { self.builder = self.builder.with_max_times(max_times); self } } impl<A: Access, I: RetryInterceptor> Layer<A> for RetryLayer<I> { type LayeredAccess = RetryAccessor<A, I>; fn layer(&self, inner: A) -> Self::LayeredAccess { RetryAccessor { inner: Arc::new(inner), builder: self.builder, notify: self.notify.clone(), } } } /// RetryInterceptor is used to intercept while retry happened. pub trait RetryInterceptor: Send + Sync + 'static { /// Everytime RetryLayer is retrying, this function will be called. /// /// # Timing /// /// just before the retry sleep. /// /// # Inputs /// /// - err: The error that caused the current retry. /// - dur: The duration that will sleep before next retry. /// /// # Notes /// /// The intercept must be quick and non-blocking. No heavy IO is /// allowed. Otherwise, the retry will be blocked. fn intercept(&self, err: &Error, dur: Duration); } impl<F> RetryInterceptor for F where F: Fn(&Error, Duration) + Send + Sync + 'static, { fn intercept(&self, err: &Error, dur: Duration) { self(err, dur); } } /// The DefaultRetryInterceptor will log the retry error in warning level. pub struct DefaultRetryInterceptor; impl RetryInterceptor for DefaultRetryInterceptor { fn intercept(&self, err: &Error, dur: Duration) { warn!( target: "opendal::layers::retry", "will retry after {}s because: {}", dur.as_secs_f64(), err) } } pub struct RetryAccessor<A: Access, I: RetryInterceptor> { inner: Arc<A>, builder: ExponentialBuilder, notify: Arc<I>, } impl<A: Access, I: RetryInterceptor> Debug for RetryAccessor<A, I> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("RetryAccessor") .field("inner", &self.inner) .finish_non_exhaustive() } } impl<A: Access, I: RetryInterceptor> LayeredAccess for RetryAccessor<A, I> { type Inner = A; type Reader = RetryWrapper<RetryReader<A, A::Reader>, I>; type BlockingReader = RetryWrapper<RetryReader<A, A::BlockingReader>, I>; type Writer = RetryWrapper<A::Writer, I>; type BlockingWriter = RetryWrapper<A::BlockingWriter, I>; type Lister = RetryWrapper<A::Lister, I>; type BlockingLister = RetryWrapper<A::BlockingLister, I>; type Deleter = RetryWrapper<A::Deleter, I>; type BlockingDeleter = RetryWrapper<A::BlockingDeleter, I>; fn inner(&self) -> &Self::Inner { &self.inner } async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> { { || self.inner.create_dir(path, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur: Duration| self.notify.intercept(err, dur)) .await .map_err(|e| e.set_persistent()) } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { let (rp, reader) = { || self.inner.read(path, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| self.notify.intercept(err, dur)) .await .map_err(|e| e.set_persistent())?; let retry_reader = RetryReader::new(self.inner.clone(), path.to_string(), args, reader); let retry_wrapper = RetryWrapper::new(retry_reader, self.notify.clone(), self.builder); Ok((rp, retry_wrapper)) } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { { || self.inner.write(path, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| self.notify.intercept(err, dur)) .await .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder))) .map_err(|e| e.set_persistent()) } async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> { { || self.inner.stat(path, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| self.notify.intercept(err, dur)) .await .map_err(|e| e.set_persistent()) } async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { { || self.inner.delete() } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| self.notify.intercept(err, dur)) .await .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder))) .map_err(|e| e.set_persistent()) } async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> { { || self.inner.copy(from, to, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| self.notify.intercept(err, dur)) .await .map_err(|e| e.set_persistent()) } async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> { { || self.inner.rename(from, to, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| self.notify.intercept(err, dur)) .await .map_err(|e| e.set_persistent()) } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { { || self.inner.list(path, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| self.notify.intercept(err, dur)) .await .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder))) .map_err(|e| e.set_persistent()) } fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> { { || self.inner.blocking_create_dir(path, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| self.notify.intercept(err, dur)) .call() .map_err(|e| e.set_persistent()) } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { let (rp, reader) = { || self.inner.blocking_read(path, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| self.notify.intercept(err, dur)) .call() .map_err(|e| e.set_persistent())?; let retry_reader = RetryReader::new(self.inner.clone(), path.to_string(), args, reader); let retry_wrapper = RetryWrapper::new(retry_reader, self.notify.clone(), self.builder); Ok((rp, retry_wrapper)) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { { || self.inner.blocking_write(path, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| self.notify.intercept(err, dur)) .call() .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder))) .map_err(|e| e.set_persistent()) } fn blocking_stat(&self, path: &str, args: OpStat) -> Result<RpStat> { { || self.inner.blocking_stat(path, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| self.notify.intercept(err, dur)) .call() .map_err(|e| e.set_persistent()) } fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> { { || self.inner.blocking_delete() } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| self.notify.intercept(err, dur)) .call() .map(|(rp, r)| (rp, RetryWrapper::new(r, self.notify.clone(), self.builder))) .map_err(|e| e.set_persistent()) } fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> { { || self.inner.blocking_copy(from, to, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| self.notify.intercept(err, dur)) .call() .map_err(|e| e.set_persistent()) } fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> { { || self.inner.blocking_rename(from, to, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| self.notify.intercept(err, dur)) .call() .map_err(|e| e.set_persistent()) } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { { || self.inner.blocking_list(path, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| self.notify.intercept(err, dur)) .call() .map(|(rp, p)| { let p = RetryWrapper::new(p, self.notify.clone(), self.builder); (rp, p) }) .map_err(|e| e.set_persistent()) } } pub struct RetryReader<A, R> { inner: Arc<A>, reader: Option<R>, path: String, args: OpRead, } impl<A, R> RetryReader<A, R> { fn new(inner: Arc<A>, path: String, args: OpRead, r: R) -> Self { Self { inner, reader: Some(r), path, args, } } } impl<A: Access> oio::Read for RetryReader<A, A::Reader> { async fn read(&mut self) -> Result<Buffer> { loop { match self.reader.take() { None => { let (_, r) = self.inner.read(&self.path, self.args.clone()).await?; self.reader = Some(r); continue; } Some(mut reader) => { let buf = reader.read().await?; self.reader = Some(reader); self.args.range_mut().advance(buf.len() as u64); return Ok(buf); } } } } } impl<A: Access> oio::BlockingRead for RetryReader<A, A::BlockingReader> { fn read(&mut self) -> Result<Buffer> { loop { match self.reader.take() { None => { let (_, r) = self.inner.blocking_read(&self.path, self.args.clone())?; self.reader = Some(r); continue; } Some(mut reader) => { let buf = reader.read()?; self.reader = Some(reader); self.args.range_mut().advance(buf.len() as u64); return Ok(buf); } } } } } pub struct RetryWrapper<R, I> { inner: Option<R>, notify: Arc<I>, builder: ExponentialBuilder, } impl<R, I> RetryWrapper<R, I> { fn new(inner: R, notify: Arc<I>, backoff: ExponentialBuilder) -> Self { Self { inner: Some(inner), notify, builder: backoff, } } fn take_inner(&mut self) -> Result<R> { self.inner.take().ok_or_else(|| { Error::new( ErrorKind::Unexpected, "retry layer is in bad state, please make sure future not dropped before ready", ) }) } } impl<R: oio::Read, I: RetryInterceptor> oio::Read for RetryWrapper<R, I> { async fn read(&mut self) -> Result<Buffer> { use backon::RetryableWithContext; let inner = self.take_inner()?; let (inner, res) = { |mut r: R| async move { let res = r.read().await; (r, res) } } .retry(self.builder) .when(|e| e.is_temporary()) .context(inner) .notify(|err, dur| self.notify.intercept(err, dur)) .await; self.inner = Some(inner); res.map_err(|err| err.set_persistent()) } } impl<R: oio::BlockingRead, I: RetryInterceptor> oio::BlockingRead for RetryWrapper<R, I> { fn read(&mut self) -> Result<Buffer> { use backon::BlockingRetryableWithContext; let inner = self.take_inner()?; let (inner, res) = { |mut r: R| { let res = r.read(); (r, res) } } .retry(self.builder) .when(|e| e.is_temporary()) .context(inner) .notify(|err, dur| self.notify.intercept(err, dur)) .call(); self.inner = Some(inner); res.map_err(|err| err.set_persistent()) } } impl<R: oio::Write, I: RetryInterceptor> oio::Write for RetryWrapper<R, I> { async fn write(&mut self, bs: Buffer) -> Result<()> { use backon::RetryableWithContext; let inner = self.take_inner()?; let ((inner, _), res) = { |(mut r, bs): (R, Buffer)| async move { let res = r.write(bs.clone()).await; ((r, bs), res) } } .retry(self.builder) .when(|e| e.is_temporary()) .context((inner, bs)) .notify(|err, dur| self.notify.intercept(err, dur)) .await; self.inner = Some(inner); res.map_err(|err| err.set_persistent()) } async fn abort(&mut self) -> Result<()> { use backon::RetryableWithContext; let inner = self.take_inner()?; let (inner, res) = { |mut r: R| async move { let res = r.abort().await; (r, res) } } .retry(self.builder) .when(|e| e.is_temporary()) .context(inner) .notify(|err, dur| self.notify.intercept(err, dur)) .await; self.inner = Some(inner); res.map_err(|err| err.set_persistent()) } async fn close(&mut self) -> Result<Metadata> { use backon::RetryableWithContext; let inner = self.take_inner()?; let (inner, res) = { |mut r: R| async move { let res = r.close().await; (r, res) } } .retry(self.builder) .when(|e| e.is_temporary()) .context(inner) .notify(|err, dur| self.notify.intercept(err, dur)) .await; self.inner = Some(inner); res.map_err(|err| err.set_persistent()) } } impl<R: oio::BlockingWrite, I: RetryInterceptor> oio::BlockingWrite for RetryWrapper<R, I> { fn write(&mut self, bs: Buffer) -> Result<()> { { || self.inner.as_mut().unwrap().write(bs.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| { self.notify.intercept(err, dur); }) .call() .map_err(|e| e.set_persistent()) } fn close(&mut self) -> Result<Metadata> { { || self.inner.as_mut().unwrap().close() } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| { self.notify.intercept(err, dur); }) .call() .map_err(|e| e.set_persistent()) } } impl<P: oio::List, I: RetryInterceptor> oio::List for RetryWrapper<P, I> { async fn next(&mut self) -> Result<Option<oio::Entry>> { use backon::RetryableWithContext; let inner = self.take_inner()?; let (inner, res) = { |mut p: P| async move { let res = p.next().await; (p, res) } } .retry(self.builder) .when(|e| e.is_temporary()) .context(inner) .notify(|err, dur| self.notify.intercept(err, dur)) .await; self.inner = Some(inner); res.map_err(|err| err.set_persistent()) } } impl<P: oio::BlockingList, I: RetryInterceptor> oio::BlockingList for RetryWrapper<P, I> { fn next(&mut self) -> Result<Option<oio::Entry>> { { || self.inner.as_mut().unwrap().next() } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| { self.notify.intercept(err, dur); }) .call() .map_err(|e| e.set_persistent()) } } impl<P: oio::Delete, I: RetryInterceptor> oio::Delete for RetryWrapper<P, I> { fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { { || self.inner.as_mut().unwrap().delete(path, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| { self.notify.intercept(err, dur); }) .call() .map_err(|e| e.set_persistent()) } async fn flush(&mut self) -> Result<usize> { use backon::RetryableWithContext; let inner = self.take_inner()?; let (inner, res) = { |mut p: P| async move { let res = p.flush().await; (p, res) } } .retry(self.builder) .when(|e| e.is_temporary()) .context(inner) .notify(|err, dur| self.notify.intercept(err, dur)) .await; self.inner = Some(inner); res.map_err(|err| err.set_persistent()) } } impl<P: oio::BlockingDelete, I: RetryInterceptor> oio::BlockingDelete for RetryWrapper<P, I> { fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> { { || self.inner.as_mut().unwrap().delete(path, args.clone()) } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| { self.notify.intercept(err, dur); }) .call() .map_err(|e| e.set_persistent()) } fn flush(&mut self) -> Result<usize> { { || self.inner.as_mut().unwrap().flush() } .retry(self.builder) .when(|e| e.is_temporary()) .notify(|err, dur| { self.notify.intercept(err, dur); }) .call() .map_err(|e| e.set_persistent()) } } #[cfg(test)] mod tests { use std::mem; use std::sync::Arc; use std::sync::Mutex; use bytes::Bytes; use futures::{stream, TryStreamExt}; use tracing_subscriber::filter::LevelFilter; use super::*; use crate::layers::LoggingLayer; #[derive(Default, Clone)] struct MockBuilder { attempt: Arc<Mutex<usize>>, } impl Builder for MockBuilder { const SCHEME: Scheme = Scheme::Custom("mock"); type Config = (); fn build(self) -> Result<impl Access> { Ok(MockService { attempt: self.attempt.clone(), }) } } #[derive(Debug, Clone, Default)] struct MockService { attempt: Arc<Mutex<usize>>, } impl Access for MockService { type Reader = MockReader; type Writer = MockWriter; type Lister = MockLister; type Deleter = MockDeleter; type BlockingReader = (); type BlockingWriter = (); type BlockingLister = (); type BlockingDeleter = (); fn info(&self) -> Arc<AccessorInfo> { let am = AccessorInfo::default(); am.set_scheme(Scheme::Custom("mock")) .set_native_capability(Capability { read: true, write: true, write_can_multi: true, delete: true, delete_max_size: Some(10), stat: true, list: true, list_with_recursive: true, ..Default::default() }); am.into() } async fn stat(&self, _: &str, _: OpStat) -> Result<RpStat> { Ok(RpStat::new( Metadata::new(EntryMode::FILE).with_content_length(13), )) } async fn read(&self, _: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { Ok(( RpRead::new(), MockReader { buf: Bytes::from("Hello, World!").into(), range: args.range(), attempt: self.attempt.clone(), }, )) } async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { Ok(( RpDelete::default(), MockDeleter { size: 0, attempt: self.attempt.clone(), }, )) } async fn write(&self, _: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { Ok((RpWrite::new(), MockWriter {})) } async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> { let lister = MockLister::default(); Ok((RpList::default(), lister)) } } #[derive(Debug, Clone, Default)] struct MockReader { buf: Buffer, range: BytesRange, attempt: Arc<Mutex<usize>>, } impl oio::Read for MockReader { async fn read(&mut self) -> Result<Buffer> { let mut attempt = self.attempt.lock().unwrap(); *attempt += 1; match *attempt { 1 => Err( Error::new(ErrorKind::Unexpected, "retryable_error from reader") .set_temporary(), ), 2 => Err( Error::new(ErrorKind::Unexpected, "retryable_error from reader") .set_temporary(), ), // Should read out all data. 3 => Ok(self.buf.slice(self.range.to_range_as_usize())), 4 => Err( Error::new(ErrorKind::Unexpected, "retryable_error from reader") .set_temporary(), ), // Should be empty. 5 => Ok(self.buf.slice(self.range.to_range_as_usize())), _ => unreachable!(), } } } #[derive(Debug, Clone, Default)] struct MockWriter {} impl oio::Write for MockWriter { async fn write(&mut self, _: Buffer) -> Result<()> { Ok(()) } async fn close(&mut self) -> Result<Metadata> { Err(Error::new(ErrorKind::Unexpected, "always close failed").set_temporary()) } async fn abort(&mut self) -> Result<()> { Ok(()) } } #[derive(Debug, Clone, Default)] struct MockLister { attempt: usize, } impl oio::List for MockLister { async fn next(&mut self) -> Result<Option<oio::Entry>> { self.attempt += 1; match self.attempt { 1 => Err(Error::new( ErrorKind::RateLimited, "retryable rate limited error from lister", ) .set_temporary()), 2 => Ok(Some(oio::Entry::new( "hello", Metadata::new(EntryMode::FILE), ))), 3 => Ok(Some(oio::Entry::new( "world", Metadata::new(EntryMode::FILE), ))), 4 => Err( Error::new(ErrorKind::Unexpected, "retryable internal server error") .set_temporary(), ), 5 => Ok(Some(oio::Entry::new( "2023/", Metadata::new(EntryMode::DIR), ))), 6 => Ok(Some(oio::Entry::new( "0208/", Metadata::new(EntryMode::DIR), ))), 7 => Ok(None), _ => { unreachable!() } } } } #[derive(Debug, Clone, Default)] struct MockDeleter { size: usize, attempt: Arc<Mutex<usize>>, } impl oio::Delete for MockDeleter { fn delete(&mut self, _: &str, _: OpDelete) -> Result<()> { self.size += 1; Ok(()) } async fn flush(&mut self) -> Result<usize> { let mut attempt = self.attempt.lock().unwrap(); *attempt += 1; match *attempt { 1 => Err( Error::new(ErrorKind::Unexpected, "retryable_error from deleter") .set_temporary(), ), 2 => { self.size -= 1; Ok(1) } 3 => Err( Error::new(ErrorKind::Unexpected, "retryable_error from deleter") .set_temporary(), ), 4 => Err( Error::new(ErrorKind::Unexpected, "retryable_error from deleter") .set_temporary(), ), 5 => { let s = mem::take(&mut self.size); Ok(s) } _ => unreachable!(), } } } #[tokio::test] async fn test_retry_read() { let _ = tracing_subscriber::fmt() .with_max_level(LevelFilter::TRACE) .with_test_writer() .try_init(); let builder = MockBuilder::default(); let op = Operator::new(builder.clone()) .unwrap() .layer(LoggingLayer::default()) .layer(RetryLayer::new()) .finish(); let r = op.reader("retryable_error").await.unwrap(); let mut content = Vec::new(); let size = r .read_into(&mut content, ..) .await .expect("read must succeed"); assert_eq!(size, 13); assert_eq!(content, "Hello, World!".as_bytes()); // The error is retryable, we should request it 3 times. assert_eq!(*builder.attempt.lock().unwrap(), 5); } /// This test is used to reproduce the panic issue while composing retry layer with timeout layer. #[tokio::test] async fn test_retry_write_fail_on_close() { let _ = tracing_subscriber::fmt() .with_max_level(LevelFilter::TRACE) .with_test_writer() .try_init(); let builder = MockBuilder::default(); let op = Operator::new(builder.clone()) .unwrap() .layer( RetryLayer::new() .with_min_delay(Duration::from_millis(1)) .with_max_delay(Duration::from_millis(1)) .with_jitter(), ) // Uncomment this to reproduce timeout layer panic. // .layer(TimeoutLayer::new().with_io_timeout(Duration::from_nanos(1))) .layer(LoggingLayer::default()) .finish(); let mut w = op.writer("test_write").await.unwrap(); w.write("aaa").await.unwrap(); w.write("bbb").await.unwrap(); match w.close().await { Ok(_) => (), Err(_) => { w.abort().await.unwrap(); } }; } #[tokio::test] async fn test_retry_list() { let _ = tracing_subscriber::fmt().with_test_writer().try_init(); let builder = MockBuilder::default(); let op = Operator::new(builder.clone()) .unwrap() .layer(RetryLayer::new()) .finish(); let expected = vec!["hello", "world", "2023/", "0208/"]; let mut lister = op .lister("retryable_error/") .await .expect("service must support list"); let mut actual = Vec::new(); while let Some(obj) = lister.try_next().await.expect("must success") { actual.push(obj.name().to_owned()); } assert_eq!(actual, expected); } #[tokio::test] async fn test_retry_batch() { let _ = tracing_subscriber::fmt().with_test_writer().try_init(); let builder = MockBuilder::default(); // set to a lower delay to make it run faster let op = Operator::new(builder.clone()) .unwrap() .layer( RetryLayer::new() .with_min_delay(Duration::from_secs_f32(0.1)) .with_max_times(5), ) .finish(); let paths = vec!["hello", "world", "test", "batch"]; op.delete_stream(stream::iter(paths)).await.unwrap(); assert_eq!(*builder.attempt.lock().unwrap(), 5); } }