src/aws/dynamo.rs (396 lines of code) (raw):
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//! A DynamoDB based lock system
use std::borrow::Cow;
use std::collections::HashMap;
use std::future::Future;
use std::time::{Duration, Instant};
use chrono::Utc;
use http::{Method, StatusCode};
use serde::ser::SerializeMap;
use serde::{Deserialize, Serialize, Serializer};
use crate::aws::client::S3Client;
use crate::aws::credential::CredentialExt;
use crate::aws::{AwsAuthorizer, AwsCredential};
use crate::client::get::GetClientExt;
use crate::client::retry::RetryExt;
use crate::client::retry::{RequestError, RetryError};
use crate::path::Path;
use crate::{Error, GetOptions, Result};
/// The exception returned by DynamoDB on conflict
const CONFLICT: &str = "ConditionalCheckFailedException";
const STORE: &str = "DynamoDB";
/// A DynamoDB-based commit protocol, used to provide conditional write support for S3
///
/// ## Limitations
///
/// Only conditional operations, e.g. `copy_if_not_exists` will be synchronized, and can
/// therefore race with non-conditional operations, e.g. `put`, `copy`, `delete`, or
/// conditional operations performed by writers not configured to synchronize with DynamoDB.
///
/// Workloads making use of this mechanism **must** ensure:
///
/// * Conditional and non-conditional operations are not performed on the same paths
/// * Conditional operations are only performed via similarly configured clients
///
/// Additionally as the locking mechanism relies on timeouts to detect stale locks,
/// performance will be poor for systems that frequently delete and then create
/// objects at the same path, instead being optimised for systems that primarily create
/// files with paths never used before, or perform conditional updates to existing files
///
/// ## Commit Protocol
///
/// The DynamoDB schema is as follows:
///
/// * A string partition key named `"path"`
/// * A string sort key named `"etag"`
/// * A numeric [TTL] attribute named `"ttl"`
/// * A numeric attribute named `"generation"`
/// * A numeric attribute named `"timeout"`
///
/// An appropriate DynamoDB table can be created with the CLI as follows:
///
/// ```bash
/// $ aws dynamodb create-table --table-name <TABLE_NAME> --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S
/// $ aws dynamodb update-time-to-live --table-name <TABLE_NAME> --time-to-live-specification Enabled=true,AttributeName=ttl
/// ```
///
/// To perform a conditional operation on an object with a given `path` and `etag` (`*` if creating),
/// the commit protocol is as follows:
///
/// 1. Perform HEAD request on `path` and error on precondition mismatch
/// 2. Create record in DynamoDB with given `path` and `etag` with the configured timeout
///     1. On Success: Perform operation with the configured timeout
///     2. On Conflict:
///         1. Periodically re-perform HEAD request on `path` and error on precondition mismatch
///         2. If `timeout * max_clock_skew_rate` has passed, replace the record incrementing the `"generation"`
///             1. On Success: GOTO 2.1
///             2. On Conflict: GOTO 2.2
///
/// Provided no writer modifies an object with a given `path` and `etag` without first adding a
/// corresponding record to DynamoDB, we are guaranteed that only one writer will ever commit.
///
/// This is inspired by the [DynamoDB Lock Client] but simplified for the more limited
/// requirements of synchronizing object storage. The major changes are:
///
/// * Uses a monotonic generation count instead of a UUID rvn, as this is:
///     * Cheaper to generate, serialize and compare
///     * Cannot collide
///     * More human readable / interpretable
/// * Relies on [TTL] to eventually clean up old locks
///
/// It also draws inspiration from the DeltaLake [S3 Multi-Cluster] commit protocol, but
/// generalised to not make assumptions about the workload and not rely on first writing
/// to a temporary path.
///
/// [TTL]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html
/// [DynamoDB Lock Client]: https://aws.amazon.com/blogs/database/building-distributed-locks-with-the-dynamodb-lock-client/
/// [S3 Multi-Cluster]: https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit#heading=h.mjjuxw9mcz9h
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DynamoCommit {
/// The name of the DynamoDB table that lock records are written to
table_name: String,
/// The number of milliseconds a lease is valid for
timeout: u64,
/// The maximum clock skew rate tolerated by the system
///
/// A conflicting lease is only reclaimed once its timeout multiplied by this
/// rate has elapsed locally
max_clock_skew_rate: u32,
/// The length of time a record will be retained in DynamoDB before being cleaned up
///
/// This is purely an optimisation to avoid indefinite growth of the DynamoDB table
/// and does not impact how long clients may wait to acquire a lock
ttl: Duration,
/// The backoff duration before retesting a condition
///
/// This is the period between precondition re-checks while waiting on a conflicting lease
test_interval: Duration,
}
impl DynamoCommit {
/// Create a new [`DynamoCommit`] that stores its lock records in the table `table_name`
///
/// The remaining settings start at their defaults: a 20 second lease timeout, a maximum
/// clock skew rate of 3, a record TTL of 1 hour and a 100 millisecond interval between
/// precondition re-checks.
pub fn new(table_name: String) -> Self {
    let ttl = Duration::from_secs(60 * 60);
    let test_interval = Duration::from_millis(100);
    Self {
        table_name,
        timeout: 20_000,
        max_clock_skew_rate: 3,
        ttl,
        test_interval,
    }
}
/// Overrides the lock timeout.
///
/// A longer lock timeout reduces the probability of spurious commit failures and multi-writer
/// races, but will increase the time that writers must wait to reclaim a lock lost. The
/// default value of 20 seconds should be appropriate for must use-cases.
pub fn with_timeout(mut self, millis: u64) -> Self {
self.timeout = millis;
self
}
/// The maximum clock skew rate tolerated by the system.
///
/// An environment in which the clock on the fastest node ticks twice as fast as the slowest
/// node, would have a clock skew rate of 2. The default value of 3 should be appropriate
/// for most environments.
pub fn with_max_clock_skew_rate(mut self, rate: u32) -> Self {
self.max_clock_skew_rate = rate;
self
}
/// The length of time a record should be retained in DynamoDB before being cleaned up
///
/// This should be significantly larger than the configured lock timeout, with the default
/// value of 1 hour appropriate for most use-cases.
pub fn with_ttl(mut self, ttl: Duration) -> Self {
self.ttl = ttl;
self
}
/// Parse [`DynamoCommit`] from a string
///
/// Two forms are accepted:
///
/// * `<table_name>` - uses the default lock timeout
/// * `<table_name>:<timeout_millis>` - overrides the lock timeout
///
/// Surrounding whitespace is ignored for both the table name and the timeout.
/// Returns `None` if a timeout is present but is not a valid `u64`.
pub(crate) fn from_str(value: &str) -> Option<Self> {
    match value.split_once(':') {
        Some((table_name, timeout)) => {
            // Trim the timeout as well as the table name: integer parsing rejects
            // surrounding whitespace, so `"table: 20000"` would otherwise be refused
            let millis = timeout.trim().parse().ok()?;
            Some(Self::new(table_name.trim().to_string()).with_timeout(millis))
        }
        None => Some(Self::new(value.trim().to_string())),
    }
}
/// Returns the name of the DynamoDB table used for lock records.
pub(crate) fn table_name(&self) -> &str {
    self.table_name.as_str()
}
pub(crate) async fn copy_if_not_exists(
&self,
client: &S3Client,
from: &Path,
to: &Path,
) -> Result<()> {
self.conditional_op(client, to, None, || async {
client.copy_request(from, to).send().await?;
Ok(())
})
.await
}
#[allow(clippy::future_not_send)] // Generics confound this lint
pub(crate) async fn conditional_op<F, Fut, T>(
&self,
client: &S3Client,
to: &Path,
etag: Option<&str>,
op: F,
) -> Result<T>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<T, Error>>,
{
check_precondition(client, to, etag).await?;
let mut previous_lease = None;
loop {
let existing = previous_lease.as_ref();
match self.try_lock(client, to.as_ref(), etag, existing).await? {
TryLockResult::Ok(lease) => {
let expiry = lease.acquire + lease.timeout;
return match tokio::time::timeout_at(expiry.into(), op()).await {
Ok(Ok(v)) => Ok(v),
Ok(Err(e)) => Err(e),
Err(_) => Err(Error::Generic {
store: "DynamoDB",
source: format!(
"Failed to perform conditional operation in {} milliseconds",
self.timeout
)
.into(),
}),
};
}
TryLockResult::Conflict(conflict) => {
let mut interval = tokio::time::interval(self.test_interval);
let expiry = conflict.timeout * self.max_clock_skew_rate;
loop {
interval.tick().await;
check_precondition(client, to, etag).await?;
if conflict.acquire.elapsed() > expiry {
previous_lease = Some(conflict);
break;
}
}
}
}
}
}
/// Attempt to acquire a lock, reclaiming an existing lease if provided
async fn try_lock(
&self,
s3: &S3Client,
path: &str,
etag: Option<&str>,
existing: Option<&Lease>,
) -> Result<TryLockResult> {
let attributes;
let (next_gen, condition_expression, expression_attribute_values) = match existing {
None => (0_u64, "attribute_not_exists(#pk)", Map(&[])),
Some(existing) => {
attributes = [(":g", AttributeValue::Number(existing.generation))];
(
existing.generation.checked_add(1).unwrap(),
"attribute_exists(#pk) AND generation = :g",
Map(attributes.as_slice()),
)
}
};
let ttl = (Utc::now() + self.ttl).timestamp();
let items = [
("path", AttributeValue::from(path)),
("etag", AttributeValue::from(etag.unwrap_or("*"))),
("generation", AttributeValue::Number(next_gen)),
("timeout", AttributeValue::Number(self.timeout)),
("ttl", AttributeValue::Number(ttl as _)),
];
let names = [("#pk", "path")];
let req = PutItem {
table_name: &self.table_name,
condition_expression,
expression_attribute_values,
expression_attribute_names: Map(&names),
item: Map(&items),
return_values: None,
return_values_on_condition_check_failure: Some(ReturnValues::AllOld),
};
let credential = s3.config.get_credential().await?;
let acquire = Instant::now();
match self
.request(s3, credential.as_deref(), "DynamoDB_20120810.PutItem", req)
.await
{
Ok(_) => Ok(TryLockResult::Ok(Lease {
acquire,
generation: next_gen,
timeout: Duration::from_millis(self.timeout),
})),
Err(e) => match parse_error_response(&e) {
Some(e) if e.error.ends_with(CONFLICT) => match extract_lease(&e.item) {
Some(lease) => Ok(TryLockResult::Conflict(lease)),
None => Err(Error::Generic {
store: STORE,
source: "Failed to extract lease from conflict ReturnValuesOnConditionCheckFailure response".into()
}),
},
_ => Err(Error::Generic {
store: STORE,
source: Box::new(e),
}),
},
}
}
/// Sends `req` as a JSON `POST` with the `X-Amz-Target` header set to `target`
///
/// The request goes to the endpoint configured on `s3` when there is one, and otherwise
/// to `https://dynamodb.{region}.amazonaws.com`. When `cred` is provided an
/// [`AwsAuthorizer`] for the `dynamodb` service is handed to the signing step. The
/// request is sent with the retry configuration of `s3`.
async fn request<R: Serialize + Send + Sync>(
    &self,
    s3: &S3Client,
    cred: Option<&AwsCredential>,
    target: &str,
    req: R,
) -> Result<HttpResponse, RetryError> {
    let config = &s3.config;
    let region = &config.region;
    let signer = cred.map(|c| AwsAuthorizer::new(c, "dynamodb", region));

    let builder = if let Some(endpoint) = &config.endpoint {
        s3.client.request(Method::POST, endpoint)
    } else {
        let url = format!("https://dynamodb.{region}.amazonaws.com");
        s3.client.request(Method::POST, url)
    };

    // TODO: Timeout
    builder
        .json(&req)
        .header("X-Amz-Target", target)
        .with_aws_sigv4(signer, None)
        .send_retry(&config.retry_config)
        .await
}
}
/// The outcome of a single attempt to acquire the lock
#[derive(Debug)]
enum TryLockResult {
/// Successfully acquired a lease
///
/// Carries the lease that was just written
Ok(Lease),
/// An existing lease was found
///
/// Carries the lease extracted from the record that caused the conflict
Conflict(Lease),
}
/// Validates that `path` has the given `etag` or doesn't exist if `None`
async fn check_precondition(client: &S3Client, path: &Path, etag: Option<&str>) -> Result<()> {
let options = GetOptions {
head: true,
..Default::default()
};
match etag {
Some(expected) => match client.get_opts(path, options).await {
Ok(r) => match r.meta.e_tag {
Some(actual) if expected == actual => Ok(()),
actual => Err(Error::Precondition {
path: path.to_string(),
source: format!("{} does not match {expected}", actual.unwrap_or_default())
.into(),
}),
},
Err(Error::NotFound { .. }) => Err(Error::Precondition {
path: path.to_string(),
source: format!("Object at location {path} not found").into(),
}),
Err(e) => Err(e),
},
None => match client.get_opts(path, options).await {
Ok(_) => Err(Error::AlreadyExists {
path: path.to_string(),
source: "Already Exists".to_string().into(),
}),
Err(Error::NotFound { .. }) => Ok(()),
Err(e) => Err(e),
},
}
}
/// Parses the error response, if any
///
/// Only a `400 Bad Request` that carries a body is considered. `None` is returned for
/// every other error, and when the body does not deserialize as an [`ErrorResponse`].
fn parse_error_response(e: &RetryError) -> Option<ErrorResponse<'_>> {
    if let RequestError::Status {
        status: StatusCode::BAD_REQUEST,
        body: Some(body),
    } = e.inner()
    {
        return serde_json::from_str(body).ok();
    }
    None
}
/// Extracts a lease from `item`, returning `None` on error
///
/// Both the `"generation"` and `"timeout"` attributes must be present and numeric.
/// The timeout is interpreted as milliseconds, and the lease is considered to have been
/// acquired at the moment of extraction.
fn extract_lease(item: &HashMap<&str, AttributeValue<'_>>) -> Option<Lease> {
    // Looks up `key`, yielding its value only when it is a numeric attribute
    let number = |key: &str| match item.get(key) {
        Some(AttributeValue::Number(value)) => Some(*value),
        _ => None,
    };

    let generation = number("generation")?;
    let millis = number("timeout")?;

    Some(Lease {
        acquire: Instant::now(),
        generation,
        timeout: Duration::from_millis(millis),
    })
}
/// A lock lease
///
/// Describes either a lease held by this client, or this client's view of a
/// conflicting lease held by another writer
#[derive(Debug, Clone)]
struct Lease {
/// The local instant from which the lease is measured
///
/// For an acquired lease this is taken just before the request is sent; for a
/// conflicting lease it is the moment the conflict response was decoded
acquire: Instant,
/// The generation stored in the corresponding DynamoDB record
generation: u64,
/// How long the lease is valid for
timeout: Duration,
}
/// A DynamoDB [PutItem] payload
///
/// Field names are serialized in `PascalCase`, e.g. `table_name` becomes `TableName`
///
/// [PutItem]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html
#[derive(Serialize)]
#[serde(rename_all = "PascalCase")]
struct PutItem<'a> {
/// The table name
table_name: &'a str,
/// A condition that must be satisfied in order for a conditional PutItem operation to succeed.
condition_expression: &'a str,
/// One or more substitution tokens for attribute names in an expression
expression_attribute_names: Map<'a, &'a str, &'a str>,
/// One or more values that can be substituted in an expression
expression_attribute_values: Map<'a, &'a str, AttributeValue<'a>>,
/// A map of attribute name/value pairs, one for each attribute
item: Map<'a, &'a str, AttributeValue<'a>>,
/// Use ReturnValues if you want to get the item attributes as they appeared
/// before they were updated with the PutItem request.
///
/// Omitted from the payload when `None`
#[serde(skip_serializing_if = "Option::is_none")]
return_values: Option<ReturnValues>,
/// An optional parameter that returns the item attributes for a PutItem operation
/// that failed a condition check.
///
/// Omitted from the payload when `None`
#[serde(skip_serializing_if = "Option::is_none")]
return_values_on_condition_check_failure: Option<ReturnValues>,
}
/// The body of an error response, borrowing from the response text where possible
#[derive(Deserialize)]
struct ErrorResponse<'a> {
/// The error type, read from the `__type` field
#[serde(rename = "__type")]
error: &'a str,
/// The attributes read from the `Item` field, empty if the field is absent
#[serde(borrow, default, rename = "Item")]
item: HashMap<&'a str, AttributeValue<'a>>,
}
/// Which item attributes a request should return
///
/// Variants are serialized in `SCREAMING_SNAKE_CASE`
#[derive(Serialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
enum ReturnValues {
/// Serialized as `ALL_OLD`
AllOld,
}
/// A collection of key value pairs
///
/// This provides cheap, ordered serialization of maps: the entries are written in
/// slice order, without building an intermediate map. An empty collection is
/// serialized as `none` instead of as an empty map.
struct Map<'a, K, V>(&'a [(K, V)]);

