Skip to content

Commit

Permalink
Merge pull request #212 from lolepezy/fix-forkos-entry
Browse files Browse the repository at this point in the history
Fix premature worker timeouts
  • Loading branch information
lolepezy authored Jun 24, 2024
2 parents 6e09b9b + 36b1d02 commit e5d1524
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 81 deletions.
64 changes: 30 additions & 34 deletions app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleInstances #-}

module Main where

Expand Down Expand Up @@ -142,15 +142,34 @@ executeWorkerProcess = do

-- turnOffTlsValidation

withLogger logConfig (\_ -> pure ()) $ \logger -> liftIO $ do
(z, validations) <- runValidatorT
(newScopes "worker-create-app-context")
(createWorkerAppContext config logger)
case z of
Left e ->
logError logger [i|Couldn't initialise: #{e}, problems: #{validations}.|]
Right appContext ->
executeWorker input appContext
executeWork input $ \_ resultHandler ->
withLogger logConfig (\_ -> pure ()) $ \logger -> liftIO $ do
(z, validations) <- runValidatorT
(newScopes "worker-create-app-context")
(createWorkerAppContext config logger)
case z of
Left e ->
logError logger [i|Couldn't initialise: #{e}, problems: #{validations}.|]
Right appContext ->
case input ^. #params of
RrdpFetchParams {..} -> exec resultHandler $
fmap RrdpFetchResult $ runValidatorT scopes $
updateObjectForRrdpRepository appContext worldVersion rrdpRepository

RsyncFetchParams {..} -> exec resultHandler $
fmap RsyncFetchResult $ runValidatorT scopes $
updateObjectForRsyncRepository appContext fetchConfig worldVersion rsyncRepository

CompactionParams {..} -> exec resultHandler $
CompactionResult <$> copyLmdbEnvironment appContext targetLmdbEnv

ValidationParams {..} -> exec resultHandler $
uncurry ValidationResult <$> runValidation appContext worldVersion tals

CacheCleanupParams {..} -> exec resultHandler $
CacheCleanupResult <$> runCacheCleanup appContext worldVersion
where
exec resultHandler f = resultHandler =<< execWithStats f


turnOffTlsValidation = do
Expand Down Expand Up @@ -458,29 +477,6 @@ rsyncPrefetches CLIOptions {..} = do
Right rsyncURL -> pure rsyncURL


-- | Entry point for worker processes.
--
-- Looks at the parameters carried by the worker input, runs the matching
-- job through 'execWithStats' and passes the outcome to the result handler
-- supplied by 'executeWork'.
executeWorker :: WorkerInput
              -> AppLmdbEnv
              -> IO ()
executeWorker input appContext =
    executeWork input $ \_ resultHandler ->
        case input ^. #params of
            RrdpFetchParams {..} ->
                report resultHandler $
                    RrdpFetchResult <$>
                        runValidatorT scopes
                            (updateObjectForRrdpRepository appContext worldVersion rrdpRepository)

            RsyncFetchParams {..} ->
                report resultHandler $
                    RsyncFetchResult <$>
                        runValidatorT scopes
                            (updateObjectForRsyncRepository appContext fetchConfig worldVersion rsyncRepository)

            CompactionParams {..} ->
                report resultHandler $
                    CompactionResult <$> copyLmdbEnvironment appContext targetLmdbEnv

            ValidationParams {..} ->
                report resultHandler $
                    uncurry ValidationResult <$> runValidation appContext worldVersion tals

            CacheCleanupParams {..} ->
                report resultHandler $
                    CacheCleanupResult <$> runCacheCleanup appContext worldVersion
  where
    -- The handler is taken as an argument (rather than captured) so that it
    -- can be instantiated at a different result type in every branch.
    report sendResult job = execWithStats job >>= sendResult


createWorkerAppContext :: Config -> AppLogger -> ValidatorT IO AppLmdbEnv
createWorkerAppContext config logger = do
lmdbEnv <- setupWorkerLmdbCache
Expand Down
3 changes: 2 additions & 1 deletion src/RPKI/Logging.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import qualified Data.ByteString.Char8 as C8

import Data.Bifunctor
import Data.Foldable
import Data.Maybe
import Data.Text (Text, justifyLeft)

import Data.String.Interpolate.IsString
Expand Down Expand Up @@ -211,7 +212,7 @@ withLogger LogConfig {..} sysMetricCallback f = do

MsgQE z -> processMessageInMainProcess z

-- Worker simply re-sends all the binary messages
-- Worker simply resends all the binary messages
-- (received from children processes) to its parent.
-- Messages from this process are serialised and then sent
let loopWorker = loopReadQueue messageQueue $ logRaw . \case
Expand Down
2 changes: 0 additions & 2 deletions src/RPKI/Orphans/Json.hs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,10 @@ import HaskellWorks.Data.Network.Ip.Ip as Ips

import RPKI.AppTypes
import RPKI.Domain as Domain
import RPKI.RRDP.Types (RrdpSerial)
import RPKI.Config

import RPKI.Logging
import RPKI.Reporting
import RPKI.Repository
import RPKI.RRDP.Types
import RPKI.Metrics.Metrics
import RPKI.Metrics.System
Expand Down
77 changes: 34 additions & 43 deletions src/RPKI/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
module RPKI.Worker where

import Control.Exception.Lifted
import Control.Monad (void, when)
import Control.Monad.IO.Class
import Control.Concurrent
import Control.Concurrent.Async
Expand Down Expand Up @@ -148,56 +147,46 @@ data WorkerResult r = WorkerResult {
-- | Run the actual work of a worker process and terminate the process with
-- an exit code describing how the work ended.
--
-- The work is raced against two watchdogs: one that notices that the parent
-- process id is no longer the one recorded in the input, and one that
-- enforces the wall-clock timeout (plus the CPU time limit when the input
-- defines one). Whichever action finishes first determines the exit code
-- that is passed to 'exitWith'.
executeWork :: WorkerInput
            -> (WorkerInput -> (forall a . TheBinary a => a -> IO ()) -> IO ()) -- ^ Actual work to be executed.
            -> IO ()
executeWork input actualWork = do
    exitCode <- anyOf runWork (anyOf dieIfParentDies dieOfTiming)
    exitWith exitCode
  where
    -- Result of whichever of the two actions completes first.
    anyOf a b = either id id <$> race a b

    -- The work itself: successful completion means 'ExitSuccess'.
    -- 'onException' re-throws after running its handler, so it cannot turn
    -- a failure into an exit code; 'catch' is required for that.
    runWork =
        (actualWork input writeWorkerOutput >> pure ExitSuccess)
            `catch` exitCodeForException

    exitCodeForException :: SomeException -> IO ExitCode
    exitCodeForException e
        -- Asynchronous exceptions (e.g. cancellation of the losing side of
        -- the race) must not be swallowed.
        | Just (SomeAsyncException _) <- fromException e = throwIO e
        -- The work asked for a specific exit code itself, keep it.
        | Just code <- fromException e = pure code
        | otherwise = pure exceptionExitCode

    -- Keep track of who's the current process parent: if it is not the same
    -- as we started with then parent exited/is killed. Exit the worker as well,
    -- there's no point continuing.
    dieIfParentDies = go
      where
        go = do
            parentId <- getParentProcessID
            if parentId /= input ^. #initialParentId
                then pure parentDiedExitCode
                else threadDelay 500_000 >> go

    -- Exit either because the time is up or too much CPU is spent.
    dieOfTiming =
        case input ^. #cpuLimit of
            Nothing       -> dieAfterTimeout
            Just cpuLimit -> anyOf dieAfterTimeout (dieOutOfCpuTime cpuLimit)

    -- Time bomb. Wait for the configured timeout and then report it.
    dieAfterTimeout = do
        let Timebox (Seconds s) = input ^. #workerTimeout
        threadDelay $ 1_000_000 * fromIntegral s
        pure timeoutExitCode

    -- Poll the consumed CPU time once a second and report when it exceeds
    -- the limit.
    dieOutOfCpuTime cpuLimit = go
      where
        go = do
            cpuTime <- getCpuTime
            if cpuTime > cpuLimit
                then pure outOfCpuTimeExitCode
                else threadDelay 1_000_000 >> go


readWorkerInput :: (MonadIO m) => m WorkerInput
Expand Down Expand Up @@ -233,11 +222,13 @@ rtsMemValue mb = show mb <> "m"
-- | RTS command-line flags used by default.
-- NOTE(review): presumably passed to every spawned worker -- confirm
-- against the call site.
defaultRts :: [String]
defaultRts = ["-I0"]

-- Exit codes a worker process uses to tell its parent why it terminated.
-- NOTE(review): the two groups below look like the pre-rename (`exit*`) and
-- post-rename (`*ExitCode`) versions of the same constants, and
-- `exitKillByTypedProcess` is listed in both signatures -- confirm which
-- group is meant to stay.
exitParentDied, exitTimeout, exitOutOfCpuTime, exitOutOfMemory, exitKillByTypedProcess :: ExitCode
exitParentDied = ExitFailure 11
exitTimeout = ExitFailure 12
exitOutOfCpuTime = ExitFailure 13
exitOutOfMemory = ExitFailure 251
parentDiedExitCode, timeoutExitCode, outOfCpuTimeExitCode, outOfMemoryExitCode :: ExitCode
exitKillByTypedProcess, exceptionExitCode :: ExitCode
-- Intended for the case when the actual work fails with an exception.
exceptionExitCode = ExitFailure 99
-- The parent process id differs from the one the worker was started with.
parentDiedExitCode = ExitFailure 111
-- The worker timeout has elapsed.
timeoutExitCode = ExitFailure 122
-- The consumed CPU time exceeded the configured limit.
outOfCpuTimeExitCode = ExitFailure 113
-- Reported by the parent as "ran out of memory".
outOfMemoryExitCode = ExitFailure 251
-- NOTE(review): presumably the code observed when the process is killed
-- through the process-management library -- confirm.
exitKillByTypedProcess = ExitFailure (-2)

worderIdS :: WorkerId -> String
Expand Down Expand Up @@ -295,17 +286,17 @@ runWorker logger config workerId params timeout cpuLimit extraCli = do
Right r ->
pure r
exit@(ExitFailure errorCode)
| exit == exitTimeout -> do
| exit == timeoutExitCode -> do
let message = [i|Worker #{workerId} execution timed out.|]
logError logger message
trace WorkerTimeoutTrace
appError $ InternalE $ WorkerTimeout message
| exit == exitOutOfCpuTime -> do
| exit == outOfCpuTimeExitCode -> do
let message = [i|Worker #{workerId} ran out of CPU time.|]
logError logger message
trace WorkerCpuOveruseTrace
appError $ InternalE $ WorkerOutOfCpuTime message
| exit == exitOutOfMemory -> do
| exit == outOfMemoryExitCode -> do
let message = [i|Worker #{workerId} ran out of memory.|]
logError logger message
appError $ InternalE $ WorkerOutOfMemory message
Expand Down
2 changes: 1 addition & 1 deletion stack.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
resolver: lts-22.23
resolver: lts-22.26

packages:
- .
Expand Down

0 comments on commit e5d1524

Please sign in to comment.