diff --git a/app/Main.hs b/app/Main.hs index 7cda6816..6e137163 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -3,8 +3,8 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE QuasiQuotes #-} -{-# LANGUAGE DeriveGeneric #-} -{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE FlexibleInstances #-} module Main where @@ -142,15 +142,34 @@ executeWorkerProcess = do -- turnOffTlsValidation - withLogger logConfig (\_ -> pure ()) $ \logger -> liftIO $ do - (z, validations) <- runValidatorT - (newScopes "worker-create-app-context") - (createWorkerAppContext config logger) - case z of - Left e -> - logError logger [i|Couldn't initialise: #{e}, problems: #{validations}.|] - Right appContext -> - executeWorker input appContext + executeWork input $ \_ resultHandler -> + withLogger logConfig (\_ -> pure ()) $ \logger -> liftIO $ do + (z, validations) <- runValidatorT + (newScopes "worker-create-app-context") + (createWorkerAppContext config logger) + case z of + Left e -> + logError logger [i|Couldn't initialise: #{e}, problems: #{validations}.|] + Right appContext -> + case input ^. 
#params of + RrdpFetchParams {..} -> exec resultHandler $ + fmap RrdpFetchResult $ runValidatorT scopes $ + updateObjectForRrdpRepository appContext worldVersion rrdpRepository + + RsyncFetchParams {..} -> exec resultHandler $ + fmap RsyncFetchResult $ runValidatorT scopes $ + updateObjectForRsyncRepository appContext fetchConfig worldVersion rsyncRepository + + CompactionParams {..} -> exec resultHandler $ + CompactionResult <$> copyLmdbEnvironment appContext targetLmdbEnv + + ValidationParams {..} -> exec resultHandler $ + uncurry ValidationResult <$> runValidation appContext worldVersion tals + + CacheCleanupParams {..} -> exec resultHandler $ + CacheCleanupResult <$> runCacheCleanup appContext worldVersion + where + exec resultHandler f = resultHandler =<< execWithStats f turnOffTlsValidation = do @@ -458,29 +477,6 @@ rsyncPrefetches CLIOptions {..} = do Right rsyncURL -> pure rsyncURL --- This is for worker processes. -executeWorker :: WorkerInput - -> AppLmdbEnv - -> IO () -executeWorker input appContext = - executeWork input $ \_ resultHandler -> do - case input ^. 
#params of - RrdpFetchParams {..} -> exec resultHandler $ - fmap RrdpFetchResult $ runValidatorT scopes $ updateObjectForRrdpRepository - appContext worldVersion rrdpRepository - RsyncFetchParams {..} -> exec resultHandler $ - fmap RsyncFetchResult $ runValidatorT scopes $ updateObjectForRsyncRepository - appContext fetchConfig worldVersion rsyncRepository - CompactionParams {..} -> exec resultHandler $ - CompactionResult <$> copyLmdbEnvironment appContext targetLmdbEnv - ValidationParams {..} -> exec resultHandler $ - uncurry ValidationResult <$> runValidation appContext worldVersion tals - CacheCleanupParams {..} -> exec resultHandler $ - CacheCleanupResult <$> runCacheCleanup appContext worldVersion - where - exec resultHandler f = resultHandler =<< execWithStats f - - createWorkerAppContext :: Config -> AppLogger -> ValidatorT IO AppLmdbEnv createWorkerAppContext config logger = do lmdbEnv <- setupWorkerLmdbCache diff --git a/src/RPKI/Logging.hs b/src/RPKI/Logging.hs index a3b8d753..8885268f 100644 --- a/src/RPKI/Logging.hs +++ b/src/RPKI/Logging.hs @@ -19,6 +19,7 @@ import qualified Data.ByteString.Char8 as C8 import Data.Bifunctor import Data.Foldable +import Data.Maybe import Data.Text (Text, justifyLeft) import Data.String.Interpolate.IsString @@ -211,7 +212,7 @@ withLogger LogConfig {..} sysMetricCallback f = do MsgQE z -> processMessageInMainProcess z - -- Worker simply re-sends all the binary messages + -- Worker simply resends all the binary messages -- (received from children processes) to its parent. -- Messages from this process are serialised and then sent let loopWorker = loopReadQueue messageQueue $ logRaw . 
\case diff --git a/src/RPKI/Orphans/Json.hs b/src/RPKI/Orphans/Json.hs index 0aefc151..6df542c9 100644 --- a/src/RPKI/Orphans/Json.hs +++ b/src/RPKI/Orphans/Json.hs @@ -44,12 +44,10 @@ import HaskellWorks.Data.Network.Ip.Ip as Ips import RPKI.AppTypes import RPKI.Domain as Domain -import RPKI.RRDP.Types (RrdpSerial) import RPKI.Config import RPKI.Logging import RPKI.Reporting -import RPKI.Repository import RPKI.RRDP.Types import RPKI.Metrics.Metrics import RPKI.Metrics.System diff --git a/src/RPKI/Worker.hs b/src/RPKI/Worker.hs index bd0ad866..c0abe622 100644 --- a/src/RPKI/Worker.hs +++ b/src/RPKI/Worker.hs @@ -8,7 +8,6 @@ module RPKI.Worker where import Control.Exception.Lifted -import Control.Monad (void, when) import Control.Monad.IO.Class import Control.Concurrent import Control.Concurrent.Async @@ -148,56 +147,46 @@ data WorkerResult r = WorkerResult { executeWork :: WorkerInput -> (WorkerInput -> (forall a . TheBinary a => a -> IO ()) -> IO ()) -- ^ Actual work to be executed. -> IO () -executeWork input actualWork = do - exitCode <- newTVarIO Nothing - let forceExit code = atomically $ writeTVar exitCode (Just code) - void $ forkIO $ void $ race - (do - actualWork input writeWorkerOutput - `finally` - forceExit ExitSuccess) - (race - (dieIfParentDies forceExit) - (dieOfTiming forceExit)) - - -- this complication is to guarantee that it is the main thread - -- that exits the process as soon as there's exit code defined. - exitWith =<< atomically (maybe retry pure =<< readTVar exitCode) +executeWork input actualWork = do + exitCode <- anyOf + (either (\e -> const exceptionExitCode (e :: SomeException)) (const ExitSuccess) <$> try (actualWork input writeWorkerOutput)) -- 'onException' re-throws, so 'try' is used to map a failure to the exit code + (anyOf dieIfParentDies dieOfTiming) + exitWith exitCode where + anyOf a b = either id id <$> race a b + -- Keep track of who's the current process parent: if it is not the same -- as we started with then parent exited/is killed. Exit the worker as well, -- there's no point continuing. 
- dieIfParentDies exit' = - loop 500_000 $ do + dieIfParentDies = go + where + go = do parentId <- getParentProcessID - when (parentId /= input ^. #initialParentId) $ - exit' exitParentDied + if parentId /= input ^. #initialParentId + then pure parentDiedExitCode + else threadDelay 500_000 >> go -- exit either because the time is up or too much CPU is spent - dieOfTiming exit' = + dieOfTiming = case input ^. #cpuLimit of - Nothing -> dieAfterTimeout exit' - Just cpuLimit -> - void $ race - (dieAfterTimeout exit') - (dieOutOfCpuTime cpuLimit exit') + Nothing -> dieAfterTimeout + Just cpuLimit -> anyOf dieAfterTimeout (dieOutOfCpuTime cpuLimit) -- Time bomb. Wait for the certain timeout and then exit. - dieAfterTimeout exit' = do + dieAfterTimeout = do let Timebox (Seconds s) = input ^. #workerTimeout threadDelay $ 1_000_000 * fromIntegral s - exit' exitTimeout + pure timeoutExitCode -- Exit if the worker consumed too much CPU time - dieOutOfCpuTime cpuLimit exit' = - loop 1_000_000 $ do + dieOutOfCpuTime cpuLimit = go + where + go = do cpuTime <- getCpuTime - when (cpuTime > cpuLimit) $ exit' exitOutOfCpuTime - - loop ms f = - f >> threadDelay ms >> loop ms f - + if cpuTime > cpuLimit + then pure outOfCpuTimeExitCode + else threadDelay 1_000_000 >> go readWorkerInput :: (MonadIO m) => m WorkerInput @@ -233,11 +222,13 @@ rtsMemValue mb = show mb <> "m" defaultRts :: [String] defaultRts = [ "-I0" ] -exitParentDied, exitTimeout, exitOutOfCpuTime, exitOutOfMemory, exitKillByTypedProcess :: ExitCode -exitParentDied = ExitFailure 11 -exitTimeout = ExitFailure 12 -exitOutOfCpuTime = ExitFailure 13 -exitOutOfMemory = ExitFailure 251 +parentDiedExitCode, timeoutExitCode, outOfCpuTimeExitCode, outOfMemoryExitCode :: ExitCode +exitKillByTypedProcess, exceptionExitCode :: ExitCode +exceptionExitCode = ExitFailure 99 +parentDiedExitCode = ExitFailure 111 +timeoutExitCode = ExitFailure 122 +outOfCpuTimeExitCode = ExitFailure 113 +outOfMemoryExitCode = ExitFailure 251 
exitKillByTypedProcess = ExitFailure (-2) worderIdS :: WorkerId -> String @@ -295,17 +286,17 @@ runWorker logger config workerId params timeout cpuLimit extraCli = do Right r -> pure r exit@(ExitFailure errorCode) - | exit == exitTimeout -> do + | exit == timeoutExitCode -> do let message = [i|Worker #{workerId} execution timed out.|] logError logger message trace WorkerTimeoutTrace appError $ InternalE $ WorkerTimeout message - | exit == exitOutOfCpuTime -> do + | exit == outOfCpuTimeExitCode -> do let message = [i|Worker #{workerId} ran out of CPU time.|] logError logger message trace WorkerCpuOveruseTrace appError $ InternalE $ WorkerOutOfCpuTime message - | exit == exitOutOfMemory -> do + | exit == outOfMemoryExitCode -> do let message = [i|Worker #{workerId} ran out of memory.|] logError logger message appError $ InternalE $ WorkerOutOfMemory message diff --git a/stack.yaml b/stack.yaml index 7bd8fab1..ef2cdd25 100644 --- a/stack.yaml +++ b/stack.yaml @@ -1,4 +1,4 @@ -resolver: lts-22.23 +resolver: lts-22.26 packages: - .