impl<K: Serialize, V: Serialize> Serialize for Map<'_, K, V> {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let entries = self.0;
        if entries.is_empty() {
            return serializer.serialize_none();
        }

        let mut state = serializer.serialize_map(Some(entries.len()))?;
        entries
            .iter()
            .try_for_each(|(key, value)| state.serialize_entry(key, value))?;
        state.end()
    }
}
/// A DynamoDB [AttributeValue]
///
/// Only the string and number variants are modelled here
///
/// [AttributeValue]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_AttributeValue.html
#[derive(Debug, Serialize, Deserialize)]
enum AttributeValue<'a> {
/// A string attribute, serialized under the key `S`
#[serde(rename = "S")]
String(Cow<'a, str>),
/// A numeric attribute, serialized under the key `N` with the number encoded as a
/// string, e.g. `{"N":"23"}`
#[serde(rename = "N", with = "number")]
Number(u64),
}
impl<'a> From<&'a str> for AttributeValue<'a> {
fn from(value: &'a str) -> Self {
Self::String(Cow::Borrowed(value))
}
}
/// Numbers are serialized as strings
mod number {
use serde::{Deserialize, Deserializer, Serializer};
pub(crate) fn serialize<S: Serializer>(v: &u64, s: S) -> Result<S::Ok, S::Error> {
s.serialize_str(&v.to_string())
}
pub(crate) fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<u64, D::Error> {
let v: &str = Deserialize::deserialize(d)?;
v.parse().map_err(serde::de::Error::custom)
}
}
use crate::client::HttpResponse;
/// Re-export integration_test to be called by s3_test
#[cfg(test)]
pub(crate) use tests::integration_test;
#[cfg(test)]
mod tests {
use super::*;
use crate::aws::AmazonS3;
use crate::ObjectStore;
use rand::distr::Alphanumeric;
use rand::{rng, Rng};
/// Numeric attributes round-trip through JSON as `{"N":"<number>"}`
#[test]
fn test_attribute_serde() {
let serde = serde_json::to_string(&AttributeValue::Number(23)).unwrap();
assert_eq!(serde, "{\"N\":\"23\"}");
let back: AttributeValue<'_> = serde_json::from_str(&serde).unwrap();
assert!(matches!(back, AttributeValue::Number(23)));
}
/// An integration test for DynamoDB
///
/// This is a function called by s3_test to avoid test concurrency issues
pub(crate) async fn integration_test(integration: &AmazonS3, d: &DynamoCommit) {
let client = integration.client.as_ref();
// The source object for the conditional copy below
let src = Path::from("dynamo_path_src");
integration.put(&src, "asd".into()).await.unwrap();
let dst = Path::from("dynamo_path");
let _ = integration.delete(&dst).await; // Delete if present
// Create a lock if one does not already exist, keeping whichever lease is current
let existing = match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() {
TryLockResult::Conflict(l) => l,
TryLockResult::Ok(l) => l,
};
// Should not be able to acquire a lock again
let r = d.try_lock(client, dst.as_ref(), None, None).await;
assert!(matches!(r, Ok(TryLockResult::Conflict(_))));
// But should still be able to reclaim lock and perform copy
d.copy_if_not_exists(client, &src, &dst).await.unwrap();
match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() {
TryLockResult::Conflict(new) => {
// Should have incremented generation to do so
assert_eq!(new.generation, existing.generation + 1);
}
_ => panic!("Should conflict"),
}
// Use a random etag so the record for (`dst`, etag) cannot already exist
let rng = rng();
let etag = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
let t = Some(etag.as_str());
// The first attempt for a fresh etag acquires the lock
let l = match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() {
TryLockResult::Ok(l) => l,
_ => panic!("should not conflict"),
};
// A second attempt conflicts and reports the generation of the held lease
match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() {
TryLockResult::Conflict(c) => assert_eq!(l.generation, c.generation),
_ => panic!("should conflict"),
}
// Passing the held lease reclaims the lock and increments the generation
match d.try_lock(client, dst.as_ref(), t, Some(&l)).await.unwrap() {
TryLockResult::Ok(new) => assert_eq!(new.generation, l.generation + 1),
_ => panic!("should not conflict"),
}
}
}