in src/transaction.rs [44:92]
/// Spawns a background task that drives a single driver transaction and
/// returns the [`ShellTransaction`] handle used to communicate with it.
///
/// Two channels of capacity 1 connect the caller and the task:
///
/// * requests ([`TransactionRequest`]) flow from the returned handle's
///   `input` sender into the task;
/// * the result of every executed statement flows back to the handle's
///   `results` receiver.
///
/// The task loops until it receives `Commit` (the transaction is committed),
/// or `Abort` / a closed request channel (the transaction is aborted). The
/// value produced by `driver.transact` is returned from the task, with any
/// error propagated via `?`.
///
/// # Panics
///
/// The spawned task panics if a statement result cannot be sent back because
/// the results channel has been closed.
fn new_transaction<C>(driver: QldbDriver<C>) -> ShellTransaction
where
    C: QldbSession + Send + Sync + Clone + 'static,
{
    let (request_tx, request_rx) = channel(1);
    let (reply_tx, reply_rx) = channel(1);

    let worker = task::spawn(async move {
        // The receiver is placed behind a shared lock so the transaction
        // closure only needs to borrow it.
        // NOTE(review): presumably this lets `transact` invoke the closure
        // more than once (e.g. on retry) -- confirm against the driver.
        let request_rx = Arc::new(Mutex::new(request_rx));

        let outcome = driver
            .transact(|mut tx| async {
                loop {
                    // Scope the lock guard so it is released before the
                    // request is handled.
                    let request = {
                        let mut receiver = request_rx.lock().await;
                        receiver.recv().await
                    };

                    match request {
                        Some(TransactionRequest::ExecuteStatement(partiql)) => {
                            let reply = tx.execute_statement(partiql).await;
                            if reply_tx.send(reply).await.is_err() {
                                panic!("results ch should never be closed");
                            }
                        }
                        Some(TransactionRequest::Commit) => break tx.commit(()).await,
                        // `None` means the request channel was closed.
                        // NOTE(review): the original author considered this
                        // case unreachable, on the basis that the channel and
                        // this future are dropped together on cancellation;
                        // it is handled like an explicit abort regardless.
                        Some(TransactionRequest::Abort) | None => break tx.abort().await,
                    }
                }
            })
            .await?;

        Ok(outcome)
    });

    ShellTransaction {
        input: request_tx,
        results: reply_rx,
        handle: Some(worker),
    }
}