glean/db/Glean/Database/Backup.hs

{-
  Copyright (c) Meta Platforms, Inc. and affiliates.
  All rights reserved.

  This source code is licensed under the BSD-style license found in the
  LICENSE file in the root directory of this source tree.
-}

{-# LANGUAGE DeriveGeneric #-}
module Glean.Database.Backup
  ( backuper
  , Event(..)
  ) where

import Control.Applicative
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import qualified Data.ByteString.Lazy as LBS
import qualified Data.HashMap.Strict as HashMap
import Data.HashSet (HashSet)
import qualified Data.HashSet as HashSet
import Data.Hashable (Hashable)
import Data.List
import qualified Data.Map as Map
import Data.Maybe
import Data.Ord
import Data.Text (Text)
import qualified Data.Text as Text
import Data.Time (getCurrentTime)
import Data.Typeable (Typeable)
import GHC.Generics hiding (Meta)
import System.Directory
import System.FilePath

import Util.Control.Exception
import Util.Graph
import Util.IO (safeRemovePathForcibly)
import Util.Log
import Util.Logger

import Glean.Database.Backup.Backend as Backend
import Glean.Database.Backup.Locator
import qualified Glean.Database.Catalog as Catalog
import Glean.Database.Catalog.Filter
import Glean.Database.CompletePredicates
import Glean.Database.Data (storeSchema)
import qualified Glean.Database.Logger as Logger
import Glean.Database.Meta
import Glean.Database.Repo
import qualified Glean.Database.Storage as Storage
import Glean.Database.Open (withOpenDatabase, schemaUpdated)
import Glean.Database.Types
import Glean.Database.Schema
import Glean.Logger
import Glean.Repo.Text
import Glean.ServerConfig.Types (DatabaseBackupPolicy(..))
import qualified Glean.ServerConfig.Types as ServerConfig
import Glean.Internal.Types as Thrift
import Glean.Types as Thrift
import Glean.Util.Some
import Glean.Util.Observed as Observed
import Glean.Util.Trace

data Event
  = BackupStarted Repo
  | BackupFinished Repo
  | BackupFailed Repo
  | RestoreStarted Repo
  | RestoreFinished Repo
  | RestoreFailed Repo
  | FinalizeStarted Repo
  | FinalizeFinished Repo
  | FinalizeFailed Repo
  | Waiting
  deriving (Eq, Generic, Show, Typeable)

instance Hashable Event

type Todo = (Repo, IO Bool)

-- | Set of Repos that shouldn't be backed up (because we already tried and
-- failed). We currently only remove things from here when they get deleted
-- from our DB list, which matches previous behaviour. Eventually, we should
-- retry with exponential back-off.
type SinBin = HashSet Repo

-- | Get the next thing to do. Prioritises restores over backups and prefers
-- newer databases.
getTodo :: Env -> SinBin -> STM Todo
getTodo env@Env{..} sinbin = getFinalize <|> getRestore <|> getBackup
  where
    getFinalize = do
      dbs <- Catalog.list envCatalog [Local] $ do
        completenessV .==. CompletenessFinalizing
        latest
      case dbs of
        [] -> retry
        Item{..} : _ -> return (itemRepo, doFinalize env itemRepo)

    getRestore = do
      let newestByRepo = groupF repoNameV $ do
            sortF createdV Descending
            limitF 1
      restoring <- Catalog.list envCatalog [Restoring] $ do
        repoV `notInF` sinbin
        newestByRepo
      when (null restoring) retry
      avail <- Catalog.list envCatalog [Local] $ do
        repoV `notInF` sinbin
        completenessV .==. CompletenessComplete
        newestByRepo
      case bestRestore restoring avail of
        [] -> retry
        Item{..} : _ -> return (itemRepo, doRestore env itemRepo itemMeta)

    getBackup = do
      policy <- ServerConfig.config_backup <$> Observed.get envServerConfig
      let allowedRepoNames = databaseBackupPolicy_allowed policy
      dbs <- Catalog.list envCatalog [Local] $ do
        repoNameV `inF` allowedRepoNames
        completenessV .==. CompletenessComplete
        backedUpV .==. False
        latest
      case dbs of
        [] -> retry
        Item{..} : _ -> do
          dsite <- getSite env (repo_name itemRepo)
          case dsite of
            Just (prefix, Some site, _policy) ->
              return (itemRepo, doBackup env itemRepo prefix site)
            Nothing -> retry

    latest = do
      repoV `notInF` sinbin
      sortF createdV Descending
      limitF 1
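{- Note: 'getTodo' relies on STM's 'Alternative' instance. Each of
   'getFinalize', 'getRestore' and 'getBackup' calls 'retry' when it has
   nothing to do, and '<|>' falls through to the next alternative, which is
   what gives the finalize > restore > backup priority. A minimal sketch of
   the same pattern (illustrative only; 'nextJob' is not part of this
   module):

   nextJob :: TVar [Int] -> TVar [Int] -> STM Int
   nextJob urgent normal = pop urgent <|> pop normal
     where
       pop v = do
         xs <- readTVar v
         case xs of
           [] -> retry               -- fall through to the next alternative
           x : rest -> do
             writeTVar v rest
             return x
-}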
-- To find the next DB to restore, we
--  - find the newest DB of each repo that needs restoring
--  - restore the DB which is currently the most stale; that is,
--    where the newest, complete, local DB of the same repo is the oldest
bestRestore :: [Item] -> [Item] -> [Item]
bestRestore restoring avail = depsFirst (sortBy order restoring)
  where
    -- stalest first, and then most recent if none are stale
    order a b =
      comparing staleness a b <>
      comparing (metaCreated . itemMeta) b a  -- NB. descending

    availMap = HashMap.fromList
      [ (repo_name (itemRepo item), item) | item <- avail ]

    -- creation time of the local DB if stale, or maxBound
    staleness backup =
      case HashMap.lookup (repo_name (itemRepo backup)) availMap of
        Nothing -> NoLocalDb
        Just local
          | tlocal > tbackup -> LocalDbNewer
          | otherwise -> LocalDbOlder tlocal
          where
            tlocal = timestamp local
            tbackup = timestamp backup

    repoHashTime = metaRepoHashTime . itemMeta
    creationTime = metaCreated . itemMeta

    timestamp item = fromMaybe (creationTime item) (repoHashTime item)
      -- ^ prefer the exact repoHash time if we have it, otherwise use the
      -- creationTime as the best guess we have

    -- ensure we're restoring dependencies of a stacked DB first, if
    -- they need restoring.
    depsFirst items = postorder items itemRepo itemDeps
      where
        itemDeps item = case metaDependencies (itemMeta item) of
          Nothing -> []
          Just (Dependencies_stacked repo) -> [repo]
          Just (Dependencies_pruned Pruned{..}) -> [pruned_base]

data Staleness
  = NoLocalDb
  | LocalDbOlder PosixEpochTime
  | LocalDbNewer
  deriving (Eq, Ord)

doBackup :: Site site => Env -> Repo -> Text -> site -> IO Bool
doBackup env@Env{..} repo prefix site =
  do
    atomically $ notify envListener $ BackupStarted repo
    say logInfo "starting"
    withOpenDatabase env repo $ \OpenDB{..} -> do
      say logInfo "packing"
      stats <- mapMaybe
        (\(pid, stats) -> (,stats) . predicateRef <$> lookupPid pid odbSchema)
        <$> Storage.predicateStats odbHandle
      Backend.Data{..} <- withScratchDirectory envRoot repo $ \scratch ->
        Storage.backup odbHandle scratch $ \bytes -> do
          say logInfo "uploading"
          policy <- ServerConfig.databaseBackupPolicy_repos .
            ServerConfig.config_backup <$> Observed.get envServerConfig
          let ttl = case ServerConfig.backup_delete_after_seconds <$>
                Map.lookup (repo_name repo) policy of
                  Just 0 -> Nothing
                  ttl -> fromIntegral <$> ttl
          meta <- atomically $ Catalog.readMeta envCatalog repo
          Backend.backup site repo (metaToProps meta) ttl bytes
      Logger.logDBStatistics env repo stats dataSize
    say logInfo "finished"
    atomically $ do
      void $ Catalog.modifyMeta envCatalog repo $ \meta -> return meta
        { metaBackup = Just $ toRepoLocator prefix site repo }
      notify envListener $ BackupFinished repo
    return True
  `catch` \exc -> do
    atomically $ notify envListener $ BackupFailed repo
    say logError $ "failed: " ++ show (exc :: SomeException)
    rethrowAsync exc
    return False
  where
    say log s = log $ inRepo repo $ "backup: " ++ s
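{- Note on the comparator in 'bestRestore' above: combining comparators with
   '<>' on 'Ordering' means the second one only breaks ties left by the
   first, and swapping the argument order ('b' before 'a') sorts that field
   descending. A self-contained sketch of the same trick (illustrative only;
   'Job' and 'sortJobs' are not part of this module; 'sortBy' and 'comparing'
   are already imported above):

   data Job = Job { priority :: Int, created :: Int }

   -- lowest priority first; newest (largest 'created') first within a tie
   sortJobs :: [Job] -> [Job]
   sortJobs = sortBy (\a b -> comparing priority a b <> comparing created b a)
-}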
doRestore :: Env -> Repo -> Meta -> IO Bool
doRestore env@Env{..} repo meta
  | Just loc <- metaBackup meta
  , Just (_, Some site, r_repo) <- fromRepoLocator envBackupBackends loc
  , r_repo == repo =
  loggingAction (runLogRepo "restore" env repo) (const mempty) (do
    atomically $ notify envListener $ RestoreStarted repo
    withScratchDirectory envRoot repo $ \scratch -> do
      say logInfo "starting"
      say logInfo "downloading"
      let scratch_restore = scratch </> "restore"
          scratch_file = scratch </> "file"
      -- TODO: implement buffered downloads in Manifold client
      void $ Backend.restore site repo scratch_file
      bytes <- LBS.readFile scratch_file
      say logInfo "restoring"
      createDirectoryIfMissing True scratch_restore
      Storage.restore envStorage repo scratch_restore bytes
      say logInfo "adding"
      Catalog.finishRestoring envCatalog repo
      atomically $ notify envListener $ RestoreFinished repo
      say logInfo "finished"
      return True)
  `catch` \exc -> do
    failed <- atomically $ do
      failed <- Catalog.exists envCatalog [Restoring] repo
      when failed $ do
        Catalog.abortRestoring envCatalog repo
        notify envListener $ RestoreFailed repo
      return failed
    when failed $ do
      say logError $ "failed: " ++ show exc
      swallow $ safeRemovePathForcibly $ databasePath envRoot repo
    rethrowAsync exc
    -- NOTE: No point in adding the repo to the sinbin, we just removed
    -- it from the list of known DBs anyway.
    return True
  | otherwise = do
      atomically $ notify envListener $ RestoreStarted repo
      say logError $ case metaBackup meta of
        Just loc -> "invalid location " ++ show loc
        Nothing -> "missing location"
      atomically $ notify envListener $ RestoreFailed repo
      return False
  where
    say log s = log $ inRepo repo $ "restore: " ++ s

doFinalize :: Env -> Repo -> IO Bool
doFinalize env@Env{..} repo =
  do
    atomically $ notify envListener $ FinalizeStarted repo

    -- If the client didn't explicitly call completePredicates, we'll
    -- do that now. Do this *before* optimising/compacting, because
    -- that deletes all the temporary ownership information consumed
    -- by this pass.
    meta <- atomically $ Catalog.readMeta envCatalog repo
    when (not (metaAxiomComplete meta)) $ syncCompletePredicates env repo

    config <- Observed.get envServerConfig
    withOpenDatabase env repo $ \odb@OpenDB{..} -> do
      when (ServerConfig.config_compact_on_completion config) $ do
        say logInfo "optimising"
        Storage.optimize odbHandle
      thinSchema repo odb

    -- update and re-merge our internal representation of the schema
    schemaUpdated env (Just repo)

    time <- getCurrentTime
    atomically $ do
      void $ Catalog.modifyMeta envCatalog repo $ \meta -> return meta
        { metaCompleteness = Complete $ DatabaseComplete $
            utcTimeToPosixEpochTime time }
      notify envListener $ FinalizeFinished repo
    say logInfo "finished"
    return True
  `catch` \exc -> do
    let interrupted
          | Just AsyncCancelled{} <- fromException exc = True
          | otherwise = False
    time <- getCurrentTime
    -- If we were interrupted, then keep the DB in the finalizing state
    -- and we'll try to finalize it again later. If there was some other
    -- exception, let's just ignore the finalize step and mark the DB
    -- complete. This shouldn't happen, but we have no choice because we
    -- report a DB in the finalizing state as complete to a client, so
    -- we can't later report it as broken or incomplete.
    atomically $ do
      notify envListener $ FinalizeFailed repo
      when (not interrupted) $
        void $ Catalog.modifyMeta envCatalog repo $ \meta -> return meta
          { metaCompleteness = Complete $ DatabaseComplete $
              utcTimeToPosixEpochTime time }
    say logError $
      (if interrupted then "interrupted" else "failed") <> ": " <>
      show (exc :: SomeException)
    rethrowAsync exc
    return False
  where
    say log s = log $ inRepo repo $ "finalize: " ++ s
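{- Note: the handler in 'doFinalize' above distinguishes thread cancellation
   ('AsyncCancelled', thrown by e.g. 'cancel' from Control.Concurrent.Async)
   from genuine failures by inspecting the exception. A minimal sketch of
   that test in isolation (illustrative only; 'isCancelled' is not part of
   this module):

   isCancelled :: SomeException -> Bool
   isCancelled exc
     | Just AsyncCancelled{} <- fromException exc = True
     | otherwise = False
-}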
thinSchema :: Repo -> OpenDB -> IO ()
thinSchema repo OpenDB{..} = do
  stats <- Storage.predicateStats odbHandle
  let (newSchemaInfo, names) = thinSchemaInfo odbSchema stats
  storeSchema odbHandle newSchemaInfo
  logInfo $ "thinned schema for " <> showRepo repo <> " contains " <>
    intercalate ", " (map Text.unpack names)

-- | Back up databases forever.
backuper :: Env -> IO ()
backuper env@Env{..} = loop mempty `catchAll` \exc -> do
  logError $ "unexpected error in backup thread: " ++ show exc
  backuper env
  where
    loop sinbin = do
      (repo, action) <- do
        r <- atomically $ Just <$> getTodo env sinbin <|> return Nothing
        case r of
          Just todo -> return todo
          Nothing -> do
            atomically $ notify envListener Waiting
            atomically $ getTodo env sinbin
      ok <- action `catchAll` \exc -> do
        -- NOTE: action shouldn't throw so this shouldn't ever run
        logError $ "unexpected error during backup: " ++ show exc
        return False
      sinbin' <- HashSet.fromList . map itemRepo <$> atomically
        (Catalog.list envCatalog [Local, Restoring] $ repoV `inF`
          if ok then sinbin else HashSet.insert repo sinbin)
      loop sinbin'

withScratchDirectory :: FilePath -> Repo -> (FilePath -> IO a) -> IO a
withScratchDirectory root repo action = do
  safeRemovePathForcibly scratch
  bracket_
    (createDirectoryIfMissing True scratch)
    (logExceptions id $ safeRemovePathForcibly scratch)
    (action scratch)
  where
    scratch = root </> ".scratch" </> databaseSubdir repo

rethrowAsync :: SomeException -> IO ()
rethrowAsync exc
  | isAsyncException exc = throwIO exc
  | otherwise = return ()
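{- Usage note: 'backuper' loops forever (its error handler restarts it), so a
   caller would typically run it on a dedicated thread for the lifetime of
   the server. A hedged sketch of such a caller, assuming the surrounding
   server setup (not shown in this module); 'withBackuper' and 'serverMain'
   are hypothetical names:

   withBackuper :: Env -> IO a -> IO a
   withBackuper env serverMain = withAsync (backuper env) $ \_ -> serverMain

   Running it under 'withAsync' ensures the backup thread is cancelled when
   the server's main action exits; 'rethrowAsync' above is what lets that
   cancellation propagate instead of being swallowed by the handlers.
-}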