Wednesday, May 27, 2009

Lwt and Concurrent ML

Programming concurrent systems with threads and locks is famously, even fabulously, error-prone. With Lwt's cooperative threads you don't have to worry so much about protecting data structures against concurrent modification, since your code runs atomically between binds. Still, the standard concurrency primitives (mutexes, condition variables) are sometimes useful; but using them with Lwt is not much less painful than with preemptive threads. In this post I want to explore the combination of Lwt with the concurrency primitives of Concurrent ML. I hope to convince you that CML's primitives are easier to use, and a good match for Lwt.

Blocking queues in Lwt

I got started with Lwt when I was writing a work queue (as an Ocamlnet RPC service using orpc). The server keeps a queue of jobs, and workers poll for a task via RPC. An RPC request turns into an Lwt thread; all these threads share the queue. If there's no job in the queue, a request blocks until one is available. So I needed a blocking queue, with the following signature:

type 'a t
val create : unit -> 'a t
val add : 'a -> 'a t -> unit
val take : 'a t -> 'a Lwt.t
The queue is unbounded, so you can add without blocking, but a take may block. (It's nice how in Lwt the possibility of blocking is revealed in the type). Here's the implementation:
type 'a t = {
  m : Lwt_mutex.t;
  c : Lwt_condition.t;
  q : 'a Queue.t;
}

let create () = {
  m = Lwt_mutex.create ();
  c = Lwt_condition.create ();
  q = Queue.create ();
}
A queue is made up of a regular OCaml queue, a condition variable (signaled when there's something in the queue), and a mutex for use with the condition variable. (The Lwt_condition module is based on the Condition module of the standard OCaml threads library.)
let add e t =
  Queue.add e t.q;
  Lwt_condition.signal t.c

let take t =
  Lwt_mutex.lock t.m >>= fun () ->
  (if Queue.is_empty t.q
   then Lwt_condition.wait t.c t.m
   else Lwt.return ()) >>= fun () ->
  let e = Queue.take t.q in
  Lwt_mutex.unlock t.m;
  Lwt.return e
Since Lwt threads are cooperative we don't need to worry about concurrent access to the underlying queue. The role of the mutex here is only to ensure that when a thread blocked on the condition gets signaled, another thread can't take the element first.


What if there are no entries in the queue for a while? Within a single process, no big deal, the thread can keep waiting forever. That doesn't seem like a good idea over a network connection; we should time out at some point and return a response indicating that no task is available. Here is a first attempt at taking an element from the queue with a timeout:

Lwt.choose [
  Lwt_queue.take q;
  Lwt_unix.sleep timeout >>= fun () -> Lwt.fail (Failure "timeout");
]
The Lwt.choose function "behaves as the first thread [...] to terminate". However, the other threads are still running after the first one terminates. It doesn't matter if the sleep is still running after the take completes, but if the sleep finishes first, then the take thread is still waiting to take an element from the queue. When an element becomes available, this thread takes it, and drops it on the floor (since the choose has already finished). And in general this sort of thing can happen whenever a thread you choose between has some effect; the effect still happens even if the thread is not chosen.

A thread can block on only one condition at a time. In order to take an element with a timeout, we're forced to build timeouts into the queue, so we can get at the queue's condition variable. We add an optional argument to take:
val take : ?timeout:float -> 'a t -> 'a Lwt.t
and modify the implementation:
let take ?(timeout=(-1.)) t =
  let timed_out = ref false in
  if timeout >= 0.
  then
    ignore
      (Lwt_unix.sleep timeout >>= fun () ->
        timed_out := true;
        Lwt_condition.broadcast t.c;
        Lwt.return ());
  Lwt_mutex.lock t.m >>= fun () ->
    let rec while_empty () =
      if !timed_out then Lwt.return false
      else if not (Queue.is_empty t.q) then Lwt.return true
      else Lwt_condition.wait t.c t.m >>= while_empty in
    while_empty () >>= fun not_empty ->
    let e = if not_empty then Some (Queue.take t.q) else None in
    Lwt_mutex.unlock t.m;
    Lwt_condition.signal t.c;
    (* Timeout is assumed to be an exception declared elsewhere *)
    match e with Some e -> Lwt.return e | _ -> Lwt.fail Timeout
In an auxiliary thread we wait for the timeout, then set a timeout flag for the main thread and broadcast the condition. It's important to use broadcast, which signals all waiting threads, instead of signal, which signals an arbitrary waiter, in order to be sure that we wake up the timed-out thread. But now it's possible for a thread to be signaled when neither the timeout has expired nor an element is available, so we must loop around waiting on the condition. And a signal from adding an element may be sent to a timed-out thread, so we need to signal another thread to avoid forgetting the added element.

This is not very nice. First, the interface isn't modular. We've hard-coded a particular pair of events to wait for; what if we wanted to wait on two queues at once, or a queue and a network socket? Second, the implementation is tricky to understand. We have to reason about how multiple threads, each potentially at a different point in the program, interact with the shared state.
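To make the lost-element problem of the first attempt concrete, here is a minimal sketch in plain OCaml. It is a toy model, not Lwt: blocked takers are stored as callbacks, and the names bq, take_or_timeout, and demo are made up for illustration. It only mimics the order of events: the timeout fires first, the take stays registered, and a later add hands its element to a waiter nobody is listening to.

```ocaml
(* Toy model, NOT Lwt: blocked takers are stored as callbacks. *)
type 'a bq = { items : 'a Queue.t; waiters : ('a -> unit) Queue.t }

let create () = { items = Queue.create (); waiters = Queue.create () }

(* Deliver an element now if there is one, else register as a waiter. *)
let take q k =
  if Queue.is_empty q.items then Queue.add k q.waiters
  else k (Queue.take q.items)

(* Hand the element to the oldest waiter, or store it. *)
let add x q =
  if Queue.is_empty q.waiters then Queue.add x q.items
  else (Queue.take q.waiters) x

(* Like the first attempt: the losing take is never unregistered.
   Returns the function that fires the (simulated) timeout. *)
let take_or_timeout q ~on_result =
  let finished = ref false in
  let finish r =
    if not !finished then begin finished := true; on_result r end in
  take q (fun x -> finish (Some x));  (* x is dropped if already finished *)
  (fun () -> finish None)

let demo () =
  let q = create () in
  let result = ref (Some "unset") in
  let fire_timeout = take_or_timeout q ~on_result:(fun r -> result := r) in
  fire_timeout ();   (* the timeout wins the race *)
  add "job-1" q;     (* the stale waiter consumes and drops the job *)
  (!result, Queue.length q.items, Queue.length q.waiters)
```

In this model demo () reports a timeout, yet the queue ends up empty with no waiters: the job was consumed by the stale waiter and is gone.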


Concurrent ML provides a different set of primitives. It makes the notion of an event--something that may happen in the future, like a timeout or a condition becoming true--into an explicit datatype, so you can return it from a function, store it in a data structure, and so on:

type 'a event
When an event occurs, it carries a value of type 'a. The act of synchronizing on (waiting for) an event is a separate function:
val sync : 'a event -> 'a Lwt.t
Of course it returns Lwt.t since it may block; the returned value is the value of the event occurrence. You can make an event that occurs when any of several events occurs, so a thread can wait on several events at once:
val choose : 'a event list -> 'a event
When one event occurs, the thread is no longer waiting on the other events (in contrast to Lwt.choose). Since synchronizing on a choice of events is a very common pattern, there's also
val select : 'a event list -> 'a Lwt.t
which is the same as sync of choose. It blocks until one of the events occurs.

A channel is sort of like a zero-length queue: both reader and writer must synchronize on the channel at the same time to pass a value from one to the other:
type 'a channel
val new_channel : unit -> 'a channel
val send : 'a channel -> 'a -> unit event
val receive : 'a channel -> 'a event
Both send and receive are blocking operations, so they return events. Finally, there's a way to map the value of an event when it occurs:
val wrap : 'a event -> ('a -> 'b Lwt.t) -> 'b event
The event wrap e f occurs when e occurs, with value f v (where v is the value returned by the occurrence of e). (Here's the full interface of Lwt_event. There are events for Unix file descriptor operations in Lwt_event_unix.)
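The key property of choose and wrap is that only the chosen event has any effect. A minimal sketch of that idea, in plain OCaml, follows. It is a toy, non-blocking model and not the Lwt_event module: an event is a poll function that, when the event can occur, returns a thunk committing the occurrence. The names poll_sync, receive (on a one-slot cell rather than a channel), and demo are assumptions for illustration, and wrap here takes a plain function instead of one returning Lwt.t.

```ocaml
(* Toy model, NOT Lwt_event: polling an event yields an optional
   thunk that commits the occurrence and returns its value. *)
type 'a event = unit -> (unit -> 'a) option

(* wrap e f occurs when e does; f runs only if this event is committed. *)
let wrap (e : 'a event) (f : 'a -> 'b) : 'b event =
  fun () ->
    match e () with
    | None -> None
    | Some commit -> Some (fun () -> f (commit ()))

(* The first ready event wins; the others are never committed. *)
let choose (es : 'a event list) : 'a event =
  fun () -> List.find_map (fun e -> e ()) es

(* One non-blocking attempt to synchronize (a real sync would block). *)
let poll_sync (e : 'a event) : 'a option =
  match e () with
  | None -> None
  | Some commit -> Some (commit ())

(* A one-slot cell standing in for a channel: the value is removed
   only when the receive is committed. *)
let receive (cell : 'a option ref) : 'a event =
  fun () ->
    match !cell with
    | None -> None
    | Some v -> Some (fun () -> cell := None; v)

let demo () =
  let a = ref (Some 1) and b = ref (Some 2) in
  let log = ref [] in
  let tag name e = wrap e (fun v -> log := name :: !log; v) in
  let r = poll_sync (choose [ tag "a" (receive a); tag "b" (receive b) ]) in
  (r, !log, !b)
```

Here both cells are ready, the first one is chosen, and the second is left untouched: its wrapped function never runs and its value stays in place.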

Blocking queues with Lwt_event

Now I want to reimplement blocking queues using these new primitives:

type 'a t

val create : unit -> 'a t
val add : 'a -> 'a t -> unit Lwt.t
val take : 'a t -> 'a Lwt_event.event
The interface is similar. As before, take is a blocking operation, but it returns an event instead of Lwt.t so we can combine it with other events using choose. The new add returns Lwt.t, but this is an artifact: a thread calling add won't actually block (we'll see why below). For this reason, add doesn't need to return event.
type 'a t = {
  inch: 'a channel;
  ouch: 'a channel;
}
let add e t = sync (send t.inch e)
let take t = receive t.ouch
A queue consists of two channels, one for adding items into the queue and one for taking them out. The functions implementing the external interface just send and receive on these channels.
let create () =
  let q = Queue.create () in
  let inch = new_channel () in
  let ouch = new_channel () in
To create a queue, we make the channels and the underlying queue (we don't need to store it in the record; it will be hidden in a closure). We're going to have an internal thread to manage the queue; next we need some events for it to interact with the channels:
  let add =
    wrap (receive inch) (fun e ->
      Queue.add e q;
      Lwt.return ()) in

  let take () =
    wrap (send ouch (Queue.peek q)) (fun () ->
      ignore (Queue.take q);
      Lwt.return ()) in
Here add receives an element from the input channel and adds it to the underlying queue; and take sends the top element of the queue on the output channel. Keep in mind that these events don't occur (and the function passed to wrap is not executed) until there's actually a thread synchronizing on the complementary event on the channel. We call Queue.peek in take because at the point that we offer to send an element on a channel, we have to come up with the element; but we don't want to take it off the underlying queue, because there might never be a thread synchronizing on the complementary event on the channel. (Maybe there should be a version of send that takes a thunk?)
  let rec loop () =
    let evs =
      if Queue.is_empty q
      then [ add ]
      else [ add; take () ] in
    select evs >>= loop in
  ignore (loop ());

  { inch = inch; ouch = ouch }
Here's the internal thread. If the queue is empty all we can do is wait for an element to be added; if not, we wait for an element to be added or taken. Now we can see why the add function of the external queue interface can't block: we always select the add event, so as soon as another thread wants to send an element on the input channel, the internal thread is available to receive it.


Now, the punchline: we didn't build timeouts into the queue; still we can select between taking an element or timing out:

select [
  Lwt_event_queue.take q;
  wrap (Lwt_event_unix.sleep timeout)
    (fun () -> Lwt.fail (Failure "timeout"));
]
Much better. Moreover, I think this queue implementation is easier to reason about (once you're comfortable with the CML primitives), even compared to our first version (without timeouts). The difference is that only the internal thread touches the state of the queue--in fact it's the only thread for which the state is even in scope! We don't need to worry about conditions and signaling; we just offer an element on the output channel when one is available. This is only an inkling of the power of CML; the book Concurrent Programming in ML contains much more, including some large examples.
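Why no element is lost when the timeout wins can be sketched in a toy model in plain OCaml. This is not Lwt_event: an event is a poll function returning an optional commit thunk, the clock is a simulated float ref, and the timeout yields None rather than failing. The names take, timeout, poll_sync, and demo are assumptions for illustration.

```ocaml
(* Toy model, NOT Lwt_event: polling an event yields an optional
   thunk that commits the occurrence and returns its value. *)
type 'a event = unit -> (unit -> 'a) option

let choose es : 'a event = fun () -> List.find_map (fun e -> e ()) es

let poll_sync e =
  match e () with None -> None | Some commit -> Some (commit ())

(* Ready only when the queue is non-empty; the element leaves the
   queue only if this event is the one committed. *)
let take (q : 'a Queue.t) : 'a option event =
  fun () ->
    if Queue.is_empty q then None
    else Some (fun () -> Some (Queue.take q))

(* Ready once the simulated clock reaches the deadline. *)
let timeout (now : float ref) (deadline : float) : 'a option event =
  fun () -> if !now >= deadline then Some (fun () -> None) else None

let demo () =
  let q = Queue.create () and now = ref 0.0 in
  let take_or_timeout () = poll_sync (choose [ take q; timeout now 5.0 ]) in
  let before = take_or_timeout () in     (* neither event is ready *)
  now := 6.0;
  let timed_out = take_or_timeout () in  (* only the timeout is ready *)
  Queue.add "job-1" q;
  let kept = Queue.length q in           (* the job is still queued *)
  let taken = take_or_timeout () in      (* a fresh take receives it *)
  (before, timed_out, kept, taken)
```

Because the timed-out choice never committed the take, nothing is left behind to steal the job: it stays in the queue until a later take commits.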

Why is this style of concurrency not more common? I think there are several reasons: First, idiomatic CML programming requires very lightweight threads (you don't want a native thread, or even an OCaml bytecode thread, for every queue). Second, the wrap combinator, essential for building complex events, requires higher-order functions, so there's no similarly concise translation into, say, Java. Finally, I think it's not widely appreciated that concurrent programming is useful without parallel programming. The mutex approach works fine for parallel programming, while CML has only recently been implemented in a parallel setting. None of these reasons applies to Lwt programming; Concurrent ML is a good fit with Lwt.

In an earlier post I asserted (without much to back it up) that Ocamlnet's Equeue gives better low-level control over blocking than Lwt. The Lwt_event and Lwt_event_unix modules provide a similar degree of control, with a higher-level interface.


  1. Hello,

    in the development branch of Lwt (which I hope we will release soon) I have added the ability to create "cancellable threads". So you can write something along the lines of: [Lwt_unix.sleep 1.0 >> fail Timeout;
    Lwt_queue.take q >>= f; input_channel >>= print]

    And it is ensured that only one of the three threads will complete (if they were originally sleeping), the others will fail with the exception Lwt.Canceled.

  2. Hi Jérémie. This is a step in the right direction, but why not just use the CML primitives? I would be glad if you included my Lwt_event* modules in Lwt.

  3. Hi Jake,

    I think that CML primitives, and in general synchronisation are too much preemptive specific. With cooperative threads there is always an easier way.

    Moreover, adding a new system would mean a lot of code duplication (read : ... -> int Lwt.t and read_event : ... -> int Lwt_event.t).

    1. Could you elaborate on why you believe that CML and synchronization primitives in general are "too much preemptive specific"?

      My experience is different. I have programmed with a few systems that provide cooperative threads and I've found it easy to use similar synchronization abstractions (ranging from atomic operations, mutexes and condition variables to CML style selective synchronous abstractions) with cooperative threads as with traditional threads. This is especially so when cooperative threads are combined with parallel execution and you can no longer assume that there is only one thread running.

      Combining lightweight cooperative threads (no preemption) and parallelism can be very effective for performance reasons as it can sometimes improve the asymptotic space complexity of parallel algorithms when compared to preemptive models.

  4. Interesting... can you expand on this "easier way"? How would you write a blocking queue? I admit I am no expert on cooperative threads.

    W.r.t. code duplication, I thought your proposal for cancelable tasks also duplicated the Unix file descriptor calls. I think both forms are useful.

    (If you would prefer to take this to email or the Lwt list that is fine with me.)

  5. Hi, Jake-

    Have you considered using JoCaml? If so, I would be happy to hear your thoughts on JoCaml v. Lwt + CML. Thanks.

  6. Hi Jake,

    you might be interested in the Hopac library for F# that I recently open-sourced and that provides a very similar programming model, namely cooperative lightweight threads and CML style synchronous abstractions. In addition, it also runs those lightweight threads in parallel using a work distributing scheduler. I originally developed the library for a project I maintain at work, but decided to open-source the library as I thought it might interest others.