example/facebook/FB/DataSource.hs (96 lines of code) (raw):
{-# LANGUAGE OverloadedStrings, StandaloneDeriving, RecordWildCards,
    GADTs, TypeFamilies, MultiParamTypeClasses, DeriveDataTypeable,
    FlexibleInstances #-}
-- QSem was deprecated in 7.6, but no more
{-# OPTIONS_GHC -fno-warn-deprecations #-}
module FB.DataSource
  ( FacebookReq(..)
  , initGlobalState
  , Credentials(..)
  , UserAccessToken
  , AccessToken(..)
  ) where
import Network.HTTP.Conduit
import Facebook as FB
import Control.Monad
import Control.Monad.Trans.Resource
import Data.Hashable
import Data.Typeable
import Network.HTTP.Client.TLS (tlsManagerSettings)
import Data.Conduit
import Data.Conduit.List hiding (mapM, mapM_)
import Data.Monoid
import Data.Aeson
import Control.Concurrent.Async
import Control.Concurrent.QSem
import Control.Exception
import Haxl.Core
data FacebookReq a where
   GetObject      :: Id -> FacebookReq Object
   GetUser        :: UserId -> FacebookReq User
   GetUserFriends :: UserId -> FacebookReq [Friend]
  deriving Typeable
deriving instance Eq (FacebookReq a)
deriving instance Show (FacebookReq a)
instance ShowP FacebookReq where showp = show
instance Hashable (FacebookReq a) where
  hashWithSalt s (GetObject (Id id))      = hashWithSalt s (0::Int,id)
  hashWithSalt s (GetUser (Id id))        = hashWithSalt s (1::Int,id)
  hashWithSalt s (GetUserFriends (Id id)) = hashWithSalt s (2::Int,id)
instance StateKey FacebookReq where
  data State FacebookReq =
    FacebookState
       { credentials :: Credentials
       , userAccessToken :: UserAccessToken
       , manager :: Manager
       , semaphore :: QSem
       }
instance DataSourceName FacebookReq where
  dataSourceName _ = "Facebook"
instance DataSource u FacebookReq where
  fetch = facebookFetch
initGlobalState
  :: Int
  -> Credentials
  -> UserAccessToken
  -> IO (State FacebookReq)
initGlobalState threads creds token = do
  manager <- newManager tlsManagerSettings
  sem <- newQSem threads
  return FacebookState
    { credentials = creds
    , manager = manager
    , userAccessToken = token
    , semaphore = sem
    }
facebookFetch
  :: State FacebookReq
  -> Flags
  -> u
  -> PerformFetch FacebookReq
facebookFetch FacebookState{..} _flags _user =
  BackgroundFetch $
    mapM_ (fetchAsync credentials manager userAccessToken semaphore)
fetchAsync
  :: Credentials -> Manager -> UserAccessToken -> QSem
  -> BlockedFetch FacebookReq
  -> IO ()
fetchAsync creds manager tok sem (BlockedFetch req rvar) =
  void $ async $ bracket_ (waitQSem sem) (signalQSem sem) $ do
    e <- Control.Exception.try $
           runResourceT $ runFacebookT creds manager $ fetchFBReq tok req
    case e of
      Left ex -> putFailure rvar (ex :: SomeException)
      Right a -> putSuccess rvar a
fetchFBReq
  :: UserAccessToken
  -> FacebookReq a
  -> FacebookT Auth (ResourceT IO) a
fetchFBReq tok (GetObject (Id id)) =
  getObject ("/" <> id) [] (Just tok)
fetchFBReq _tok (GetUser id) =
  getUser id [] Nothing
fetchFBReq tok (GetUserFriends id) = do
  f <- getUserFriends id [] tok
  source <- fetchAllNextPages f
  source $$ consume