akd_mysql/src/mysql.rs (1,357 lines of code) (raw):

// Copyright (c) Facebook, Inc. and its affiliates. // // This source code is licensed under both the MIT license found in the // LICENSE-MIT file in the root directory of this source tree and the Apache // License, Version 2.0 found in the LICENSE-APACHE file in the root directory // of this source tree. //! This module implements operations for a simple asynchronized mysql database use crate::mysql_storables::MySqlStorable; use akd::errors::StorageError; use akd::history_tree_node::HistoryTreeNode; use akd::node_state::HistoryNodeState; use akd::storage::types::{ AkdLabel, DbRecord, KeyData, StorageType, ValueState, ValueStateRetrievalFlag, }; use akd::storage::{Storable, Storage}; use akd::NodeLabel; use async_trait::async_trait; use log::{debug, error, info, warn}; use mysql_async::prelude::*; use mysql_async::*; use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; use std::convert::TryInto; use std::process::Command; use std::sync::Arc; use tokio::time::{Duration, Instant}; type MySqlError = mysql_async::Error; type LocalTransaction = akd::storage::transaction::Transaction; use akd::storage::timed_cache::*; const TABLE_AZKS: &str = crate::mysql_storables::TABLE_AZKS; const TABLE_HISTORY_TREE_NODES: &str = crate::mysql_storables::TABLE_HISTORY_TREE_NODES; const TABLE_HISTORY_NODE_STATES: &str = crate::mysql_storables::TABLE_HISTORY_NODE_STATES; const TABLE_USER: &str = crate::mysql_storables::TABLE_USER; const TEMP_IDS_TABLE: &str = crate::mysql_storables::TEMP_IDS_TABLE; const MAXIMUM_SQL_TIER_CONNECTION_TIMEOUT_SECS: u64 = 300; const SQL_RECONNECTION_DELAY_SECS: u64 = 5; enum BatchMode { Full(mysql_async::Params), Partial(mysql_async::Params, usize), None, } // MySQL's max supported text size is 65535 // Of the prepared insert's below in this logic, // we have a max-string size of 267 + N(190). 
// Assuming 4b/char, we can work out an ABS // max multi-row write for the prepared statement as // (| 65535 | - | the constant parts |) / | the parts * depth | = ~ max depth of 343 // This is a conservative value of the estimate ^ // const MYSQL_EXTENDED_INSERT_DEPTH: usize = 1000; // note : migrated to Self::tunable_insert_depth /* MySql documentation: https://docs.rs/mysql_async/0.23.1/mysql_async/ */ /// Memory cache options for SQL query result caching pub enum MySqlCacheOptions { /// Do not utilize any cache None, /// Utilize the default caching settings Default, /// Customize the caching options (cache item duration) Specific(std::time::Duration), } /// Represents an _asynchronous_ connection to a MySQL database pub struct AsyncMySqlDatabase { opts: Opts, pool: Arc<tokio::sync::RwLock<Pool>>, is_healthy: Arc<tokio::sync::RwLock<bool>>, cache: Option<TimedCache>, trans: LocalTransaction, num_reads: Arc<tokio::sync::RwLock<u64>>, read_call_stats: Arc<tokio::sync::RwLock<HashMap<String, u64>>>, num_writes: Arc<tokio::sync::RwLock<u64>>, write_call_stats: Arc<tokio::sync::RwLock<HashMap<String, u64>>>, time_read: Arc<tokio::sync::RwLock<Duration>>, time_write: Arc<tokio::sync::RwLock<Duration>>, tunable_insert_depth: usize, } impl std::fmt::Display for AsyncMySqlDatabase { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let db_str = match self.opts.db_name() { Some(db) => format!("Database {}", db), None => String::from(""), }; let user_str = match self.opts.user() { Some(user) => format!(", User {}", user), None => String::from(""), }; write!( f, "Connected to {}:{} ({}{})", self.opts.ip_or_hostname(), self.opts.tcp_port(), db_str, user_str ) } } impl Clone for AsyncMySqlDatabase { fn clone(&self) -> Self { Self { opts: self.opts.clone(), pool: self.pool.clone(), is_healthy: self.is_healthy.clone(), cache: self.cache.clone(), trans: LocalTransaction::new(), num_reads: self.num_reads.clone(), read_call_stats: self.read_call_stats.clone(), 
            num_writes: self.num_writes.clone(),
            write_call_stats: self.write_call_stats.clone(),
            time_read: self.time_read.clone(),
            time_write: self.time_write.clone(),
            tunable_insert_depth: self.tunable_insert_depth,
        }
    }
}

