2017-06-25 Threading responsibly

Alternative clickbait title: forkIO considered harmful.

This is going to be a brief post elaborating on various comments made at ZuriHac that surprised some people. I promised some writing, so here it is!

Managing resources

Resource management is an important subject in many languages and Haskell is no exception. With “resource management” I’m referring to handling all those objects that require some form of release.

Examples include:

Resource Acquire Release
Memory malloc free
Handles open close
Locks/MVars takeMVar putMVar

and surely many others I can’t think of now.

There are many ways to perform resource management. Modern C++ and Rust have quite strong built-in support for timely resource management via RAII. In Haskell we use a combination of garbage collection and finalizers when timely release is not crucial and bracket when it is.

Threads are resources

In this post I want to argue that threads are also a resource, in the sense that they require management after we’ve created them. A stray thread will consume memory, CPU cycles, and really whatever resource it might need to execute. Moreover, we most likely want to know if the thread failed. Thus, creating a thread should always be paired with some code that checks on it and tears it down if necessary.

However, this is not standard practice in most Haskell codebases I see, even written by experienced Haskellers, and I’d like that to change. One common objection when I bring this up is that people often use forkIO with threads that should run for the entire execution of the program, and thus we don’t really need to manage them. However this is bad practice and leads to code which is not composable: if I want to safely reuse code written in this way I can’t, since it leaks threads left and right. Note that the same objection would apply to not always releasing handles or other resources but it’s widely accepted that it’s best practice to always release them properly, no matter their expected lifetime.

Safely “releasing” threads

Now that we hopefully agree that not managing threads is a bad idea, how should threads be managed? The “acquire” operation for threads, in Haskell, is

forkIO :: IO a -> IO ThreadId

but what shall “release” be? A first candidate might be

cancelThread :: ThreadId -> IO ()
cancelThread tid = throwTo tid ThreadKilled

This is a good attempt, and in fact to the best of my knowledge was all we had until recently.

Sadly cancelThread is problematic since it does not guarantee that the thread has been shut down when it returns – only that the exception has been delivered. However the recipient of the exception could still be running after the exception has been delivered – most notably it could be running finalizers in bracket! This subtlety has caused me and Philip Kant countless hours of entertainment in a real system.

forkIO bad, async good

The solution is to not use forkIO and to always use async, or lifted-async. What async does is set up infrastructure to know when a thread has terminated, thus allowing to write a cancel that waits for the thread to be done before returning. async also gives us an easy and safe way to get the result of the forked computation and to propagate exceptions upstream. In short, async should be your default way to perform concurrency, it’s really an amazing tool and yet another case of Haskell exceptionalism.

Then, we can update our table, with cancel, which guarantees that the thread is dead by the time it returns:1

Resource Acquire Release
Memory malloc free
Handles open close
Locks/MVars takeMVar putMVar
Threads async cancel

async also provides a plethora of useful combinators that should be in the standard toolkit of every Haskell programmers, such as race, concurrently, the Concurrently Alternative Applicative, and many others.

Some async patterns

Finally, I want to show some patterns that are common and that will hopefully give a bit of an idea of how async can be used to easily implement complex tasks. If you’re already convinced and have something better to do you can stop reading, but if you want to see some more complex examples using async keep reading.

Note that all the examples below are written using IO, but they would work equally well with MonadBaseUnlift m.

Workers

When writing backend services one of the primitives I use most is withWorker or some variation of it:

-- | Runs a worker action alongside the provided continuation.
-- The worker will be automatically torn down when the continuation
-- terminates.
withWorker ::
     IO Void -- ^ Worker to run
  -> IO a
  -> IO a
withWorker worker cont = either absurd id <$> race worker cont

Where race is a wonderfully useful function from async, which runs two threads and returns as soon as one of them finishes, canceling the other:

race :: IO a -> IO b -> IO (Either a b)

withWorker takes the “worker” action, which is an endless loop as indicated by the type, and a continuation, and runs them side-by-side.

As soon as the continuation terminates the worker will be automatically tore down as well by race, and the result of the continuation returned upstream.

