in rust-runtime/aws-smithy-client/src/dvr/replay.rs [219:301]
fn call(&mut self, mut req: Request<SdkBody>) -> Self::Future {
let event_id = self.next_id();
let mut events = match self.live_events.lock().unwrap().remove(&event_id) {
Some(traffic) => traffic,
None => {
return Box::pin(std::future::ready(Err(ConnectorError::other(
format!("no data for event {}. req: {:?}", event_id.0, req).into(),
None,
))))
}
};
let _initial_request = events.pop_front().unwrap();
let (sender, response_body) = hyper::Body::channel();
let body = SdkBody::from(response_body);
let recording = self.recorded_requests.clone();
let recorded_request = tokio::spawn(async move {
let mut data_read = vec![];
while let Some(data) = req.body_mut().data().await {
data_read
.extend_from_slice(data.expect("in memory request should not fail").as_ref())
}
req.map(|_| Bytes::from(data_read))
});
let mut recorded_request = Waitable::Loading(recorded_request);
let fut = async move {
let resp = loop {
let event = events
.pop_front()
.expect("no events, needed a response event");
match event.action {
// to ensure deterministic behavior if the request EOF happens first in the log,
// wait for the request body to be done before returning a response.
Action::Eof {
direction: Direction::Request,
..
} => {
recorded_request.wait().await;
}
Action::Request { .. } => panic!("invalid"),
Action::Response {
response: Err(error),
} => break Err(ConnectorError::other(error.0.into(), None)),
Action::Response {
response: Ok(response),
} => {
let mut builder = http::Response::builder()
.status(response.status)
.version(convert_version(&response.version));
for (name, values) in response.headers {
for value in values {
builder = builder.header(&name, &value);
}
}
tokio::spawn(async move {
replay_body(events, sender).await;
// insert the finalized body into
});
break Ok(builder.body(body).expect("valid builder"));
}
Action::Data {
direction: Direction::Request,
data: _data,
} => {
tracing::info!("get request data");
}
Action::Eof {
direction: Direction::Response,
..
} => panic!("got eof before response"),
Action::Data {
data: _,
direction: Direction::Response,
} => panic!("got response data before response"),
}
};
recording.lock().unwrap().insert(event_id, recorded_request);
resp
};
Box::pin(fut)
}