impl<'a> AsyncMySqlDatabase {
    /// Creates a new mysql database
    ///
    /// * `endpoint`/`port` - host and TCP port (port defaults to 3306)
    /// * `database` - database name to connect to
    /// * `user`/`password` - optional credentials
    /// * `cache_options` - result-cache behavior (none / default / custom TTL)
    /// * `depth` - max rows per multi-row INSERT (`tunable_insert_depth`)
    ///
    /// Panics (via `unwrap`) if the initial connection pool cannot be built —
    /// deliberate, see the inline comment below.
    #[allow(unused)]
    pub async fn new<T: Into<String>>(
        endpoint: T,
        database: T,
        user: Option<T>,
        password: Option<T>,
        port: Option<u16>,
        cache_options: MySqlCacheOptions,
        depth: usize,
    ) -> Self {
        let dport = port.unwrap_or(3306u16);
        let mut builder = OptsBuilder::default()
            .ip_or_hostname(endpoint)
            .db_name(Option::from(database))
            .user(user)
            .pass(password)
            .tcp_port(dport);
        let opts: Opts = builder.into();

        #[allow(clippy::mutex_atomic)]
        let healthy = Arc::new(tokio::sync::RwLock::new(false));
        // Exception to issue 139. This call SHOULD panic if we cannot create a
        // connection pool object to fail the entire app. It'll fail very early
        // as we need to create the db prior to the directory
        let pool = Self::new_connection_pool(&opts, &healthy).await.unwrap();

        let cache = match cache_options {
            MySqlCacheOptions::None => None,
            MySqlCacheOptions::Default => Some(TimedCache::new(None)),
            MySqlCacheOptions::Specific(timing) => Some(TimedCache::new(Some(timing))),
        };

        Self {
            opts,
            pool: Arc::new(tokio::sync::RwLock::new(pool)),
            is_healthy: healthy,
            cache,
            trans: LocalTransaction::new(),
            num_reads: Arc::new(tokio::sync::RwLock::new(0)),
            read_call_stats: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
            num_writes: Arc::new(tokio::sync::RwLock::new(0)),
            write_call_stats: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
            time_read: Arc::new(tokio::sync::RwLock::new(Duration::from_millis(0))),
            time_write: Arc::new(tokio::sync::RwLock::new(Duration::from_millis(0))),
            tunable_insert_depth: depth,
        }
    }

    /// Determine if the db connection is healthy at present
    pub async fn is_healthy(&self) -> bool {
        let is_healthy_guard = self.is_healthy.read().await;
        *is_healthy_guard
    }

    // Inspects a MySQL result: when the error indicates an infrastructure
    // problem (driver / IO / server), spawns a background pool refresh while
    // still propagating the original error to the caller unchanged.
    fn check_for_infra_error<T>(
        &self,
        result: core::result::Result<T, MySqlError>,
    ) -> core::result::Result<T, MySqlError> {
        match result {
            Err(err) => {
                // Classify which error variants imply a bad connection
                let is_connection_infra_error: bool = match &err {
                    // In mysql_async v0.28.1 TLS errors moved to IoError. Thus we cannot use them here.
                    // TODO(eoz): Update error handling to take TLS errors into account.
                    MySqlError::Other(_) | MySqlError::Url(_) /* | mysql_async::IoError::Tls(_) */ => false,
                    MySqlError::Driver(_) | MySqlError::Io(_) | MySqlError::Server(_) => true,
                };
                // If error is due to infra error (e.g bad connection) refresh
                // connection pool in background. This allows current request to
                // finish (with err) while blocking subsequent requests until a
                // healthy connection is restored.
                if is_connection_infra_error {
                    let db = self.clone();
                    tokio::task::spawn(async move {
                        if let Err(err) = db.refresh_connection_pool().await {
                            error!("Error refreshing MySql connection pool: {:?}", err);
                        }
                    });
                }
                Err::<T, MySqlError>(err)
            }
            Ok(t) => Ok(t),
        }
    }

    // Fetches a pooled connection (failing fast while the pool is unhealthy)
    // and forces TRADITIONAL sql_mode on the session before handing it out.
    async fn get_connection(&self) -> Result<mysql_async::Conn> {
        let mut connection = {
            if self.is_healthy().await {
                let connection_pool_guard = self.pool.read().await;
                connection_pool_guard.get_conn().await?
            } else {
                // Connection pool is currently unhealthy and queries are
                // disallowed. Connection pool is being async refreshed in
                // background and will soon become healthy, so no action required
                // fail the connection
                return Err(MySqlError::Driver(
                    mysql_async::DriverError::PoolDisconnected,
                ));
            }
        };
        // Ensure we are running in TRADITIONAL mysql mode. TRADITIONAL mysql
        // converts many warnings to errors, for example it will reject too
        // large blob entries instead of truncating them with a warning.
        // This is essential for our system, since SEE relies on all data in our
        // XDB being exactly what it wrote.
        connection
            .query_drop("SET SESSION sql_mode = 'TRADITIONAL'")
            .await?;
        Ok(connection)
    }

    // Occasionally our connection pool will become stale.
    // This happens
    // e.g on DB master promotions during mysql upgrades. In these scenarios our
    // queries will begin to fail, and we will need to call this method to
    // "refresh" the pool.
    async fn refresh_connection_pool(&self) -> core::result::Result<(), StorageError> {
        {
            // `false` means a refresh is already in flight — bail out early
            // rather than refreshing twice.
            let mut is_healthy_guard = self.is_healthy.write().await;
            if !*is_healthy_guard {
                info!("Already refreshing MySql connection pool!");
                return Ok(());
            }
            *is_healthy_guard = false;
        }
        warn!("Refreshing MySql connection pool.");
        debug!("BEGIN refresh mysql connection pool");

        // Grab early write lock so no new queries can be initiated before
        // connection pool is refreshed.
        let mut connection_pool_guard = self.pool.write().await;
        let pool = Self::new_connection_pool(&self.opts, &self.is_healthy).await?;
        *connection_pool_guard = pool;
        debug!("END refresh mysql connection pool");
        Ok(())
    }

    // Builds a fresh pool, retrying every SQL_RECONNECTION_DELAY_SECS until a
    // connection is obtained AND schema setup succeeds, or until the overall
    // MAXIMUM_SQL_TIER_CONNECTION_TIMEOUT_SECS budget is exhausted. Flips the
    // shared `is_healthy` flag to true on success.
    async fn new_connection_pool(
        opts: &mysql_async::Opts,
        is_healthy: &Arc<tokio::sync::RwLock<bool>>,
    ) -> core::result::Result<mysql_async::Pool, StorageError> {
        let start = Instant::now();
        let mut attempts = 1;
        loop {
            let ip = opts.ip_or_hostname();
            let pool_options = opts.clone();
            let pool = Pool::new(pool_options);
            let conn = pool.get_conn().await;
            if let Ok(_conn) = conn {
                if let Ok(()) = Self::setup_database(_conn).await {
                    // set the healthy flag to true
                    let mut is_healthy_guard = is_healthy.write().await;
                    *is_healthy_guard = true;
                    return Ok(pool);
                }
            }
            if start.elapsed().as_secs() > MAXIMUM_SQL_TIER_CONNECTION_TIMEOUT_SECS {
                let message = format!(
                    "Unable to get a SQL connection to {} after {} attempts in {} seconds",
                    ip,
                    attempts,
                    start.elapsed().as_secs()
                );
                error!("{}", message);
                return Err(StorageError::Connection(message));
            }
            warn!(
                "Failed {:?} reconnection attempt(s) to MySQL database. Will retry in {} seconds",
                attempts, SQL_RECONNECTION_DELAY_SECS
            );
            tokio::time::sleep(tokio::time::Duration::from_secs( // TOKIO 0.2.X
                //tokio::time::sleep(tokio::time::Duration::from_secs( // TOKIO 1.X
                SQL_RECONNECTION_DELAY_SECS,
            ))
            .await;
            attempts += 1
        }
    }

    // Creates every required table (idempotent: IF NOT EXISTS) inside a single
    // SQL transaction, committing only when all statements succeed.
    async fn setup_database(mut conn: mysql_async::Conn) -> core::result::Result<(), MySqlError> {
        let mut tx: mysql_async::Transaction<'_> =
            conn.start_transaction(TxOpts::default()).await?;
        // AZKS table
        let command = "CREATE TABLE IF NOT EXISTS `".to_owned()
            + TABLE_AZKS
            + "` (`key` SMALLINT UNSIGNED NOT NULL, `epoch` BIGINT UNSIGNED NOT NULL,"
            + " `num_nodes` BIGINT UNSIGNED NOT NULL, PRIMARY KEY (`key`))";
        tx.query_drop(command).await?;
        // History tree nodes table
        let command = "CREATE TABLE IF NOT EXISTS `".to_owned()
            + TABLE_HISTORY_TREE_NODES
            + "` (`label_len` INT UNSIGNED NOT NULL, `label_val` VARBINARY(32) NOT NULL,"
            + " `birth_epoch` BIGINT UNSIGNED NOT NULL,"
            + " `last_epoch` BIGINT UNSIGNED NOT NULL, `parent_label_len` INT UNSIGNED NOT NULL,"
            + " `parent_label_val` VARBINARY(32) NOT NULL, `node_type` SMALLINT UNSIGNED NOT NULL,"
            + " PRIMARY KEY (`label_len`, `label_val`))";
        tx.query_drop(command).await?;
        // History node states table
        let command = "CREATE TABLE IF NOT EXISTS `".to_owned()
            + TABLE_HISTORY_NODE_STATES
            + "` (`label_len` INT UNSIGNED NOT NULL, `label_val` VARBINARY(32) NOT NULL, "
            + " `epoch` BIGINT UNSIGNED NOT NULL, `value` VARBINARY(32), `child_states` VARBINARY(2000),"
            + " PRIMARY KEY (`label_len`, `label_val`, `epoch`))";
        tx.query_drop(command).await?;
        // User data table
        let command = "CREATE TABLE IF NOT EXISTS `".to_owned()
            + TABLE_USER
            + "` (`username` VARCHAR(256) NOT NULL, `epoch` BIGINT UNSIGNED NOT NULL, `version` BIGINT UNSIGNED NOT NULL,"
            + " `node_label_val` VARBINARY(32) NOT NULL, `node_label_len` INT UNSIGNED NOT NULL, `data` VARCHAR(2000),"
            + " PRIMARY KEY(`username`, `epoch`))";
        tx.query_drop(command).await?;

        // if we got here, we're good to commit.
        // Transaction's will auto-rollback when memory freed if commit wasn't done.
        tx.commit().await?;
        Ok(())
    }

    /// Delete all the data in the tables
    pub async fn delete_data(&self) -> core::result::Result<(), MySqlError> {
        let mut conn = self.get_connection().await?;
        let mut tx = conn.start_transaction(TxOpts::default()).await?;
        let command = "DELETE FROM `".to_owned() + TABLE_AZKS + "`";
        tx.query_drop(command).await?;
        let command = "DELETE FROM `".to_owned() + TABLE_USER + "`";
        tx.query_drop(command).await?;
        let command = "DELETE FROM `".to_owned() + TABLE_HISTORY_NODE_STATES + "`";
        tx.query_drop(command).await?;
        let command = "DELETE FROM `".to_owned() + TABLE_HISTORY_TREE_NODES + "`";
        tx.query_drop(command).await?;
        tx.commit().await?;
        Ok(())
    }

    /// Drop all the tables
    pub async fn drop_tables(&self) -> core::result::Result<(), MySqlError> {
        let mut conn = self.get_connection().await?;
        let mut tx = conn.start_transaction(TxOpts::default()).await?;
        let command = "DROP TABLE IF EXISTS `".to_owned() + TABLE_AZKS + "`";
        tx.query_drop(command).await?;
        let command = "DROP TABLE IF EXISTS `".to_owned() + TABLE_USER + "`";
        tx.query_drop(command).await?;
        let command = "DROP TABLE IF EXISTS `".to_owned() + TABLE_HISTORY_NODE_STATES + "`";
        tx.query_drop(command).await?;
        let command = "DROP TABLE IF EXISTS `".to_owned() + TABLE_HISTORY_TREE_NODES + "`";
        tx.query_drop(command).await?;
        tx.commit().await?;
        Ok(())
    }

    /// Storage a record in the data layer
    // Executes the record's prepared set-statement either on the supplied
    // transaction or on a freshly acquired connection; updates write metrics.
    async fn internal_set(
        &self,
        record: DbRecord,
        trans: Option<mysql_async::Transaction<'a>>,
    ) -> Result<()> {
        *(self.num_writes.write().await) += 1;
        self.record_call_stats('w', "internal_set".to_string(), "".to_string())
            .await;

        debug!("BEGIN MySQL set");
        let tic = Instant::now();

        let statement_text = record.set_statement();
        let params = record
            .set_params()
            .ok_or_else(|| Error::Other("Failed to construct MySQL parameters block".into()))?;
        let out = match trans {
            Some(mut tx) => match tx.exec_drop(statement_text, params).await {
                Err(err) => Err(err),
                Ok(next_tx) => Ok(next_tx),
            },
            None => {
                let mut conn = self.get_connection().await?;
                if let Err(err) = conn.exec_drop(statement_text, params).await {
                    Err(err)
                } else {
                    Ok(())
                }
            }
        };
        let result = self.check_for_infra_error(out)?;
        let toc = Instant::now() - tic;
        *(self.time_write.write().await) += toc;
        debug!("END MySQL set");
        Ok(result)
    }

    /// NOTE: This is assuming all of the DB records have been narrowed down to a single record type!
    // Splits `records` into chunks of `tunable_insert_depth` rows, executes one
    // multi-row prepared statement per full chunk (via exec_batch) plus a final
    // statement for the single short remainder chunk, all on `trans`, and
    // returns the transaction for further use by the caller.
    async fn internal_batch_set(
        &self,
        records: Vec<DbRecord>,
        mut trans: mysql_async::Transaction<'a>,
    ) -> core::result::Result<mysql_async::Transaction<'a>, MySqlError> {
        if records.is_empty() {
            return Ok(trans);
        }
        *(self.num_writes.write().await) += records.len() as u64;
        self.record_call_stats('w', "internal_batch_set".to_string(), "".to_string())
            .await;

        debug!("BEGIN Computing mysql parameters");
        #[allow(clippy::needless_collect)]
        let chunked = records
            .chunks(self.tunable_insert_depth)
            .map(|batch| {
                if batch.is_empty() {
                    Ok(BatchMode::None)
                } else if batch.len() < self.tunable_insert_depth {
                    // only the trailing chunk can be shorter than a full batch
                    DbRecord::set_batch_params(batch)
                        .map(|out| BatchMode::Partial(out, batch.len()))
                } else {
                    DbRecord::set_batch_params(batch).map(BatchMode::Full)
                }
            })
            .collect::<Result<Vec<_>>>()?;
        debug!("END Computing mysql parameters");

        debug!("BEGIN MySQL set batch");
        // All records are assumed to share the variant of the first one (NOTE above)
        let head = &records[0];
        let statement = |i: usize| -> String {
            match &head {
                DbRecord::Azks(_) => DbRecord::set_batch_statement::<akd::Azks>(i),
                DbRecord::HistoryNodeState(_) => {
                    DbRecord::set_batch_statement::<HistoryNodeState>(i)
                }
                DbRecord::HistoryTreeNode(_) => DbRecord::set_batch_statement::<HistoryTreeNode>(i),
                DbRecord::ValueState(_) => {
                    DbRecord::set_batch_statement::<akd::storage::types::ValueState>(i)
                }
            }
        };

        let mut params = vec![];
        let mut fallout: Option<(mysql_async::Params, usize)> = None;
        for item in chunked {
            match item {
                BatchMode::Full(part) => params.push(part),
                BatchMode::Partial(part, count) => fallout = Some((part, count)),
                _ => {}
            }
        }

        let tic
            = Instant::now();
        debug!("MySQL batch - {} full inserts", params.len());
        // insert the batches of size = MYSQL_EXTENDED_INSERT_DEPTH
        if !params.is_empty() {
            let fill_statement = statement(self.tunable_insert_depth);
            let out = trans.exec_batch(fill_statement, params).await;
            self.check_for_infra_error(out)?;
        }
        // insert the remainder as a final statement
        if let Some((remainder, count)) = fallout {
            debug!("MySQL batch - remainder {} insert", count);
            let remainder_stmt = statement(count);
            let out = trans.exec_drop(remainder_stmt, remainder).await;
            self.check_for_infra_error(out)?;
        }
        let toc = Instant::now() - tic;
        *(self.time_write.write().await) += toc;
        debug!("END MySQL set batch");
        Ok(trans)
    }

    /// Create the test database
    #[allow(dead_code)]
    pub async fn create_test_db<T: Into<String>>(
        endpoint: T,
        user: Option<T>,
        password: Option<T>,
        port: Option<u16>,
    ) -> core::result::Result<(), MySqlError> {
        let dport = port.unwrap_or(3306u16);
        let builder = OptsBuilder::default()
            .ip_or_hostname(endpoint)
            .user(user)
            .pass(password)
            .tcp_port(dport);
        let opts: Opts = Opts::from(builder);
        let mut conn = Conn::new(opts).await?;
        conn.query_drop(r"CREATE DATABASE IF NOT EXISTS test_db")
            .await?;
        Ok(())
    }

    // Bumps the per-caller counter in either the read ('r') or write ('w')
    // stats map, keyed by "<caller>~<data type>"; panics on any other
    // call_type (programmer error — only 'r'/'w' are ever passed).
    async fn record_call_stats(&self, call_type: char, caller_name: String, data_type: String) {
        let mut stats;
        if call_type == 'r' {
            stats = self.read_call_stats.write().await;
        } else if call_type == 'w' {
            stats = self.write_call_stats.write().await;
        } else {
            panic!("Unknown call type to record call stats for.")
        }
        let call_count = (*stats).entry(caller_name + "~" + &data_type).or_insert(0);
        *call_count += 1;
    }

    // Probes a list of likely docker binary locations, returning the output of
    // `docker container ls` filtered to the exact test-container name from the
    // first invocable docker binary.
    fn try_dockers() -> std::io::Result<std::process::Output> {
        let potential_docker_paths = vec![
            "/usr/local/bin/docker",
            "/usr/bin/docker",
            "/sbin/docker",
            "/bin/docker",
            "docker",
        ];
        // Default: raw OS error 2 ("no such file") if every candidate fails
        let mut output = Err(std::io::Error::from_raw_os_error(2));
        for path in potential_docker_paths {
            output = Command::new(path)
                // Name filter lists containers containing the name.
                // See https://docs.docker.com/engine/reference/commandline/ps/.
                // Therefore, a container with a name like akd-test-dbc would match but would be wrong.
                // This regex ensures exact match.
                .args(["container", "ls", "-f", "name=^/akd-test-db$"])
                .output();
            match &output {
                Ok(result) => {
                    if let (Ok(out), Ok(err)) = (
                        std::str::from_utf8(&result.stdout),
                        std::str::from_utf8(&result.stderr),
                    ) {
                        info!("Docker ls output\nSTDOUT: {}\nSTDERR: {}", out, err);
                    }
                    break;
                }
                Err(err) => {
                    warn!("Docker ls returned error \"{:?}\"\nTrying next possible docker command location", err);
                }
            }
        }
        output
    }

    /// Determine if the MySQL environment is available for execution (i.e. docker container is running)
    #[allow(dead_code)]
    pub fn test_guard() -> bool {
        let output = Self::try_dockers();

        // docker threw some kind of error running, assume down
        if let Ok(result) = output {
            // the result will look like
            //
            // CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
            // 4bd11d9e28f2 ecac195d15af "docker-entrypoint.s…" 4 minutes ago Up 4 minutes 33060/tcp, 0.0.0.0:8001->3306/tcp, :::8001->3306/tcp seemless-test-db
            //
            // so there should be 2 output lines assuming all is successful and the container is running.
            const NUM_LINES_EXPECTED: usize = 2;

            let err = std::str::from_utf8(&result.stderr);
            if let Ok(error_message) = err {
                if !error_message.is_empty() {
                    error!("Error executing docker command: {}", error_message);
                }
            }
            // Note that lines().count() returns the same number for lines with and without a final line ending.
            let is_container_listed = std::str::from_utf8(&result.stdout)
                .map(|str| str.lines().count() == NUM_LINES_EXPECTED);
            return is_container_listed.unwrap_or(false);
        }
        // docker may have thrown an error, just fail
        false
    }
}

