Programming concurrent systems with threads and locks is famously, even fabulously, error-prone. With Lwt's cooperative threads you don't have to worry so much about protecting data structures against concurrent modification, since your code runs atomically between bind
s. Still, the standard concurrency primitives (mutexes, condition variables) are sometimes useful; but using them with Lwt is not much less painful than with preemptive threads. In this post I want to explore the combination of Lwt with the concurrency primitives of Concurrent ML. I hope to convince you that CML's primitives are easier to use, and a good match for Lwt.
I got started with Lwt when I was writing a work queue (as an Ocamlnet RPC service using orpc). The server keeps a queue of jobs, and workers poll for a task via RPC. An RPC request turns into an Lwt thread; all these threads share the queue. If there's no job in the queue, a request blocks until one is available. So I needed a blocking queue, with the following signature:
type 'a t val create : unit -> 'a t val add : 'a -> 'a t -> unit val take : 'a t -> 'a Lwt.tThe queue is unbounded, so you can
add
without blocking, but a take
may block. (It's nice how in Lwt the possibility of blocking is revealed in the type). Here's the implementation: type 'a t = { m : Lwt_mutex.t; c : Lwt_condition.t; q : 'a Queue.t; } let create () = { m = Lwt_mutex.create (); c = Lwt_condition.create (); q = Queue.create (); }A queue is made up of a regular OCaml queue, a condition variable (signaled when there's something in the queue), and a mutex for use with the condition variable. (The
Lwt_condition
module is based on the Condition
module of the standard OCaml threads library.) let add e t = Queue.add e t.q; Lwt_condition.signal t.c let take t = Lwt_mutex.lock t.m >>= fun () -> if Queue.is_empty t.q then Lwt_condition.wait t.c t.m else Lwt.return () >>= fun () -> let e = Lwt.return (Queue.take t.q) in Lwt_mutex.unlock t.m; eSince Lwt threads are cooperative we don't need to worry about concurrent access to the underlying queue. The role of the mutex here is only to ensure that when a thread blocked on the condition gets signaled, another thread can't take the element first.
Timeouts?
What if there are no entries in the queue for a while? Within a single process, no big deal, the thread can keep waiting forever. That doesn't seem like a good idea over a network connection; we should time out at some point and return a response indicating that no task is available. Here is a first attempt at taking an element from the queue with a timeout:
Lwt.choose [ Lwt_queue.take q; Lwt_unix.sleep timeout >>= fun () -> Lwt.fail (Failure "timeout"); ]The
Lwt.choose
function "behaves as the first thread [...] to terminate". However, the other threads are still running after the first one terminates. It doesn't matter if the sleep
is still running after the take
completes, but if the sleep
finishes first, then the take
thread is still waiting to take an element from the queue. When an element becomes available, this thread takes it, and drops it on the floor (since the choose
has already finished). And in general this sort of thing can happen whenever a thread you choose
between has some effect; the effect still happens even if the thread is not chosen. A thread can block on only one condition at a time. In order to take
an element with a timeout, we're forced to build timeouts into the queue, so we can get at the queue's condition variable. We add an optional argument to take
: val take : ?timeout:float -> 'a t -> 'a Lwt.tand modify the implementation:
let take ?(timeout=(-1.)) t = let timed_out = ref false in if timeout >= 0. then Lwt.ignore_result (Lwt_unix.sleep timeout >>= fun () -> timed_out := true; Lwt_condition.broadcast t.c; Lwt.return ()); Lwt_mutex.lock t.m >>= fun () -> let rec while_empty () = if !timed_out then Lwt.return false else if not (Queue.is_empty t.q) then Lwt.return true else Lwt_condition.wait t.c t.m >>= while_empty in while_empty () >>= fun not_empty -> let e = if not_empty then Some (Queue.take t.q) else None in Lwt_mutex.unlock t.m; Lwt_condition.signal t.c; match e with Some e -> Lwt.return e | _ -> Lwt.fail TimeoutIn an auxilliary thread we wait for the timeout, then set a timeout flag for the main thread and broadcast the condition. It's important to use
broadcast
, which signals all waiting threads, instead of signal
, which signals an arbitrary waiter, in order to be sure that we wake up the timed-out thread. But now it's possible for a thread to be signaled when neither the timeout has expired nor an element is available, so we must loop around waiting on the condition. And a signal
from adding an element may be sent to a timed-out thread, so we need to signal
another thread to avoid forgetting the added element. This is not very nice. First, the interface isn't modular. We've hard-coded a particular pair of events to wait for; what if we wanted to wait on two queues at once, or a queue and network socket? Second, the implementation is tricky to understand. We have to reason about how multiple threads, each potentially at a different point in the program, interact with the shared state.Lwt_event
Concurrent ML provides a different set of primitives. It makes the notion of an event--something that may happen in the future, like a timeout or a condition becoming true--into an explicit datatype, so you can return it from a function, store it in a data structure, and so on:
type 'a eventWhen an event occurs, it carries a value of type
'a
. The act of synchronizing on (waiting for) an event is a separate function: val sync : 'a event -> 'a Lwt.tOf course it returns
Lwt.t
since it may block; the returned value is the value of the event occurrence. You can make an event that occurs when any of several events occurs, so a thread can wait on several events at once: val choose : 'a event list -> 'a eventWhen one event occurs, the thread is no longer waiting on the other events (in contrast to
Lwt.choose
). Since synchronizing on a choice of events is a very common pattern, there's also val select : 'a event list -> 'a Lwt.twhich is the same as
sync
of choose
. Its meaning is very similar to Unix.select
: block until one of the events occurs. A channel is sort of like a zero-length queue: both reader and writer must synchronize on the channel at the same time to pass a value from one to the other: type 'a channel val new_channel : unit -> 'a channel val send : 'a channel -> 'a -> unit event val receive : 'a channel -> 'a eventBoth
send
and receive
are blocking operations, so they return event
s. Finally, there's a way to map the value of an event when it occurs: val wrap : 'a event -> ('a -> 'b Lwt.t) -> 'b eventThe event
wrap e f
occurs when e
occurs, with value f v
(where v
is the value returned by the occurrence of e
). (Here's the full interface of Lwt_event
. There are events for Unix file descriptor operations in Lwt_event_unix
.)Blocking queues with Lwt_event
Now I want to reimplement blocking queues using these new primitives:
type 'a t val create : unit -> 'a t val add : 'a -> 'a t -> unit Lwt.t val take : 'a t -> 'a Lwt_event.eventThe interface is similar. As before,
take
is a blocking operation, but it returns an event
instead of Lwt.t
so we can combine it with other events using choose
. The new add
returns Lwt.t
, but this is an artifact: a thread calling add
won't actually block (we'll see why below). For this reason, add
doesn't need to return event
. type 'a t = { inch: 'a channel; ouch: 'a channel; } let add e t = sync (send t.inch e) let take t = receive t.ouchA queue consists of two channels, one for adding items into the queue and one for taking them out. The functions implementing the external interface just send and receive on these channels.
let create () = let q = Queue.create () in let inch = new_channel () in let ouch = new_channel () inTo
create
a queue, we make the channels and the underlying queue (we don't need to store it in the record; it will be hidden in a closure). We're going to have an internal thread to manage the queue; next we need some events for it to interact with the channels:let add = wrap (receive inch) (fun e -> Queue.add e q; Lwt.return ()) in let take () = wrap (send ouch (Queue.peek q)) (fun () -> ignore (Queue.take q); Lwt.return ()) inHere
add
receives an element from the input channel and adds it to the underlying queue; and take
sends the top element of the queue on the output channel. Keep in mind that these events don't occur (and the function passed to wrap
is not executed) until there's actually a thread synchronizing on the complementary event on the channel. We call Queue.peek
in take
because at the point that we offer to send an element on a channel, we have to come up with the element; but we don't want to take it off the underlying queue, because there might never be a thread synchronizing on the complementary event on the channel. (Maybe there should be a version of send
that takes a thunk?)let rec loop () = let evs = if Queue.is_empty q then [ add ] else [ add; take () ] in select evs >>= loop in ignore (loop ()); { inch = inch; ouch = ouch }Here's the internal thread. If the queue is empty all we can do is wait for an element to be added; if not, we wait for an element to be added or taken. Now we can see why the
add
function of the external queue interface can't block: we always select
the add
event, so as soon as another thread wants to send an element on the input channel, the internal thread is available to receive it.Timeouts!
Now, the punchline: we didn't build timeouts into the queue; still we can select between taking an element or timing out:
select [ Lwt_event_queue.take q; wrap (Lwt_event_unix.sleep timeout) (fun () -> Lwt.fail (Failure "timeout")); ]Much better. Moreover, I think this queue implementation is easier to reason about (once you're comfortable with the CML primitives), even compared to our first version (without timeouts). The difference is that only the internal thread touches the state of the queue--in fact it's the only thread for which the state is even in scope! We don't need to worry conditions and signaling; we just offer an element on the output channel when one is available. This is only an inkling of the power of CML; the book Concurrent Programming in ML contains much more, including some large examples.
Why is this style of concurrency not more common? I think there are several reasons: First, idiomatic CML programming requires very lightweight threads (you don't want a native thread, or even an OCaml bytecode thread, for every queue). Second, the wrap
combinator, essential for building complex events, requires higher-order functions, so there's no similarly concise translation into, say, Java. Finally, I think it's not widely appreciated that concurrent programming is useful without parallel programming. The mutex approach works fine for parallel programming, while CML has only recently been implemented in a parallel setting. None of these reasons applies to Lwt programming; Concurrent ML is a good fit with Lwt.
In an earlier post I asserted (without much to back it up) that Ocamlnet's Equeue gives better low-level control over blocking than Lwt. The Lwt_event
and Lwt_event_unix
modules provide a similar degree of control, with a higher-level interface.