Skip to content

Commit

Permalink
Use a real-time queue for TQueue
Browse files Browse the repository at this point in the history
This is another alternative design for `TQueue`. Instead of an
amortized queue, this uses a real-time one based on Okasaki's
scheduled banker's queues. We limit contention by using two
independent schedules.
  • Loading branch information
treeowl committed May 24, 2018
1 parent 92af455 commit 0db4321
Showing 1 changed file with 102 additions and 65 deletions.
167 changes: 102 additions & 65 deletions Control/Concurrent/STM/TQueue.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
{-# LANGUAGE CPP, DeriveDataTypeable #-}
{-# LANGUAGE BangPatterns #-}

#if __GLASGOW_HASKELL__ >= 701
{-# LANGUAGE Trustworthy #-}
Expand All @@ -17,16 +18,14 @@
--
-- A 'TQueue' is like a 'TChan', with two important differences:
--
-- * it has faster throughput than both 'TChan' and 'Chan' (although
-- the costs are amortised, so the cost of individual operations
-- can vary a lot).
-- * it has faster throughput than both 'TChan' and 'Chan'
--
-- * it does /not/ provide equivalents of the 'dupTChan' and
-- 'cloneTChan' operations.
--
-- The implementation is based on the traditional purely-functional
-- queue representation that uses two lists to obtain amortised /O(1)/
-- enqueue and dequeue operations.
-- The implementation is based on Okasaki's scheduled banker's queues,
-- but it uses *two* schedules so there's only contention between the
-- reader and writer when the queue needs to be rotated.
--
-- @since 2.4
-----------------------------------------------------------------------------
Expand All @@ -44,63 +43,107 @@ module Control.Concurrent.STM.TQueue (
writeTQueue,
unGetTQueue,
isEmptyTQueue,
) where
) where

import GHC.Conc
import Control.Monad (unless)
import Data.Typeable (Typeable)

data End a =
End [a] -- list
[a] -- schedule

-- | 'TQueue' is an abstract type representing an unbounded FIFO channel.
--
-- @since 2.4
data TQueue a = TQueue {-# UNPACK #-} !(TVar [a])
{-# UNPACK #-} !(TVar [a])
data TQueue a = TQueue {-# UNPACK #-} !(TVar (End a))
{-# UNPACK #-} !(TVar (End a))
deriving Typeable
{-
Invariant:
Given front list, rear list, front schedule, and rear schedule called
front, rear, fsched, and rsched, respectively,
2 * (|front| - |rear|) = |fsched| + |rsched|
Note that because lengths cannot be negative, this implies that
|front| >= |rear|
We rotate the queue when either schedule is empty. This preserves
the invariant and ensures that the spine of the front list is
fully realized when a rotation occurs.
-}

instance Eq (TQueue a) where
TQueue a _ == TQueue b _ = a == b

-- |Build and returns a new instance of 'TQueue'
-- | Build and returns a new instance of 'TQueue'
newTQueue :: STM (TQueue a)
newTQueue = do
read <- newTVar []
write <- newTVar []
read <- newTVar (End [] [])
write <- newTVar (End [] [])
return (TQueue read write)

-- |@IO@ version of 'newTQueue'. This is useful for creating top-level
-- | @IO@ version of 'newTQueue'. This is useful for creating top-level
-- 'TQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
-- possible.
newTQueueIO :: IO (TQueue a)
newTQueueIO = do
read <- newTVarIO []
write <- newTVarIO []
read <- newTVarIO (End [] [])
write <- newTVarIO (End [] [])
return (TQueue read write)

-- |Write a value to a 'TQueue'.
-- rotate front end = front ++ reverse rear, but the reverse is performed
-- incrementally as the append proceeds.
--
-- Precondition: |front| + 1 >= |rear|. This ensures that when the front
-- list is empty, the rear list has at most one element, so we don't need
-- to reverse it.
rotate :: [a] -> [a] -> [a]
rotate = go []
where
go acc [] rear = rear ++ acc
go acc (x:xs) (r:rs)
= x : go (r:acc) xs rs
go acc xs [] = xs ++ acc

-- | Write a value to a 'TQueue'.
writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _read write) a = do
listend <- readTVar write
writeTVar write (a:listend)

-- |Read the next value from the 'TQueue'.
writeTQueue (TQueue read write) a = do
End listend rsched <- readTVar write
let listend' = a : listend
case rsched of
-- Reduce |front|-|rear| by 1; reduce |fsched|+|rsched| by 2
_:_:rsched' -> writeTVar write (End listend' rsched')

-- Rotate the queue; the invariant holds trivially.
_ -> do
End listfront _fsched <- readTVar read
let !front' = rotate listfront listend'
writeTVar read (End front' front')
writeTVar write (End [] front')

-- | Read the next value from the 'TQueue'.
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(x:xs') -> do
writeTVar read xs'
return x
[] -> do
ys <- readTVar write
case ys of
[] -> retry
_ -> do
let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be
-- short, otherwise it will conflict
writeTVar write []
writeTVar read zs
return z
End listfront fsched <- readTVar read
case listfront of
[] -> retry
x:front' ->
case fsched of
-- Reduce |front|-|rear| by 1; reduce |fsched|+|rsched| by 2
_:_:fsched' -> writeTVar read (End front' fsched') >> return x

-- Rotate the queue; the invariant holds trivially.
_ -> do
End listend _rsched <- readTVar write
let !front'' = rotate front' listend
writeTVar read (End front'' front'')
writeTVar write (End [] front'')
return x

-- | A version of 'readTQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
Expand All @@ -113,44 +156,38 @@ tryReadTQueue c = fmap Just (readTQueue c) `orElse` return Nothing
-- @since 2.4.5
flushTQueue :: TQueue a -> STM [a]
flushTQueue (TQueue read write) = do
xs <- readTVar read
ys <- readTVar write
unless (null xs) $ writeTVar read []
unless (null ys) $ writeTVar write []
return (xs ++ reverse ys)
End front fsched <- readTVar read
End rear rsched <- readTVar write
unless (null front && null fsched) $ writeTVar read (End [] [])
unless (null rear && null rsched) $ writeTVar write (End [] [])
return (rotate front rear)

-- | Get the next value from the @TQueue@ without removing it,
-- retrying if the channel is empty.
peekTQueue :: TQueue a -> STM a
peekTQueue c = do
x <- readTQueue c
unGetTQueue c x
return x
peekTQueue (TQueue read _write) = do
End front _fsched <- readTVar read
case front of
x:_ -> return x
[] -> retry

-- | A version of 'peekTQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
tryPeekTQueue :: TQueue a -> STM (Maybe a)
tryPeekTQueue c = do
m <- tryReadTQueue c
case m of
Nothing -> return Nothing
Just x -> do
unGetTQueue c x
return m

-- |Put a data item back onto a channel, where it will be the next item read.
tryPeekTQueue (TQueue read _write) = do
End front _fsched <- readTVar read
case front of
x:_ -> return (Just x)
[] -> return Nothing

-- | Put a data item back onto a channel, where it will be the next item read.
unGetTQueue :: TQueue a -> a -> STM ()
unGetTQueue (TQueue read _write) a = do
xs <- readTVar read
writeTVar read (a:xs)
End front fsched <- readTVar read
writeTVar read (End (a:front) (a:a:fsched))

-- |Returns 'True' if the supplied 'TQueue' is empty.
-- | Returns 'True' if the supplied 'TQueue' is empty.
isEmptyTQueue :: TQueue a -> STM Bool
isEmptyTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(_:_) -> return False
[] -> do ys <- readTVar write
case ys of
[] -> return True
_ -> return False
isEmptyTQueue (TQueue read _write) = do
End front _fsched <- readTVar read
return $! null front

0 comments on commit 0db4321

Please sign in to comment.