pooledMapConcurrently

A useful combinator from async is mapConcurrently, which is much like traverse but executing each action in parallel:

mapConcurrently :: Traversable t => (a -> IO b) -> t a -> IO (t b)

While this is often good enough if the provided action is IO-bound, it’s harmful if the action is CPU-bound: if we have a list of 1000 elements and need to perform some lengthy number crunching on each of them, we most likely want do not want to execute 1000 number crunching threads in parallel, but rather 1 number crunching thread per core.

To solve this other common use case me and Patrick Chilton wrote pooledMapConcurrently:

-- | Like 'mapConcurrently' from async, but instead of
-- one thread per element, only use one thread per capability.
pooledMapConcurrently :: forall t a b.
     (Traversable t)
  => (a -> IO b) -> t a -> IO (t b)
pooledMapConcurrently f xs = do
  numProcs <- getNumCapabilities
  -- prepare one IORef per result...
  jobs :: t (a, IORef b) <-
    for xs (\x -> (x, ) <$> newIORef (error "pooledMapConcurrently: empty IORef"))
  -- ...put all the inputs in a queue..
  jobsVar :: MVar [(a, IORef b)] <- newMVar (toList jobs)
  -- ...run `numProcs` threads in parallel, each
  -- of them consuming the queue and filling in
  -- the respective IORefs.
  forConcurrently_ $ [1..numProcs] $ \_ -> do
    let loop = do
          mbJob :: Maybe (a, IORef b) <- modifyMVar jobsVar $ \case
            [] -> return ([], Nothing)
            var : vars -> return (vars, Just var)
          case mbJob of
            Nothing -> return ()
            Just (x, outRef) -> do
              y <- f x
              writeIORef outRef y
              loop
    loop
  -- read all the IORefs once we're done.
  for jobs (\(_, outputRef) -> readIORef outputRef)

Where forConcurrently_ is a slightly rearranged mapConcurrently:

forConcurrently_ :: Foldable f => f a -> (a -> IO b) -> IO ()

What I like about this function is how short it is, considering how much it’s doing. We are traversing generically using Traversable, implementing a queue using MVar, and coordinating threads threads using async. The result will be fast, automatically handling differences in the execution times of different invocations of the provided function. It will also be exception safe: if any of the function invocation fails, or if an asynchronous exception is received, the whole thread hierarchy will be gracefuly shut down. Writing such a function in other languages is almost impossible, but in Haskell knowing the right tools it takes half an hour.

Edit: quchen on reddit suggested using semaphores – and using that the function is even shorter:

    pooledMapConcurrently ::
         (Traversable t)
      => (a -> IO b) -> t a -> IO (t b)
    pooledMapConcurrently f xs = do
      numProcs <- getNumCapabilities
      sem <- newQSem numProcs
      forConcurrently xs $ \x -> do
        bracket
          (waitQSem sem)
          (\() -> signalQSem sem)
          (\() -> f x)

One advantage the original version executes left to right – elements appearing later will always start processing after earlier elements have already started, which can be more amenable to debugging (for example if you’re looking at the output in a terminal, or reproducing a crash). It also avoids high contention (one thread per element) on the synchronization mechanism. However the semaphore version is certainly more pleasant to read and probably preferrable.

Pooled map and streaming

Finally, as a last example I have a variation of the concurrent pooled map – conduitPooledMapMBuffered. This function, due to Niklas Hambüchen, lets us easily write Conduits that compute their outputs in parallel, using one thread per capability, while preserving the same output order that a non-parallel conduit would produce. This is extremely useful in situations where we want to have some CPU bound worker that is capable of streaming inputs and outputs while efficiently utilizing the available cores.

It’s an interesting example because there are many moving pieces – concurrency, streaming, resource management through ResourceT – and everything is tied together nicely to guarantee good behavior. If you want an example of complex logic written with async, check it out!


  1. Note that you’ll need a relatively recent version of async to get the correct behavior, specifically one published after we fixed some problematic behavior in async, see https://github.com/simonmar/async/pull/42 and https://github.com/simonmar/async/pull/44.