-- glean/tools/gleancli/GleanCLI/Write.hs
{-
Copyright (c) Meta Platforms, Inc. and affiliates.
All rights reserved.
This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.
-}
{-# LANGUAGE ApplicativeDo #-}
module GleanCLI.Write (WriteCommand, FinishCommand) where
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.Aeson as Aeson
import qualified Data.Aeson.Types as Aeson
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as B8
import Data.Default
import Data.Proxy
import qualified Data.HashMap.Strict as HashMap
import Data.Maybe
import Data.List.Split (splitOn)
import Data.Text (Text)
import qualified Data.Text as Text
import Options.Applicative
import Control.Concurrent.Stream (stream)
import Foreign.CPP.Dynamic (parseJSONWithOptions, JSONOptions(..))
import Thrift.Protocol.Compact (Compact)
import Thrift.Protocol
import Util.Control.Exception
import Util.IO
import Util.OptParse
import Glean hiding (options)
import Glean.Database.Schema
import Glean.Datasource.Scribe.Write
import Glean.Types as Thrift
import Glean.Util.Time
import Glean.Write
import Glean.Write.JSON ( buildJsonBatch )
import GleanCLI.Common
import GleanCLI.Finish
import GleanCLI.Types
import Data.Time.Clock (UTCTime)
import Glean.Database.Meta (utcTimeToPosixEpochTime)
-- | How to route fact writes through Scribe instead of writing them
-- directly to the database.
data ScribeOptions = ScribeOptions
  { writeFromScribe :: WriteFromScribe
    -- ^ Thrift request carrying the Scribe category/bucket and start
    -- position (presumably consumed server-side — see 'kickOff_fill')
  , scribeCompress :: Bool
    -- ^ @--compress@: compress batches before writing them to Scribe
  }
-- | Supported on-disk formats for input fact files.
data FileFormat
  = JsonFormat
    -- ^ JSON fact batches (the default for @glean write@)
  | BinaryFormat
    -- ^ Thrift Compact-serialized batch
  deriving (Eq, Show)
    -- Eq/Show derived so formats can be compared and reported in
    -- diagnostics; purely additive, no caller relies on their absence.

-- | Parse the argument of @--file-format@. Accepts @\"json\"@ and
-- @\"binary\"@; anything else yields an error message in 'Left'.
parseFileFormat :: String -> Either String FileFormat
parseFileFormat "json" = Right JsonFormat
parseFileFormat "binary" = Right BinaryFormat
parseFileFormat s = Left $ "unknown format: " <> s
-- | Arguments for the @create@ and @write@ subcommands, which share
-- this record; 'create' records which one was parsed.
data WriteCommand
  = Write
      { writeRepo :: Repo
        -- ^ the repo (name\/hash) to create or write to
      , writeRepoTime :: Maybe UTCTime
        -- ^ optional @--repo-hash-time@, passed to kickOff (create only)
      , writeHandle :: Text
        -- ^ write handle used when kicking off and finishing the DB
      , writeFiles :: [FilePath]
        -- ^ input files containing facts to write
      , create :: Bool
        -- ^ True for @create@, False for @write@
      , dependencies :: Maybe Thrift.Dependencies
        -- ^ stacked\/incremental DB dependencies (create only)
      , scribe :: Maybe ScribeOptions
        -- ^ when set, route writes via Scribe instead of directly
      , finish :: Bool
        -- ^ @--finish@: also mark the DB complete afterwards
      , properties :: [(Text,Text)]
        -- ^ @--property NAME=VALUE@ pairs set at creation (create only)
      , writeMaxConcurrency :: Int
        -- ^ maximum number of files processed concurrently
      , useLocalCache :: Maybe Glean.SendAndRebaseQueueSettings
        -- ^ settings for local fact rebasing (@--use-local-cache@)
      , writeFileFormat :: FileFormat
        -- ^ format of the input files (json or binary)
      }
instance Plugin WriteCommand where
  -- Two subcommands share this plugin: @create@ (kick off a new DB,
  -- optionally writing facts into it) and @write@ (write facts to an
  -- existing DB, or via Scribe).
  parseCommand = createCmd <|> writeCmd
    where
    -- | @glean create@: create a new database, with optional repo-hash
    -- time, properties, dependencies, and initial fact files.
    createCmd =
      commandParser "create" (progDesc "Create a new database") $ do
        writeRepo <- repoOpts
        writeRepoTime <- optional $ option readTime
          ( long "repo-hash-time"
          <> metavar "yyyy-mm-ddThh:mm:ssZ"
          -- BUGFIX: this help text was a copy-paste of --property's
          -- ("Set properties when creating a DB"); describe this option.
          <> help "Set the repo hash time when creating a DB"
          )
        writeFiles <- fileArgs
        finish <- finishOpt
        scribe <- Just <$> scribeOptions <|> pure Nothing
        dependencies <- optional (stackedOptions <|> updateOptions)
        properties <- many $ option readProperty
          ( long "property"
          <> metavar "NAME=VALUE"
          <> help "Set properties when creating a DB"
          )
        writeHandle <- handleOpt
        writeMaxConcurrency <- maxConcurrencyOpt
        useLocalCache <- useLocalCacheOptions
        -- @create@ always treats inputs as JSON.
        return Write
          { create=True
          , writeFileFormat=JsonFormat
          , ..
          }

    -- | Parse a @NAME=VALUE@ property setting.
    readProperty :: ReadM (Text,Text)
    readProperty = eitherReader $ \str ->
      case break (=='=') str of
        (name, '=':value) -> Right (Text.pack name, Text.pack value)
        _other -> Left "--property: expecting NAME=VALUE"

    -- | Parse a UTC timestamp for @--repo-hash-time@.
    readTime :: ReadM UTCTime
    readTime = eitherReader $ \str ->
      case readUTC $ Text.pack str of
        Just value -> Right value
        Nothing ->
          Left "expecting time e.g. 2021-01-01T12:30:00Z"

    -- | @glean write@: write facts to a database, either directly
    -- (--repo) or via Scribe (--scribe-category & friends).
    writeCmd =
      commandParser "write" (progDesc "Write facts to a database") $ do
        ~(writeRepo, scribe) <-
          (,Nothing) <$> repoOpts <|>
          (do
            ~(cat, bucket, compress) <- writeScribeOpts
            return (def, Just ScribeOptions
              { writeFromScribe = def
                { writeFromScribe_category = cat
                , writeFromScribe_bucket = bucket }
              , scribeCompress = compress }))
        writeFiles <- fileArgs
        finish <- finishOpt
        writeHandle <- handleOpt
        writeMaxConcurrency <- maxConcurrencyOpt
        useLocalCache <- useLocalCacheOptions
        -- --inventory is deprecated: its only remaining effect is to
        -- default the file format to binary below.
        writeInventory :: Maybe String <- optional $ strOption
          ( long "inventory"
          <> metavar "PATH"
          <> help "Deprecated - use file-format instead"
          )
        writeFileFormat <-
          optional $ option (eitherReader parseFileFormat)
            ( long "file-format"
            <> metavar "(json|binary)"
            <> help "Format of the input files"
            )
        return Write
          { create=False, writeRepoTime=Nothing
          , properties=[], dependencies=Nothing
            -- default format: binary if the deprecated --inventory was
            -- given (it used to imply binary input), otherwise json
          , writeFileFormat = fromMaybe
              (if isJust writeInventory then BinaryFormat else JsonFormat)
              writeFileFormat
          , ..
          }

    finishOpt = switch
      ( long "finish"
      <> help "also mark the DB as complete")

    -- | Common Scribe destination options.
    writeScribeOpts :: Parser (Text, Maybe PickScribeBucket, Bool)
    writeScribeOpts = do
      cat <- textOption (long "scribe-category" <> metavar "NAME")
      bucket <- optional $ fmap PickScribeBucket_bucket $
        option auto (long "scribe-bucket" <> metavar "BUCKET")
      compress <- switch (long "compress")
      return (cat, bucket, compress)

    -- | Full Scribe options for @create@: destination plus an optional
    -- start position (--start-time XOR --checkpoint).
    scribeOptions :: Parser ScribeOptions
    scribeOptions = do
      ~(cat, bucket, compress) <- writeScribeOpts
      let
        startTime = Just . ScribeStart_start_time <$>
          textOption (long "start-time" <> metavar "TIME")
        checkpoint = Just . ScribeStart_checkpoint <$>
          textOption (long "checkpoint" <> metavar "STRING")
      start <- startTime <|> checkpoint <|> pure Nothing
      opts <- SendJsonBatchOptions <$> switch (long "no-base64-binary")
      return ScribeOptions
        { writeFromScribe = WriteFromScribe "" cat start (Just opts) bucket
        , scribeCompress = compress
        }

    -- | --use-local-cache turns on local fact rebasing; the queue
    -- settings are always parsed but only used when the flag is set.
    useLocalCacheOptions
      :: Parser (Maybe Glean.SendAndRebaseQueueSettings)
    useLocalCacheOptions = do
      useLocalCacheFlag <- switch
        ( long "use-local-cache"
        <> help "use a cache to rebase facts locally"
        )
      sendAndRebaseQueue <- Glean.sendAndRebaseQueueOptions
      return $ if useLocalCacheFlag then
          Just sendAndRebaseQueue
        else
          Nothing

    -- | --stacked REPO: the new DB is stacked on top of REPO.
    stackedOptions = Thrift.Dependencies_stacked
      <$> option (maybeReader Glean.parseRepo)
        ( long "stacked"
        <> metavar "REPO"
        <> help "Create a stacked database"
        )

    -- | --incremental REPO with --include/--exclude unit lists: the new
    -- DB is a pruned view of REPO.
    updateOptions = do
      repo <- option (maybeReader Glean.parseRepo)
        ( long "incremental"
        <> metavar "REPO"
        <> help "Create an incremental database"
        )
      let
        splitUnits = map B8.pack . splitOn ","
        -- the Bool is the "exclude" flag of Thrift.Pruned
        include = (,False) . splitUnits <$> strOption
          ( long "include"
          <> metavar "unit,unit,.."
          <> help "Include these units"
          )
        exclude = (,True) . splitUnits <$> strOption
          ( long "exclude"
          <> metavar "unit,unit,.."
          <> help "Exclude these units"
          )
      ~(units, exclude) <- include <|> exclude
      return $ Thrift.Dependencies_pruned $
        Thrift.Pruned repo units exclude

  -- Lifecycle: (1) when creating, kick off the DB (dying if it already
  -- exists); (2) write all the fact files; (3) on the way out, either
  -- finish the DB (propagating any failure message) or die on failure.
  runCommand _ _ backend Write{..} =
    tryBracket
      (when create $ do
        putStrLn $ "Creating DB using handle " ++ Text.unpack writeHandle
        Thrift.KickOffResponse alreadyExists <-
          Glean.kickOffDatabase backend def
            { kickOff_repo = writeRepo
              -- fill either via our write handle or via Scribe
            , kickOff_fill = Just $ case scribe of
                Nothing -> KickOffFill_writeHandle writeHandle
                Just scribe -> KickOffFill_scribe
                  (writeFromScribe scribe)
                    { writeFromScribe_writeHandle = writeHandle }
            , kickOff_properties = HashMap.fromList properties
            , kickOff_dependencies = dependencies
            , kickOff_repo_hash_time =
                utcTimeToPosixEpochTime <$> writeRepoTime
            }
        when alreadyExists $ die 3 "DB create failure: already exists"
      )
      (\_ result ->
        -- release action: runs whether the writes succeeded or failed
        let mFail = resultToFailure result in
        if finish then
          finished backend writeRepo writeHandle
            Nothing Nothing (fmap Text.pack mFail)
        else
          let writeFail err = die 3 $ "DB write failure: " ++ err in
          maybe (return ()) writeFail mFail)
      (\_ ->
        write
          writeRepo
          writeFiles
          writeMaxConcurrency
          scribe
          useLocalCache
          writeFileFormat)
    where
      -- Dispatch on (scribe, local cache, format). Local cache: rebase
      -- facts locally through a send-and-rebase queue (either format).
      write repo files max Nothing (Just useLocalCache) fileFormat = do
        schemaInfo <- Glean.getSchemaInfo backend repo
        dbSchema <- fromSchemaInfo schemaInfo readWriteContent
        logMessages <- newTQueueIO
        let inventory = schemaInventory dbSchema
        Glean.withSendAndRebaseQueue backend repo inventory useLocalCache $
          \queue ->
            stream max (forM_ files) $ \file -> do
              batch <- case fileFormat of
                BinaryFormat -> do
                  r <- B.readFile file
                  case deserializeGen (Proxy :: Proxy Compact) r of
                    Left parseError -> die 3 $ "Parse error: " <> parseError
                    Right result -> return result
                JsonFormat -> do
                  batches <- toBatches file
                  buildJsonBatch dbSchema Nothing batches
              _ <- Glean.writeSendAndRebaseQueue queue batch $
                \_ -> writeTQueue logMessages $ "Wrote " <> file
              -- drain the log queue as we go so progress is visible
              atomically (flushTQueue logMessages) >>= mapM_ putStrLn
              return ()
        -- drain once more for messages logged after the last file
        atomically (flushTQueue logMessages) >>= mapM_ putStrLn
      -- Direct binary writes, no cache: deserialize and send each file.
      write repo files max Nothing Nothing BinaryFormat =
        stream max (forM_ files) $ \file -> do
          -- tag any failure with the offending file name
          handleAll (\e -> do throwIO $ ErrorCall $ file <> ": " <> show e) $ do
            r <- B.readFile file
            batch <- case deserializeGen (Proxy :: Proxy Compact) r of
              Left parseError -> die 3 $ "Parse error: " <> parseError
              Right result -> return result
            void $ Glean.sendBatch backend repo batch
      -- JSON writes, no cache: send each file's batches directly or via
      -- Scribe depending on the options.
      write repo files max scribe Nothing JsonFormat = do
        stream max (forM_ files) $ \file -> do
          batches <- toBatches file
          case scribe of
            Nothing -> void $ Glean.sendJsonBatch backend repo batches Nothing
            Just ScribeOptions
              { writeFromScribe = WriteFromScribe{..}, .. } ->
                scribeWriteBatches
                  writeFromScribe_category
                  (case writeFromScribe_bucket of
                    Just (PickScribeBucket_bucket n) ->
                      Just (fromIntegral n :: Int)
                    Nothing -> Nothing)
                  batches
                  scribeCompress
      -- Remaining combinations are user errors.
      write _repo _files _max (Just _scribe) (Just _useLocalCache) _ =
        die 3 "Cannot use a local cache with scribe"
      write _repo _files _max (Just _scribe) Nothing BinaryFormat =
        die 3 "Cannot use binary format with scribe"

      resultToFailure Right{} = Nothing
      resultToFailure (Left err) = Just (show err)

      -- | Read a file of JSON fact batches, failing with the file name
      -- on either a JSON parse error or a fact-batch decode error.
      toBatches :: FilePath -> IO [JsonFactBatch]
      toBatches file = do
        bs <- B.readFile file
        r <- Foreign.CPP.Dynamic.parseJSONWithOptions opts bs
        val <- case r of
          Right val -> return val
          Left err -> throwIO $ ErrorCall $ file ++ ": " ++ Text.unpack err
        case Aeson.parse parseJsonFactBatches val of
          Aeson.Error str -> throwIO $ ErrorCall $ file ++ ": " ++ str
          Aeson.Success x -> return x
        where
          -- folly's default recursion limit is 100, which is not enough for us.
          opts = Foreign.CPP.Dynamic.JSONOptions
            { json_recursionLimit = Just 500 }