#[async_trait]
impl Storage for AsyncMySqlDatabase {
    // Logs (and then resets) the accumulated read/write metrics, plus row
    // counts for the main tables. The counts are best-effort: any query
    // failure leaves the "Query err" placeholder text in place.
    async fn log_metrics(&self, level: log::Level) {
        if let Some(cache) = &self.cache {
            cache.log_metrics(level).await
        }
        self.trans.log_metrics(level).await;

        let mut tree_size = "Tree size: Query err".to_string();
        let mut node_state_size = "Node state count: Query err".to_string();
        let mut value_state_size = "Value state count: Query err".to_string();

        if let Ok(mut conn) = self.get_connection().await {
            let query_text = format!("SELECT COUNT(*) FROM {}", TABLE_HISTORY_TREE_NODES);
            if let Ok(results) = conn.query_iter(query_text).await {
                if let Ok(mapped) = results
                    .map_and_drop(|row| {
                        let count: u64 = mysql_async::from_row(row);
                        count
                    })
                    .await
                {
                    if let Some(count) = mapped.first() {
                        tree_size = format!("Tree size: {}", count);
                    }
                    let query_text =
                        format!("SELECT COUNT(`epoch`) FROM {}", TABLE_HISTORY_NODE_STATES);
                    if let Ok(results) = conn.query_iter(query_text).await {
                        if let Ok(mapped) = results
                            .map_and_drop(|row| {
                                let count: u64 = mysql_async::from_row(row);
                                count
                            })
                            .await
                        {
                            if let Some(count) = mapped.first() {
                                node_state_size = format!("Node state count: {}", count);
                            }
                            let query_text = format!("SELECT COUNT(`epoch`) FROM {}", TABLE_USER);
                            if let Ok(results) = conn.query_iter(query_text).await {
                                if let Ok(mapped) = results
                                    .map_and_drop(|row| {
                                        let count: u64 = mysql_async::from_row(row);
                                        count
                                    })
                                    .await
                                {
                                    if let Some(count) = mapped.first() {
                                        value_state_size =
                                            format!("Value state count: {}", count);
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }

        // Take all the metric locks at once, report, then reset every counter.
        let mut r = self.num_reads.write().await;
        let mut rcs = self.read_call_stats.write().await;
        let mut w = self.num_writes.write().await;
        let mut wcs = self.write_call_stats.write().await;
        let mut tr = self.time_read.write().await;
        let mut tw = self.time_write.write().await;

        // Sort
        // call stats for consistency.
        let mut rcs_vec = (*rcs).iter().collect::<Vec<_>>();
        let mut wcs_vec = (*wcs).iter().collect::<Vec<_>>();
        rcs_vec.sort_by_key(|rc| rc.0);
        wcs_vec.sort_by_key(|wc| wc.0);

        let msg = format!(
            "MySQL writes: {}, MySQL reads: {}, Time read: {} s, Time write: {} s\n\t{}\n\t{}\n\t{}\nRead call stats: {:?}\nWrite call stats: {:?}\n",
            *w,
            *r,
            (*tr).as_secs_f64(),
            (*tw).as_secs_f64(),
            tree_size,
            node_state_size,
            value_state_size,
            rcs_vec,
            wcs_vec,
        );

        // Reset every counter/stat after reporting
        *r = 0;
        *rcs = HashMap::new();
        *w = 0;
        *wcs = HashMap::new();
        *tr = Duration::from_millis(0);
        *tw = Duration::from_millis(0);

        match level {
            // Currently logs cannot be captured unless they are
            // println!. Normally Level::Trace should use the trace! macro.
            log::Level::Trace => println!("{}", msg),
            log::Level::Debug => debug!("{}", msg),
            log::Level::Info => info!("{}", msg),
            log::Level::Warn => warn!("{}", msg),
            _ => error!("{}", msg),
        }
    }

    /// Start a transaction in the storage layer
    async fn begin_transaction(&self) -> bool {
        // disable the cache cleaning since we're in a write transaction
        // and will want to keep cache'd objects for the life of the transaction
        if let Some(cache) = &self.cache {
            cache.disable_clean().await;
        }
        self.trans.begin_transaction().await
    }

    /// Commit a transaction in the storage layer
    async fn commit_transaction(&self) -> core::result::Result<(), StorageError> {
        // The transaction is now complete (or reverted) and therefore we can re-enable
        // the cache cleaning status
        if let Some(cache) = &self.cache {
            cache.enable_clean().await;
        }
        // this retrieves all the trans operations, and "de-activates" the transaction flag
        let ops = self.trans.commit_transaction().await?;
        self.batch_set(ops).await
    }

    /// Rollback a transaction
    async fn rollback_transaction(&self) -> core::result::Result<(), StorageError> {
        // The transaction is being reverted and therefore we can re-enable
        // the cache cleaning status
        if let Some(cache) = &self.cache {
            cache.enable_clean().await;
        }
        self.trans.rollback_transaction().await
    }

    /// Retrieve a flag determining if there is a transaction active
    async fn is_transaction_active(&self) -> bool {
        self.trans.is_transaction_active().await
    }

    /// Storage a record in the data layer
    // Routed to the local transaction log when one is active; otherwise the
    // record is cached (if caching is on) and written straight to MySQL.
    async fn set(&self, record: DbRecord) -> core::result::Result<(), StorageError> {
        // we're in a transaction, set the item in the transaction
        if self.is_transaction_active().await {
            self.trans.set(&record).await;
            return Ok(());
        }

        if let Some(cache) = &self.cache {
            cache.put(&record).await;
        }

        match self.internal_set(record, None).await {
            Ok(_) => Ok(()),
            Err(error) => {
                error!("MySQL error {}", error);
                Err(StorageError::Other(format!("MySQL Error {}", error)))
            }
        }
    }

    // Writes a batch of records: goes to the local transaction log if active,
    // otherwise the records are grouped by type and flushed inside a single
    // SQL transaction (rows sorted per type for insert locality).
    async fn batch_set(&self, records: Vec<DbRecord>) -> core::result::Result<(), StorageError> {
        if records.is_empty() {
            // nothing to do, save the cycles
            return Ok(());
        }
        // we're in a transaction, set the items in the transaction
        if self.is_transaction_active().await {
            for record in records.into_iter() {
                self.trans.set(&record).await;
            }
            return Ok(());
        }

        if let Some(cache) = &self.cache {
            let _ = cache.batch_put(&records).await;
        }

        // generate batches by type
        let mut groups = std::collections::HashMap::new();
        for record in records {
            match &record {
                DbRecord::Azks(_) => groups
                    .entry(StorageType::Azks)
                    .or_insert_with(Vec::new)
                    .push(record),
                DbRecord::HistoryNodeState(_) => groups
                    .entry(StorageType::HistoryNodeState)
                    .or_insert_with(Vec::new)
                    .push(record),
                DbRecord::HistoryTreeNode(_) => groups
                    .entry(StorageType::HistoryTreeNode)
                    .or_insert_with(Vec::new)
                    .push(record),
                DbRecord::ValueState(_) => groups
                    .entry(StorageType::ValueState)
                    .or_insert_with(Vec::new)
                    .push(record),
            }
        }

        // now execute each type'd batch in batch operations
        let result = async {
            let mut conn = self.get_connection().await?;
            let mut tx = conn.start_transaction(TxOpts::default()).await?;
            // go through each group which is narrowed to a single type
            // applying the changes on the transaction
            // Relax integrity checks for the bulk load; restored below before commit
            tx.query_drop("SET autocommit=0").await?;
            tx.query_drop("SET unique_checks=0").await?;
            tx.query_drop("SET foreign_key_checks=0").await?;
            for (_key, mut value) in groups.into_iter() {
                if !value.is_empty() {
                    // Sort the records to match db-layer sorting which will help with insert performance
                    value.sort_by(|a, b| match &a {
                        DbRecord::HistoryNodeState(state) => {
                            if let DbRecord::HistoryNodeState(state2) = &b {
                                state.key.cmp(&state2.key)
                            } else {
                                Ordering::Equal
                            }
                        }
                        DbRecord::HistoryTreeNode(node) => {
                            if let DbRecord::HistoryTreeNode(node2) = &b {
                                node.label.cmp(&node2.label)
                            } else {
                                Ordering::Equal
                            }
                        }
                        DbRecord::ValueState(state) => {
                            if let DbRecord::ValueState(state2) = &b {
                                // order by username first, then epoch
                                match state.username.0.cmp(&state2.username.0) {
                                    Ordering::Equal => state.epoch.cmp(&state2.epoch),
                                    other => other,
                                }
                            } else {
                                Ordering::Equal
                            }
                        }
                        _ => Ordering::Equal,
                    });
                    // execute the multi-batch insert statement(s)
                    tx = self.internal_batch_set(value, tx).await?;
                }
            }
            tx.query_drop("SET autocommit=1").await?;
            tx.query_drop("SET unique_checks=1").await?;
            tx.query_drop("SET foreign_key_checks=1").await?;
            tx.commit().await?;
            Ok::<(), MySqlError>(())
        };

        match result.await {
            Ok(_) => Ok(()),
            Err(error) => {
                error!("MySQL error {}", error);
                Err(StorageError::Other(format!("MySQL Error {}", error)))
            }
        }
    }

    /// Retrieve a stored record from the data layer
    async fn get<St: Storable>(
        &self,
        id: &St::Key,
    ) -> core::result::Result<DbRecord, StorageError> {
        // we're in a transaction, meaning the object _might_ be newer and therefore we should try and read if from the transaction
        // log instead of the raw storage layer
        if self.is_transaction_active().await {
            if let Some(result) = self.trans.get::<St>(id).await {
                return Ok(result);
            }
        }
        // check for a cache hit
        if let Some(cache) = &self.cache {
            if let Some(result) = cache.hit_test::<St>(id).await {
                return Ok(result);
            }
        }
        // cache miss, log a real sql read op
        let record = self.get_direct::<St>(id).await?;
        if let Some(cache) = &self.cache {
            // cache the
            // result
            cache.put(&record).await;
        }
        Ok(record)
    }

    // Reads a single record straight from MySQL, bypassing the local
    // transaction log and the cache; returns NotFound when no row matches.
    async fn get_direct<St: Storable>(
        &self,
        id: &St::Key,
    ) -> core::result::Result<DbRecord, StorageError> {
        *(self.num_reads.write().await) += 1;
        self.record_call_stats(
            'r',
            "get_direct:".to_string(),
            format!("{:?}", St::data_type()),
        )
        .await;

        debug!("BEGIN MySQL get {:?}", id);
        let result = async {
            let tic = Instant::now();
            let mut conn = self.get_connection().await?;
            let statement = DbRecord::get_specific_statement::<St>();
            let params = DbRecord::get_specific_params::<St>(id);
            // Some record types have no lookup params (singleton rows)
            let out = match params {
                Some(p) => match conn.exec_first(statement, p).await {
                    Err(err) => Err(err),
                    Ok(result) => Ok(result),
                },
                None => match conn.query_first(statement).await {
                    Err(err) => Err(err),
                    Ok(result) => Ok(result),
                },
            };
            let toc = Instant::now() - tic;
            *(self.time_read.write().await) += toc;
            let result = self.check_for_infra_error(out)?;
            if let Some(mut row) = result {
                // return result
                let record = DbRecord::from_row::<St>(&mut row)?;
                return Ok::<Option<DbRecord>, MySqlError>(Some(record));
            }
            Ok::<Option<DbRecord>, MySqlError>(None)
        };
        debug!("END MySQL get");
        match result.await {
            Ok(Some(r)) => Ok(r),
            Ok(None) => Err(StorageError::NotFound(format!(
                "{:?} {:?}",
                St::data_type(),
                id
            ))),
            Err(error) => {
                error!("MySQL error {}", error);
                Err(StorageError::Other(format!("MySQL Error {}", error)))
            }
        }
    }

    /// Flush the caching of objects (if present)
    async fn flush_cache(&self) {
        if let Some(cache) = &self.cache {
            cache.flush().await;
        }
    }

    /// Retrieve a batch of records by id
    // Serves what it can from the transaction log and cache, then fetches the
    // remaining ids from MySQL via a temp-id-table INNER JOIN (below).
    async fn batch_get<St: Storable>(
        &self,
        ids: &[St::Key],
    ) -> core::result::Result<Vec<DbRecord>, StorageError> {
        let mut map = Vec::new();
        if ids.is_empty() {
            // nothing to retrieve, save the cycles
            return Ok(map);
        }
        // ids still needing a database read; shrinks as log/cache hits occur
        let mut key_set: HashSet<St::Key> = ids.iter().cloned().collect::<HashSet<_>>();

        let trans_active = self.is_transaction_active().await;
        // first check the transaction log & cache records
        for id in ids.iter() {
            if trans_active {
                // we're in a
transaction, meaning the object _might_ be newer and therefore we should try and read if from the transaction // log instead of the raw storage layer if let Some(result) = self.trans.get::<St>(id).await { map.push(result); key_set.remove(id); continue; } } // check if item is cached if let Some(cache) = &self.cache { if let Some(result) = cache.hit_test::<St>(id).await { map.push(result); key_set.remove(id); continue; } } } if !key_set.is_empty() { // these are items to be retrieved from the backing database (not in pending transaction or in the object cache) let result = async { let tic = Instant::now(); let key_set_vec: Vec<_> = key_set.into_iter().collect(); debug!("BEGIN MySQL get batch"); let mut conn = self.get_connection().await?; let results = if let Some(create_table_cmd) = DbRecord::get_batch_create_temp_table::<St>() { // Create the temp table of ids let out = conn.query_drop(create_table_cmd).await; self.check_for_infra_error(out)?; // Fill temp table with the requested ids let mut tx = conn.start_transaction(TxOpts::default()).await?; tx.query_drop("SET autocommit=0").await?; tx.query_drop("SET unique_checks=0").await?; tx.query_drop("SET foreign_key_checks=0").await?; let mut fallout: Option<Vec<_>> = None; let mut params = vec![]; for batch in key_set_vec.chunks(self.tunable_insert_depth) { if batch.len() < self.tunable_insert_depth { fallout = Some(batch.to_vec()); } else if let Some(p) = DbRecord::get_multi_row_specific_params::<St>(batch) { params.push(p); } else { return Err(MySqlError::Other( "Unable to generate type-specific MySQL parameters".into(), )); } } // insert the batches of size = MYSQL_EXTENDED_INSERT_DEPTH if !params.is_empty() { let fill_statement = DbRecord::get_batch_fill_temp_table::<St>(Some( self.tunable_insert_depth, )); let out = tx.exec_batch(fill_statement, params).await; self.check_for_infra_error(out)?; // We would need the statement for it. (Possibly) No need for close here. 
// See https://docs.rs/mysql_async/0.28.1/mysql_async/struct.Opts.html#caveats.
                        // tx.close().await?;
                    }

                    // insert the remainder as a final statement
                    if let Some(remainder) = fallout {
                        let remainder_stmt =
                            DbRecord::get_batch_fill_temp_table::<St>(Some(remainder.len()));
                        let params_batch =
                            DbRecord::get_multi_row_specific_params::<St>(&remainder);
                        if let Some(pb) = params_batch {
                            let out = tx.exec_drop(remainder_stmt, pb).await;
                            self.check_for_infra_error(out)?;
                        } else {
                            return Err(MySqlError::Other(
                                "Unable to generate type-specific MySQL parameters".into(),
                            ));
                        }
                    }
                    // restore the session flags disabled above, then commit so the
                    // temp-table rows are visible to the join query below
                    tx.query_drop("SET autocommit=1").await?;
                    tx.query_drop("SET unique_checks=1").await?;
                    tx.query_drop("SET foreign_key_checks=1").await?;
                    tx.commit().await?;

                    // Query the records which intersect (INNER JOIN) with the temp table of ids
                    let query = DbRecord::get_batch_statement::<St>();
                    let out = conn.query_iter(query).await;
                    let result = self.check_for_infra_error(out)?;
                    // rows that fail to deserialize are silently skipped here
                    let out = result
                        .reduce_and_drop(vec![], |mut acc, mut row| {
                            if let Ok(result) = DbRecord::from_row::<St>(&mut row) {
                                acc.push(result);
                            }
                            acc
                        })
                        .await?;
                    // drop the temp table of ids
                    let t_out = conn
                        .query_drop(format!("DROP TEMPORARY TABLE `{}`", TEMP_IDS_TABLE))
                        .await;
                    self.check_for_infra_error(t_out)?;
                    out
                } else {
                    // no results (i.e. AZKS table doesn't support "get by batch ids")
                    vec![]
                };
                debug!("END MySQL get batch");
                let toc = Instant::now() - tic;
                *(self.time_read.write().await) += toc;
                if let Some(cache) = &self.cache {
                    // insert retrieved records into the cache for faster future access
                    for el in results.iter() {
                        cache.put(el).await;
                    }
                }
                Ok::<Vec<DbRecord>, mysql_async::Error>(results)
            };
            // stats are recorded once per batch, not per id
            *(self.num_reads.write().await) += 1;
            self.record_call_stats(
                'r',
                "batch_get".to_string(),
                format!("{:?}", St::data_type()),
            )
            .await;
            match result.await {
                Ok(result_vec) => {
                    for item in result_vec.into_iter() {
                        map.push(item);
                    }
                }
                Err(error) => {
                    error!("MySQL error {}", error);
                    return Err(StorageError::Other(format!("MySQL Error {}", error)));
                }
            }
        }
        Ok(map)
    }

    /// Replace the stored plaintext value of each referenced value-state with the
    /// tombstone sentinel (`akd::TOMBSTONE`), preserving every other field.
    /// Implemented via `batch_get`/`batch_set` so the active transaction and
    /// cache remain coherent with the database.
    async fn tombstone_value_states(
        &self,
        keys: &[akd::storage::types::ValueStateKey],
    ) -> core::result::Result<(), StorageError> {
        // NOTE: This might be optimizable in the future where we could use a SQL statement such as
        //
        // UPDATE `users`
        // SET `data` = TOMBSTONE
        // WHERE key in (set)
        //
        // However, the problem comes from managing an active transaction and cache (if there is one)
        // since we may need to batch load nodes anyways in order to get the other properties
        // which might need to be set. We could write everything to SQL, and after-the-fact update
        // the active transaction and caches with replacing nodes which were updated? Anyways it's a
        // relatively minor improvement here, due to proper use of batch operations
        if keys.is_empty() {
            return Ok(());
        }
        let data = self.batch_get::<ValueState>(keys).await?;
        let mut new_data = vec![];
        for record in data {
            if let DbRecord::ValueState(value_state) = record {
                // keep epoch/label/username/version; only the plaintext value changes
                new_data.push(DbRecord::ValueState(ValueState {
                    epoch: value_state.epoch,
                    label: value_state.label,
                    plaintext_val: akd::AkdValue(akd::TOMBSTONE.to_vec()),
                    username: value_state.username,
                    version: value_state.version,
                }));
            }
        }
        if !new_data.is_empty() {
            debug!("Tombstoning {} entries", new_data.len());
            self.batch_set(new_data).await?;
        }
        Ok(())
    }

    /// Retrieve every value-state stored for `username`. States present in an
    /// active transaction supersede the corresponding rows read from MySQL.
    async fn get_user_data(
        &self,
        username: &AkdLabel,
    ) -> core::result::Result<KeyData, StorageError> {
        // This is the same as previous logic under "get_all"
        *(self.num_reads.write().await) += 1;
        self.record_call_stats('r', "get_user_data".to_string(), "".to_string())
            .await;
        // DO NOT log the user info, it's PII in the future
        debug!("BEGIN MySQL get user data");
        let result = async {
            let tic = Instant::now();
            let mut conn = self.get_connection().await?;
            let statement_text =
                "SELECT `username`, `epoch`, `version`, `node_label_val`, `node_label_len`, `data` FROM `"
                    .to_owned()
                    + TABLE_USER
                    + "` WHERE `username` = :the_user";
            let mut result = conn
                .exec_iter(statement_text, params!
{ "the_user" => username.0.clone() })
                .await?;
            // map each row into a ValueState; rows of unexpected shape yield None
            let out = result
                .map(|mut row| {
                    if let (
                        Some(username),
                        Some(epoch),
                        Some(version),
                        Some(node_label_val),
                        Some(node_label_len),
                        Some(data),
                    ) = (
                        row.take(0),
                        row.take(1),
                        row.take(2),
                        row.take::<Vec<u8>, _>(3),
                        row.take(4),
                        row.take(5),
                    ) {
                        // explicitly check the array length for safety
                        if node_label_val.len() == 32 {
                            let val: [u8; 32] = node_label_val.try_into().unwrap();
                            return Some(ValueState {
                                epoch,
                                version,
                                label: NodeLabel {
                                    val,
                                    len: node_label_len,
                                },
                                plaintext_val: akd::storage::types::AkdValue(data),
                                username: akd::storage::types::AkdLabel(username),
                            });
                        }
                    }
                    None
                })
                .await
                // discard the Nones (malformed rows / wrong-length labels)
                .map(|a| a.into_iter().flatten().collect::<Vec<_>>());
            let toc = Instant::now() - tic;
            *(self.time_read.write().await) += toc;
            let selected_records = self.check_for_infra_error(out)?;
            if let Some(cache) = &self.cache {
                // warm the cache with everything we just read
                for record in selected_records.iter() {
                    cache.put(&DbRecord::ValueState(record.clone())).await;
                }
            }
            if self.is_transaction_active().await {
                // overlay any pending (possibly newer) states from the transaction log
                let mut updated = vec![];
                for record in selected_records.into_iter() {
                    if let Some(DbRecord::ValueState(value)) = self
                        .trans
                        .get::<akd::storage::types::ValueState>(&record.get_id())
                        .await
                    {
                        updated.push(value);
                    } else {
                        updated.push(record);
                    }
                }
                Ok::<KeyData, MySqlError>(KeyData { states: updated })
            } else {
                Ok::<KeyData, MySqlError>(KeyData {
                    states: selected_records,
                })
            }
        };
        debug!("END MySQL get user data");
        match result.await {
            Ok(output) => Ok(output),
            Err(error) => {
                error!("MySQL error {}", error);
                Err(StorageError::Other(format!("MySQL Error {}", error)))
            }
        }
    }

    /// Retrieve a single value-state for `username`, selected by `flag`
    /// (specific version/epoch, min/max epoch, or latest at-or-before an epoch).
    /// A matching record in an active transaction takes precedence over the row
    /// read from MySQL.
    async fn get_user_state(
        &self,
        username: &AkdLabel,
        flag: ValueStateRetrievalFlag,
    ) -> core::result::Result<ValueState, StorageError> {
        *(self.num_reads.write().await) += 1;
        self.record_call_stats('r', "get_user_state".to_string(), "".to_string())
            .await;
        debug!("BEGIN MySQL get user state (flag {:?})", flag);
        let result = async {
            let tic = Instant::now();
            let mut conn = self.get_connection().await?;
            let mut statement_text =
                "SELECT `username`, `epoch`, `version`, `node_label_val`, `node_label_len`, `data` FROM `"
                    .to_owned()
                    + TABLE_USER
                    + "` WHERE `username` = :the_user";
            let mut params_map = vec![("the_user", Value::from(&username.0))];
            // apply the specific filter
            match flag {
                ValueStateRetrievalFlag::SpecificVersion(version) => {
                    params_map.push(("the_version", Value::from(version)));
                    statement_text += " AND `version` = :the_version";
                }
                ValueStateRetrievalFlag::SpecificEpoch(epoch) => {
                    params_map.push(("the_epoch", Value::from(epoch)));
                    statement_text += " AND `epoch` = :the_epoch";
                }
                ValueStateRetrievalFlag::MaxEpoch => statement_text += " ORDER BY `epoch` DESC",
                ValueStateRetrievalFlag::MinEpoch => statement_text += " ORDER BY `epoch` ASC",
                ValueStateRetrievalFlag::LeqEpoch(epoch) => {
                    params_map.push(("the_epoch", Value::from(epoch)));
                    statement_text += " AND `epoch` <= :the_epoch ORDER BY `epoch` DESC";
                }
            }
            // add limit to retrieve only 1 record
            statement_text += " LIMIT 1";
            let out = conn
                .exec_iter(statement_text, mysql_async::Params::from(params_map))
                .await?
.map(|mut row| { if let ( Some(username), Some(epoch), Some(version), Some(node_label_val), Some(node_label_len), Some(data), ) = ( row.take(0), row.take(1), row.take(2), row.take::<Vec<_>, _>(3), row.take(4), row.take(5), ) { // explicitly check the array length for safety if node_label_val.len() == 32 { let val: [u8; 32] = node_label_val.try_into().unwrap(); return Some(ValueState { epoch, version, label: NodeLabel { val, len: node_label_len, }, plaintext_val: akd::storage::types::AkdValue(data), username: akd::storage::types::AkdLabel(username), }); } } None }) .await .map(|a| a.into_iter().flatten().collect::<Vec<_>>()); let toc = Instant::now() - tic; *(self.time_read.write().await) += toc; let selected_record = self.check_for_infra_error(out)?; let item = selected_record.into_iter().next(); if let Some(value_in_item) = &item { if let Some(cache) = &self.cache { cache .put(&DbRecord::ValueState(value_in_item.clone())) .await; } } // check the transaction log for an updated record if self.is_transaction_active().await { if let Some(found_item) = &item { if let Some(DbRecord::ValueState(value)) = self .trans .get::<akd::storage::types::ValueState>(&found_item.get_id()) .await { return Ok::<Option<ValueState>, MySqlError>(Some(value)); } } } Ok::<Option<ValueState>, MySqlError>(item) }; debug!("END MySQL get user state"); match result.await { Ok(Some(result)) => Ok(result), Ok(None) => Err(StorageError::NotFound(format!("ValueState {:?}", username))), Err(error) => { error!("MySQL error {}", error); Err(StorageError::Other(format!("MySQL Error {}", error))) } } } async fn get_user_state_versions( &self, keys: &[AkdLabel], flag: ValueStateRetrievalFlag, ) -> core::result::Result<HashMap<AkdLabel, u64>, StorageError> { *(self.num_reads.write().await) += 1; self.record_call_stats('r', "get_user_state_versions".to_string(), "".to_string()) .await; let mut results = HashMap::new(); debug!("BEGIN MySQL get user state versions (flag {:?})", flag); let result = async { 
let tic = Instant::now();
            let mut conn = self.get_connection().await?;
            debug!("Creating the temporary search username's table");
            let out = conn
                .query_drop(
                    "CREATE TEMPORARY TABLE `search_users`(`username` VARCHAR(256) NOT NULL, PRIMARY KEY (`username`))",
                )
                .await;
            self.check_for_infra_error(out)?;
            debug!(
                "Inserting the query users into the temporary table in batches of {}",
                self.tunable_insert_depth
            );
            // bulk-load usernames with session checks off for insert speed;
            // the flags are restored before commit below
            let mut tx = conn.start_transaction(TxOpts::default()).await?;
            tx.query_drop("SET autocommit=0").await?;
            tx.query_drop("SET unique_checks=0").await?;
            tx.query_drop("SET foreign_key_checks=0").await?;
            // build a multi-row insert with exactly tunable_insert_depth placeholders
            let mut statement = "INSERT INTO `search_users` (`username`) VALUES ".to_string();
            for i in 0..self.tunable_insert_depth {
                if i < self.tunable_insert_depth - 1 {
                    statement += format!("(:username{}), ", i).as_ref();
                } else {
                    statement += format!("(:username{})", i).as_ref();
                }
            }
            // full-size chunks go through the prepared statement; a single
            // undersized trailing chunk (if any) falls out to a one-off statement
            let mut fallout: Option<Vec<_>> = None;
            let mut params = vec![];
            for batch in keys.chunks(self.tunable_insert_depth) {
                if batch.len() < self.tunable_insert_depth {
                    // final batch, use a new query
                    fallout = Some(batch.to_vec());
                } else {
                    let pvec: Vec<_> = batch
                        .iter()
                        .enumerate()
                        .map(|(idx, username)| {
                            (format!("username{}", idx), Value::from(username.0.clone()))
                        })
                        .collect();
                    params.push(mysql_async::Params::from(pvec));
                }
            }
            if !params.is_empty() {
                // first do the big batches
                let out = tx.exec_batch(statement, params).await;
                self.check_for_infra_error(out)?;
            }
            if let Some(remainder) = fallout {
                // now there's some remainder that wasn't _exactly_ equal to the
                // configured insert depth; build a statement of exactly that size
                let rlen = remainder.len();
                let mut remainder_stmt =
                    "INSERT INTO `search_users` (`username`) VALUES ".to_string();
                for i in 0..rlen {
                    if i < rlen - 1 {
                        remainder_stmt += format!("(:username{}), ", i).as_ref();
                    } else {
                        remainder_stmt += format!("(:username{})", i).as_ref();
                    }
                }
                // we don't need a prepared statement, since we're only doing this 1 time
                let users_vec: Vec<_> = remainder
                    .iter()
                    .enumerate()
                    .map(|(idx, username)| {
                        (format!("username{}", idx), Value::from(username.0.clone()))
                    })
                    .collect();
                let params_batch = mysql_async::Params::from(users_vec);
                let out = tx.exec_drop(remainder_stmt, params_batch).await;
                self.check_for_infra_error(out)?;
            }
            // re-enable all the checks
            tx.query_drop("SET autocommit=1").await?;
            tx.query_drop("SET unique_checks=1").await?;
            tx.query_drop("SET foreign_key_checks=1").await?;
            // commit the transaction
            tx.commit().await?;
            debug!("Querying records with JOIN");
            // select all records for provided user names
            let mut params_map = vec![];
            // per-flag WHERE clause and epoch aggregate for the inner query
            let (filter, epoch_grouping) = {
                // apply the specific filter
                match flag {
                    ValueStateRetrievalFlag::SpecificVersion(version) => {
                        params_map.push(("the_version", Value::from(version)));
                        ("WHERE tmp.`version` = :the_version", "tmp.`epoch`")
                    }
                    ValueStateRetrievalFlag::SpecificEpoch(epoch) => {
                        params_map.push(("the_epoch", Value::from(epoch)));
                        ("WHERE tmp.`epoch` = :the_epoch", "tmp.`epoch`")
                    }
                    ValueStateRetrievalFlag::MaxEpoch => ("", "MAX(tmp.`epoch`)"),
                    ValueStateRetrievalFlag::MinEpoch => ("", "MIN(tmp.`epoch`)"),
                    ValueStateRetrievalFlag::LeqEpoch(epoch) => {
                        params_map.push(("the_epoch", Value::from(epoch)));
                        (" WHERE tmp.`epoch` <= :the_epoch", "MAX(tmp.`epoch`)")
                    }
                }
            };
            // inner query picks the matching epoch per username (restricted to the
            // search_users temp table); outer query fetches the version at that epoch
            let select_statement = format!(
                r"SELECT full.`username`, full.`version` FROM {} full INNER JOIN ( SELECT tmp.`username`, {} AS `epoch` FROM {} tmp INNER JOIN `search_users` su ON su.`username` = tmp.`username` {} GROUP BY tmp.`username` ) epochs ON epochs.`username` = full.`username` AND epochs.`epoch` = full.`epoch` ",
                TABLE_USER, epoch_grouping, TABLE_USER, filter
            );
            let out = if params_map.is_empty() {
                // Min/MaxEpoch flags need no bind parameters
                let _t = conn.query_iter(select_statement).await;
                self.check_for_infra_error(_t)?
                    .reduce_and_drop(vec![], |mut acc, mut row: mysql_async::Row| {
                        if let (Some(Ok(username)), Some(Ok(version))) =
                            (row.take_opt(0), row.take_opt(1))
                        {
                            acc.push((AkdLabel(username), version))
                        }
                        acc
                    })
                    .await?
} else {
                // parameterized variant of the same select
                let _t = conn
                    .exec_iter(select_statement, mysql_async::Params::from(params_map))
                    .await;
                self.check_for_infra_error(_t)?
                    .reduce_and_drop(vec![], |mut acc, mut row: mysql_async::Row| {
                        if let (Some(Ok(username)), Some(Ok(version))) =
                            (row.take_opt(0), row.take_opt(1))
                        {
                            acc.push((AkdLabel(username), version))
                        }
                        acc
                    })
                    .await?
            };
            debug!(
                "Retrieved {} records for {} users in query\nDropping search table...",
                out.len(),
                keys.len()
            );
            let nout = conn.query_drop("DROP TEMPORARY TABLE `search_users`").await;
            self.check_for_infra_error(nout)?;
            let toc = Instant::now() - tic;
            *(self.time_read.write().await) += toc;
            for item in out.into_iter() {
                results.insert(item.0, item.1);
            }
            Ok::<(), MySqlError>(())
        };
        debug!("END MySQL get user states");
        match result.await {
            Ok(()) => Ok(results),
            Err(error) => {
                error!("MySQL error {}", error);
                Err(StorageError::Other(format!("MySQL Error {}", error)))
            }
        }
    }

    /// Find the greatest epoch `<= epoch_in_question` at which a node state
    /// exists for `node_label`, or `StorageError::NotFound` if none does.
    async fn get_epoch_lte_epoch(
        &self,
        node_label: NodeLabel,
        epoch_in_question: u64,
    ) -> core::result::Result<u64, StorageError> {
        *(self.num_reads.write().await) += 1;
        self.record_call_stats('r', "get_epoch_lte_epoch".to_string(), "".to_string())
            .await;
        let result = async {
            let tic = Instant::now();
            let mut conn = self.get_connection().await?;
            let statement = format!("SELECT `epoch` FROM {} WHERE `label_len` = :len AND `label_val` = :val AND `epoch` <= :epoch ORDER BY `epoch` DESC LIMIT 1", TABLE_HISTORY_NODE_STATES);
            let out = conn
                .exec_first(
                    statement,
                    params! {
                        "len" => node_label.len,
                        "val" => node_label.val,
                        "epoch" => epoch_in_question,
                    },
                )
                .await;
            let toc = Instant::now() - tic;
            *(self.time_read.write().await) += toc;
            let result = self.check_for_infra_error(out)?;
            // u64::MAX is an in-band "no matching row" sentinel, translated to
            // NotFound below (assumes no real epoch ever equals u64::MAX)
            match result {
                Some(r) => Ok::<_, MySqlError>(r),
                None => Ok::<_, MySqlError>(u64::MAX),
            }
        };
        debug!("END MySQL get epoch LTE epoch");
        match result.await {
            Ok(u64::MAX) => Err(StorageError::NotFound(format!(
                "Node (val: {:?}, len: {}) did not exist <= epoch {}",
                node_label.val, node_label.len, epoch_in_question
            ))),
            Ok(ep) => Ok(ep),
            Err(error) => {
                error!("MySQL error {}", error);
                Err(StorageError::Other(format!("MySQL Error {}", error)))
            }
        }
    }