Wednesday, May 27, 2009

Lwt and Concurrent ML

Programming concurrent systems with threads and locks is famously, even fabulously, error-prone. With Lwt's cooperative threads you don't have to worry so much about protecting data structures against concurrent modification, since your code runs atomically between binds. Still, the standard concurrency primitives (mutexes, condition variables) are sometimes useful; but using them with Lwt is not much less painful than with preemptive threads. In this post I want to explore the combination of Lwt with the concurrency primitives of Concurrent ML. I hope to convince you that CML's primitives are easier to use, and a good match for Lwt.

Blocking queues in Lwt

I got started with Lwt when I was writing a work queue (as an Ocamlnet RPC service using orpc). The server keeps a queue of jobs, and workers poll for a task via RPC. An RPC request turns into an Lwt thread; all these threads share the queue. If there's no job in the queue, a request blocks until one is available. So I needed a blocking queue, with the following signature:

type 'a t
val create : unit -> 'a t
val add : 'a -> 'a t -> unit
val take : 'a t -> 'a Lwt.t
The queue is unbounded, so you can add without blocking, but a take may block. (It's nice how in Lwt the possibility of blocking is revealed in the type.) Here's the implementation:
type 'a t = {
  m : Lwt_mutex.t;
  c : Lwt_condition.t;
  q : 'a Queue.t;
}

let create () = {
  m = Lwt_mutex.create ();
  c = Lwt_condition.create ();
  q = Queue.create ();
}
A queue is made up of a regular OCaml queue, a condition variable (signaled when there's something in the queue), and a mutex for use with the condition variable. (The Lwt_condition module is based on the Condition module of the standard OCaml threads library.)
let add e t =
  Queue.add e t.q;
  Lwt_condition.signal t.c

let take t =
  Lwt_mutex.lock t.m >>= fun () ->
  (* parenthesize the conditional so the bind applies to both branches *)
  (if Queue.is_empty t.q
   then Lwt_condition.wait t.c t.m
   else Lwt.return ()) >>= fun () ->
  let e = Lwt.return (Queue.take t.q) in
  Lwt_mutex.unlock t.m;
  e
Since Lwt threads are cooperative we don't need to worry about concurrent access to the underlying queue. The role of the mutex here is only to ensure that when a thread blocked on the condition gets signaled, another thread can't take the element first.

Timeouts?

What if there are no entries in the queue for a while? Within a single process, no big deal, the thread can keep waiting forever. That doesn't seem like a good idea over a network connection; we should time out at some point and return a response indicating that no task is available. Here is a first attempt at taking an element from the queue with a timeout:

Lwt.choose [
  Lwt_queue.take q;
  Lwt_unix.sleep timeout >>= fun () ->
    Lwt.fail (Failure "timeout");
]
The Lwt.choose function "behaves as the first thread [...] to terminate". However, the other threads are still running after the first one terminates. It doesn't matter if the sleep is still running after the take completes, but if the sleep finishes first, then the take thread is still waiting to take an element from the queue. When an element becomes available, this thread takes it, and drops it on the floor (since the choose has already finished). And in general this sort of thing can happen whenever a thread you choose between has some effect; the effect still happens even if the thread is not chosen.

A thread can block on only one condition at a time. In order to take an element with a timeout, we're forced to build timeouts into the queue, so we can get at the queue's condition variable. We add an optional argument to take:
val take : ?timeout:float -> 'a t -> 'a Lwt.t
and modify the implementation:
exception Timeout

let take ?(timeout=(-1.)) t =
  let timed_out = ref false in
  if timeout >= 0.
  then
    Lwt.ignore_result
      (Lwt_unix.sleep timeout >>= fun () ->
        timed_out := true;
        Lwt_condition.broadcast t.c;
        Lwt.return ());
  Lwt_mutex.lock t.m >>= fun () ->
    let rec while_empty () =
      if !timed_out then Lwt.return false
      else if not (Queue.is_empty t.q) then Lwt.return true
      else Lwt_condition.wait t.c t.m >>= while_empty in
    while_empty () >>= fun not_empty ->
    let e = if not_empty then Some (Queue.take t.q) else None in
    Lwt_mutex.unlock t.m;
    Lwt_condition.signal t.c;
    match e with Some e -> Lwt.return e | _ -> Lwt.fail Timeout
In an auxiliary thread we wait for the timeout, then set a timeout flag for the main thread and broadcast the condition. It's important to use broadcast, which signals all waiting threads, instead of signal, which signals an arbitrary waiter, in order to be sure that we wake up the timed-out thread. But now it's possible for a thread to be signaled when neither the timeout has expired nor an element is available, so we must loop around waiting on the condition. And a signal from adding an element may be sent to a timed-out thread, so we need to signal another thread to avoid forgetting the added element.

This is not very nice. First, the interface isn't modular. We've hard-coded a particular pair of events to wait for; what if we wanted to wait on two queues at once, or a queue and network socket? Second, the implementation is tricky to understand. We have to reason about how multiple threads, each potentially at a different point in the program, interact with the shared state.
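
The dropped-element hazard of the first attempt (choosing between a take and a sleep) can be reproduced without Lwt. The following is a minimal sketch using only the standard library; it models a blocked take as a callback parked on the queue. The names bq, waiters, outcome and dropped are made up for this illustration and are not part of Lwt.

```ocaml
(* Toy model, not Lwt: a blocked take is a callback parked on the queue. *)
type 'a bq = { items : 'a Queue.t; waiters : ('a -> unit) Queue.t }

let create () = { items = Queue.create (); waiters = Queue.create () }

(* Deliver an element to k now, or park k until one is added. *)
let take q k =
  if Queue.is_empty q.items then Queue.add k q.waiters
  else k (Queue.take q.items)

(* Hand the element to the oldest parked waiter, if any. *)
let add x q =
  if Queue.is_empty q.waiters then Queue.add x q.items
  else (Queue.take q.waiters) x

let q : int bq = create ()
let outcome = ref None   (* what the "choose" reported to its caller *)
let dropped = ref []     (* elements consumed after the choice was made *)

let () =
  (* The take side of the choice: it stays parked even if it loses. *)
  take q (fun x ->
    match !outcome with
    | None -> outcome := Some (Ok x)
    | Some _ -> dropped := x :: !dropped);
  (* The timeout side fires first and decides the choice. *)
  if !outcome = None then outcome := Some (Error "timeout");
  (* Later an element arrives; the stale waiter swallows it. *)
  add 42 q
```

After this runs, the caller has seen a timeout, the queue is empty, and the element 42 sits in dropped: the effect of the losing branch happened anyway.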

Lwt_event

Concurrent ML provides a different set of primitives. It makes the notion of an event--something that may happen in the future, like a timeout or a condition becoming true--into an explicit datatype, so you can return it from a function, store it in a data structure, and so on:

type 'a event
When an event occurs, it carries a value of type 'a. The act of synchronizing on (waiting for) an event is a separate function:
val sync : 'a event -> 'a Lwt.t
Of course it returns Lwt.t since it may block; the returned value is the value of the event occurrence. You can make an event that occurs when any of several events occurs, so a thread can wait on several events at once:
val choose : 'a event list -> 'a event
When one event occurs, the thread is no longer waiting on the other events (in contrast to Lwt.choose). Since synchronizing on a choice of events is a very common pattern, there's also
val select : 'a event list -> 'a Lwt.t
which is the same as sync of choose. Its meaning is very similar to Unix.select: block until one of the events occurs. A channel is sort of like a zero-length queue: both reader and writer must synchronize on the channel at the same time to pass a value from one to the other:
type 'a channel
val new_channel : unit -> 'a channel
val send : 'a channel -> 'a -> unit event
val receive : 'a channel -> 'a event
Both send and receive are blocking operations, so they return events. Finally, there's a way to map the value of an event when it occurs:
val wrap : 'a event -> ('a -> 'b Lwt.t) -> 'b event
The event wrap e f occurs when e occurs, with value f v (where v is the value returned by the occurrence of e). (Here's the full interface of Lwt_event. There are events for Unix file descriptor operations in Lwt_event_unix.)
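
To get a feel for how choose and wrap compose, here is a toy model that uses only the standard library. It is a sketch under strong assumptions, not how Lwt_event is implemented: an event is just a poll function, nothing blocks, wrap takes a plain function rather than one returning Lwt.t, and mailbox and try_sync are hypothetical helpers invented for the example.

```ocaml
(* Toy model, NOT Lwt_event: an event is a poll function that returns
   Some v (and commits its side effect) only if it can occur right now. *)
type 'a event = unit -> 'a option

(* One-slot mailbox standing in for a channel (hypothetical helper). *)
let mailbox () : ('a -> unit) * 'a event =
  let slot = ref None in
  let put v = slot := Some v in
  let recv () =
    match !slot with
    | Some v -> slot := None; Some v
    | None -> None in
  (put, recv)

(* The first event in the list that can occur wins; the rest are not polled. *)
let choose (evs : 'a event list) : 'a event = fun () ->
  let rec first = function
    | [] -> None
    | ev :: rest ->
      (match ev () with
       | Some _ as r -> r
       | None -> first rest) in
  first evs

(* Map the value of an event when it occurs. *)
let wrap (ev : 'a event) (f : 'a -> 'b) : 'b event = fun () ->
  match ev () with
  | Some v -> Some (f v)
  | None -> None

(* Non-blocking stand-in for sync. *)
let try_sync (ev : 'a event) : 'a option = ev ()

let put_a, recv_a = (mailbox () : (int -> unit) * int event)
let put_b, recv_b = (mailbox () : (int -> unit) * int event)

let ev =
  choose [ wrap recv_a (fun n -> "a:" ^ string_of_int n);
           wrap recv_b (fun n -> "b:" ^ string_of_int n) ]

let r0 = try_sync ev            (* nothing has been sent yet *)
let () = put_b 7
let r1 = try_sync ev            (* only b is ready *)
let () = put_a 1; put_b 2
let r2 = try_sync ev            (* both ready: a is chosen *)
let b_left = recv_b ()          (* b's value was not consumed by the choice *)
```

The last line is the point: the event that was not chosen has had no effect, so its value is still there for a later synchronization.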

Blocking queues with Lwt_event

Now I want to reimplement blocking queues using these new primitives:

type 'a t

val create : unit -> 'a t
val add : 'a -> 'a t -> unit Lwt.t
val take : 'a t -> 'a Lwt_event.event
The interface is similar. As before, take is a blocking operation, but it returns an event instead of Lwt.t so we can combine it with other events using choose. The new add returns Lwt.t, but this is an artifact: a thread calling add won't actually block (we'll see why below). For this reason, add doesn't need to return event.
type 'a t = {
  inch: 'a channel;
  ouch: 'a channel;
}
let add e t = sync (send t.inch e)
let take t = receive t.ouch
A queue consists of two channels, one for adding items into the queue and one for taking them out. The functions implementing the external interface just send and receive on these channels.
let create () =
  let q = Queue.create () in
  let inch = new_channel () in
  let ouch = new_channel () in
To create a queue, we make the channels and the underlying queue (we don't need to store it in the record; it will be hidden in a closure). We're going to have an internal thread to manage the queue; next we need some events for it to interact with the channels:
  let add =
    wrap (receive inch) (fun e ->
      Queue.add e q;
      Lwt.return ()) in

  let take () =
    wrap (send ouch (Queue.peek q)) (fun () ->
      ignore (Queue.take q);
      Lwt.return ()) in
Here add receives an element from the input channel and adds it to the underlying queue; and take sends the top element of the queue on the output channel. Keep in mind that these events don't occur (and the function passed to wrap is not executed) until there's actually a thread synchronizing on the complementary event on the channel. We call Queue.peek in take because at the point that we offer to send an element on a channel, we have to come up with the element; but we don't want to take it off the underlying queue, because there might never be a thread synchronizing on the complementary event on the channel. (Maybe there should be a version of send that takes a thunk?)
  let rec loop () =
    let evs =
      if Queue.is_empty q
      then [ add ]
      else [ add; take () ] in
    select evs >>= loop in
  ignore (loop ());

  { inch = inch; ouch = ouch }
Here's the internal thread. If the queue is empty all we can do is wait for an element to be added; if not, we wait for an element to be added or taken. Now we can see why the add function of the external queue interface can't block: we always select the add event, so as soon as another thread wants to send an element on the input channel, the internal thread is available to receive it.
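
The decision the internal thread makes on each iteration can be isolated as a pure function of the underlying queue. This is only a sketch with the standard library; the offer type and its constructor names are invented here to stand for the add and take () events above.

```ocaml
(* What the internal thread is willing to do, given the queue's state.
   Receive_add stands for the add event, Send_take for take (). *)
type 'a offer = Receive_add | Send_take of 'a

let offers q =
  if Queue.is_empty q then [ Receive_add ]
  else [ Receive_add; Send_take (Queue.peek q) ]  (* peek: offer without removing *)

let q : string Queue.t = Queue.create ()
let when_empty = offers q
let () = Queue.add "job1" q; Queue.add "job2" q
let when_nonempty = offers q
let still_queued = Queue.length q   (* offering an element does not dequeue it *)
```

Receive_add is present in both cases, which is why an external add never waits on the internal thread; the offered element stays in the queue until some thread actually receives it.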

Timeouts!

Now, the punchline: we didn't build timeouts into the queue; still we can select between taking an element or timing out:

select [
  Lwt_event_queue.take q;
  wrap (Lwt_event_unix.sleep timeout)
    (fun () -> Lwt.fail (Failure "timeout"));
]
Much better. Moreover, I think this queue implementation is easier to reason about (once you're comfortable with the CML primitives), even compared to our first version (without timeouts). The difference is that only the internal thread touches the state of the queue--in fact it's the only thread for which the state is even in scope! We don't need to worry about conditions and signaling; we just offer an element on the output channel when one is available. This is only an inkling of the power of CML; the book Concurrent Programming in ML contains much more, including some large examples.

Why is this style of concurrency not more common? I think there are several reasons: First, idiomatic CML programming requires very lightweight threads (you don't want a native thread, or even an OCaml bytecode thread, for every queue). Second, the wrap combinator, essential for building complex events, requires higher-order functions, so there's no similarly concise translation into, say, Java. Finally, I think it's not widely appreciated that concurrent programming is useful without parallel programming. The mutex approach works fine for parallel programming, while CML has only recently been implemented in a parallel setting. None of these reasons applies to Lwt programming; Concurrent ML is a good fit with Lwt.

In an earlier post I asserted (without much to back it up) that Ocamlnet's Equeue gives better low-level control over blocking than Lwt. The Lwt_event and Lwt_event_unix modules provide a similar degree of control, with a higher-level interface.

Monday, May 11, 2009

Sudoku in ocamljs, part 3: functional reactive programming

In part 1 and part 2 of this series, we made a simple Sudoku game and connected it to a game server. In this final installment I want to revisit how we check that a board satisfies the Sudoku rules. There's a small change to the UI: instead of a "Check" button, the board is checked continuously as the player enters numbers; any conflicts are highlighted as before. Here's the final result.

Let's review how we want checking to work: a cell is colored red if any other cell in the same row, column, or square (outlined in bold) contains the same number; otherwise the cell is colored white. Now take another look at the check_board function from part 1. Is it obvious that this code meets the specification? The function is essentially stateful, clearing all the cell colors then setting them red when it discovers a conflict. In fact, I had a bug in it related to state--I was clearing the background color in the None arm of check_set, so each checked constraint would overwrite the highlighting of the previous ones where they overlapped.

It would be easier to convince ourselves that we'd gotten it right if the code looked more like the specification. What we want is a function that maps each cell and its "adjacent" cells (the ones in the same row, column, or square) to a boolean (true if the cell is highlighted). Abstracting from the DOM details, suppose a cell is an int option and we have a function adjacents i j that returns a list of cells adjacent to the cell at (i, j). Then the check function is just:

let highlighted cell i j =
  cell <> None && List.mem cell (adjacents i j)
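
To try this specification on plain values, here is a self-contained sketch. It assumes the board is a 9x9 int option array array and passes it explicitly to adjacents and highlighted; that extra argument and the sample positions are choices made for the example only.

```ocaml
(* Plain-value version: the board is a 9x9 matrix of int option. *)
let adjacents board i j =
  let cells = ref [] in
  for i' = 0 to 8 do
    for j' = 0 to 8 do
      let same_cell = i' = i && j' = j in
      let same_unit =
        i' = i || j' = j || (i' / 3 = i / 3 && j' / 3 = j / 3) in
      if (not same_cell) && same_unit then
        cells := board.(i').(j') :: !cells
    done
  done;
  !cells

(* A cell is highlighted if it is filled and an adjacent cell holds the same number. *)
let highlighted board i j =
  let cell = board.(i).(j) in
  cell <> None && List.mem cell (adjacents board i j)

let board : int option array array = Array.make_matrix 9 9 None

let () =
  board.(0).(0) <- Some 5;
  board.(0).(8) <- Some 5;   (* same row as (0,0): a conflict *)
  board.(4).(4) <- Some 5    (* shares no row, column, or square with the others *)
```

With this board, the cells at (0,0) and (0,8) are highlighted, while (4,4) and every empty cell are not.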

So how do we hook this function into the UI? We could just call it for every cell, every time we get a change event for some cell. That seems like a lot of needless computation, since almost all the cells haven't changed. On the other hand, if we manually keep track of which cells might be affected by a change, our code is no longer obviously correct. It would be nice to have some kind of incremental update, like a spreadsheet.

This is where functional reactive programming comes in. The main idea is to write functions over behaviors, or values that can change. If you change an input to a function, the output (another behavior) is automatically recomputed. The dependency bookkeeping is taken care of by the framework; we'll use the froc library.

It turns out to be convenient to give behaviors a monadic interface. So we have a type 'a behavior; we turn a constant into a behavior with return, and we use a behavior with bind. We saw in part 2 that the monadic interface of Lwt enables blocking: since bind takes a function to apply to the result of a thread, the framework can wait until the thread has completed before applying it. With froc, the framework applies the function passed to bind whenever the bound behavior changes. With both Lwt and froc you can think of a computation as a collection of dependencies rather than a linear sequence.

There's another important piece of functional reactive programming: events. An 'a event in froc is a channel over which values of type 'a can be passed. You can connect froc events to DOM events to interact with the stateful world of the UI. The library includes several functions for working with events (e.g. mapping a function over an event stream) and in particular for mediating between behaviors and events, such as:

val hold : 'a -> 'a event -> 'a behavior
which takes an initial value and an event channel, and returns a behavior that begins at the initial value then changes to each successive value that's sent on the channel, and
val changes : 'a behavior -> 'a event
which takes a behavior and returns an event channel that has a value sent on it whenever the behavior changes.
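
As a rough mental model of hold and changes, consider the following push-based toy. It is emphatically not froc's implementation (there is no dependency graph, no exceptions, and no handling of repeated values); the record fields and the notify function are assumptions made for the sketch.

```ocaml
(* Toy push-based model; NOT froc's implementation. *)
type 'a event = { mutable listeners : ('a -> unit) list }
type 'a behavior = { mutable value : 'a; changed : 'a event }

let make_event () = { listeners = [] }

(* Register a callback for values sent on the channel. *)
let notify ev k = ev.listeners <- k :: ev.listeners

(* Deliver a value to the listeners in registration order. *)
let send ev v = List.iter (fun k -> k v) (List.rev ev.listeners)

(* A behavior that starts at init and follows the values sent on ev. *)
let hold init ev =
  let b = { value = init; changed = make_event () } in
  notify ev (fun v ->
    b.value <- v;
    send b.changed v);
  b

(* The event channel of a behavior's successive values. *)
let changes b = b.changed

let ev : int event = make_event ()
let b = hold 0 ev
let seen = ref []
let () = notify (changes b) (fun v -> seen := v :: !seen)
let () = send ev 1; send ev 2
```

After the two sends, b.value is 2 and the listener on changes b has seen 1 and then 2.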

This all probably seems a bit abstract, so let's dive into the example code:

module D = Dom
let d = D.document

module F = Froc
module Fd = Froc_dom
let (>>=) = F.(>>=)
We set up some constants we'll need below. The Froc module contains the core FRP implementation, not tied to a particular UI toolkit; Froc_dom contains functions that are specific to DOM programming (with the Dom module we saw before).
let make_cell v =
  let ev = F.make_event () in
  let cell = F.hold v ev in
  let set v = F.send ev v in
  (cell, set)
let notify_e e f =
  F.notify_e e (function
    | F.Fail _ -> ()
    | F.Value v -> f v)
These are a couple of functions that really should be part of froc (and will be in the next version). The first makes a cell, which is a behavior (the hold of an event channel) along with a function to set its value (which sends the value on the channel). It's like a ref cell, but we can bind it so changes are propagated. We'll have one of these for each square on the Sudoku board, but it is a generally useful construct.

The second papers over a design error in the froc API: as with Lwt threads, a froc behavior or event value can be either a normal value or an exception (together, a result). The notify_e function sets a callback that's called when an event arrives on the channel, but most of the time we just want to ignore exceptional events.

let attach_input_value i b =
  notify_e (F.changes b) (fun v -> i#_set_value v)
let attach_backgroundColor e b =
  notify_e
    (F.changes b)
    (fun v -> e#_get_style#_set_backgroundColor v)
These are functions that should be part of Froc_dom. To attach a DOM element to a behavior means to update the DOM element whenever the behavior changes. But there are lots of ways to update a DOM element, and Froc_dom doesn't include them all. (This design contrasts with that of Flapjax, where you work with behaviors whose value is an entire DOM element. It's certainly possible to do this in froc, but more tedious because of the types.)
let (check_enabled, set_check_enabled) = make_cell false
Now we're in the application code. The check_enabled cell controls whether checking is turned on--we'll see below what this is for, as you may have noticed that there is no such switch in the actual UI.
let make_board () =
  let make_input () =
    let input = (d#createElement "input" : D.input) in
    input#setAttribute "type" "text";
    input#_set_size 1;
    input#_set_maxLength 1;
    let style = input#_get_style in
    style#_set_border "none";
    style#_set_padding "0px";

    let (cell, set) = make_cell None in
    attach_input_value input
      (cell >>= function
        | None -> F.return ""
        | Some v -> F.return (string_of_int v));
    let ev =
      F.map
        (function
          | "1" | "2" | "3" | "4" | "5"
          | "6" | "7" | "8" | "9"  as v ->
            Some  (int_of_string v)
          | _ -> None)
        (Fd.input_value_e input) in
    notify_e ev set;
    (cell, set, input) in
Here we make the game board much as we did in part 1. The main difference is that instead of working directly with DOM input nodes, we connect each input to a cell of type int option. The attach_input_value call sets the value of the DOM input node whenever the cell changes, and the notify_e call sets the cell whenever the input node changes. (This doesn't loop, because Fd.input_value_e makes an event stream from the "onchange" events of the input, and "onchange" events are only sent when the user changes the input, not when it's changed from Javascript.) We take the stream of strings and map it into a stream of int options, validating the string as we go.
  let rows =
    Array.init 9 (fun i ->
      Array.init 9 (fun j ->
        make_input ())) in

  let adjacents i j =
    let adj i' j' =
      (i' <> i || j' <> j) &&
        (i' = i || j' = j ||
            (i' / 3 = i / 3 && j' / 3 = j / 3)) in
    let rec adjs i' j' l =
      match i', j' with
        | 9, _ -> l
        | _, 9 -> adjs (i'+1) 0 l
        | _, _ ->
            let l =
              if adj i' j'
              then
                let (cell,_,_) = rows.(i').(j') in
                cell::l
              else l in
            adjs i' (j'+1) l in
    adjs 0 0 [] in
We make the game board as a matrix of inputs as before, but now each element of the matrix contains a cell (an int option behavior), the function to set that cell, and the actual DOM input element. Next we set up the rule-checking. The adjacents function returns a list of cells adjacent to the cell at (i, j) (adjacent in the sense we discussed above). All my bugs when I wrote this example were in this function, but it clearly embodies the specification we're trying to meet: a cell is adjacent to the current cell if it is not the same cell and is in the same row, column, or square. (The loop would be clearer if we had Array.foldi.)
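
Since this predicate is where the bugs were, a quick sanity check is cheap: every cell should have exactly 20 adjacent cells (8 in its row, 8 in its column, and 4 more in its square). The sketch below restates the adj test over bare coordinates, independent of the DOM and of froc; count_adjacent and every_cell_has_20 are names invented for the check.

```ocaml
(* The same adjacency test as above, with (i, j) passed explicitly. *)
let adj i j i' j' =
  (i' <> i || j' <> j)
  && (i' = i || j' = j || (i' / 3 = i / 3 && j' / 3 = j / 3))

let indices = List.init 9 (fun k -> k)

(* Number of board positions adjacent to (i, j). *)
let count_adjacent i j =
  List.fold_left
    (fun n i' ->
       List.fold_left
         (fun n j' -> if adj i j i' j' then n + 1 else n)
         n indices)
    0 indices

let every_cell_has_20 =
  List.for_all
    (fun i -> List.for_all (fun j -> count_adjacent i j = 20) indices)
    indices
```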
  ArrayLabels.iteri rows ~f:(fun i row ->
    ArrayLabels.iteri row ~f:(fun j (cell, _, input) ->
      let adjs = adjacents i j in
      attach_backgroundColor input
        (check_enabled >>= function
          | false -> F.return "#ffffff"
          | true ->
              F.bindN adjs (fun adjs ->
                cell >>= fun v ->
                  if v <> None && List.mem v adjs
                  then F.return "#ff0000"
                  else F.return "#ffffff"))));
This is the functional reactive core of the program. For each square on the board we compute essentially the highlighted function above, but in monadic form (the bindN function binds a list of behaviors at once), and attach the result to the background color of the input node. Because the set of adjacent cells does not depend on the value of the cells, we can hoist its computation out of the reactive part so it won't be recomputed every time a cell changes (and since dependency on a behavior is captured in the type of a function, the fact that this typechecks tells us it is safe to do!).

That's it. The rest of the program is almost the same as before. (Here's the full code.) The one important change has to do with check_enabled. In the reaction to cell changes, we consult check_enabled, returning the unhighlighted color when it's false. Since we do this before binding the cells, a change to a cell causes no recomputation when check_enabled is false. So we turn off check_enabled while loading a new game board, saving a lot of needless recomputation that otherwise makes it annoyingly slow.

It's interesting to compare functional reactive programming to the model-view-controller pattern. The point of MVC is to separate the changeable state (the model) from how it is displayed (the view). Although MVC is typically implemented with change events and state update, a view behaves as a pure function of the state (or can be made so by making the state of UI components explicit). So you could think of FRP as "automatic" MVC: you just write down dependencies (with bind) and the framework manages events and state update. For small examples this may not seem like a big win, but FRP takes care of some complexities that tend to swamp MVC apps: managing dynamic dependencies (registering and unregistering event handlers in response to events) and maintaining coherence (i.e. functional behavior) over different event orders.

I haven't yet written a serious application with froc, but so far I think it is awesome!

Sunday, May 3, 2009

Sudoku in ocamljs, part 2: RPC over HTTP

Last time we made a simple user interface for Sudoku with the Dom module of ocamljs. It isn't a very fun game though since there are no pre-filled numbers to constrain the board. So let's add a button to get a new game board; here's the final result.

I don't know much about generating Sudoku boards, but it seems like it might be slow to do it in the browser, so we'll do it on the server, and communicate to the server with OCaml function calls using the RPC over HTTP support in orpc.

The 5-minute monad

But first I'm going to give you a brief introduction to monads (?!). Bear with me until I can explain why we need monads for Sudoku, or skip it if this is old hat to you. We'll transform the following fragment into monadic form:

let foo () = 7 in
bar (foo ())
First put it in named form by let-binding the result of the nested function application:
let foo () = 7 in
let f = foo () in
bar f
Then introduce two new functions, return and bind:
let return x = x
let bind x f = f x

let foo () = return 7 in
bind (foo ()) (fun f ->
  bar f)
These functions are a bit mysterious (although the name "bind" is suggestive of let-binding), but we haven't changed the meaning of the fragment. Next we would like to enforce that the only way to use the result of foo () is by calling bind. We can do that with an abstract type:
type 'a t
val return : 'a -> 'a t
val bind  : 'a t -> ('a -> 'b t) -> 'b t
Taking type 'a t = 'a, the definitions of return and bind match this signature. So what have we accomplished? We've abstracted out the notion of using the result of a computation. It turns out that there are many useful structures matching this signature (and satisfying some equations), called monads. It's convenient that they all match the same signature, in part because we can mechanically convert ordinary code into monadic code, as we've done here, or even use a syntax extension to do it for us.
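
To make "many structures match this signature" concrete, here is a small sketch: the identity instance from above, plus option as a second instance, and the foo/bar fragment written once against the signature. The module names are chosen for the example, and bar is given a made-up body (add one) so that there is something to run.

```ocaml
module type MONAD = sig
  type 'a t
  val return : 'a -> 'a t
  val bind : 'a t -> ('a -> 'b t) -> 'b t
end

(* The instance from the text: 'a t = 'a. *)
module Identity : MONAD with type 'a t = 'a = struct
  type 'a t = 'a
  let return x = x
  let bind x f = f x
end

(* A second instance: computations that may produce no result. *)
module Maybe : MONAD with type 'a t = 'a option = struct
  type 'a t = 'a option
  let return x = Some x
  let bind x f = match x with None -> None | Some v -> f v
end

(* The monadic fragment, written once for any monad. *)
module Program (M : MONAD) = struct
  let foo () = M.return 7
  let bar x = M.return (x + 1)   (* hypothetical body for bar *)
  let result = M.bind (foo ()) (fun f -> bar f)
end

module P1 = Program (Identity)
module P2 = Program (Maybe)
```

Here P1.result is the plain integer 8, while P2.result is Some 8: the same code, with a different notion of "using the result".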

Lightweight threads in Javascript

One such useful structure is the Lwt library for cooperative threads. You can write Lwt-threaded code by taking ordinary threaded code and converting it to monadic style. In Lwt, 'a t is the type of threads returning 'a. Then bind t f calls f on the value of the thread t once t has finished, and return x is an already-finished thread with value x.

Lwt threads are cooperative: they run until they complete or block waiting on the result of another thread, but aren't ever preempted. It can be easier to reason about this kind of threading, because until you call bind, there's no possibility of another thread disturbing any state you're working on.

Lwt threads are a great match for Javascript, which doesn't have preemptive threads (although plugins like Google Gears provide them), because they need no special support from the language except closures. Typically in Javascript you write a blocking computation as a series of callbacks. You're doing essentially the same thing with Lwt, but it's packaged up in a clean interface.
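
The claim that this is "essentially the same thing" as callbacks can be seen in a toy thread monad where a thread simply is a function that accepts a callback. This is a sketch for intuition only and not how Lwt is implemented; pending, later and run_pending are hypothetical stand-ins for the browser's event loop.

```ocaml
(* Toy model, not Lwt: a thread is something you hand a callback to. *)
type 'a t = ('a -> unit) -> unit

let return x : 'a t = fun k -> k x
let bind (t : 'a t) (f : 'a -> 'b t) : 'b t = fun k -> t (fun v -> f v k)
let ( >>= ) = bind

(* Stand-in for the event loop: callbacks waiting to be run. *)
let pending : (unit -> unit) Queue.t = Queue.create ()

(* A thread whose value only becomes available on a later loop turn. *)
let later (v : 'a) : 'a t = fun k -> Queue.add (fun () -> k v) pending

let run_pending () =
  while not (Queue.is_empty pending) do
    (Queue.take pending) ()
  done

let seen = ref []

let () =
  (later 20 >>= fun a ->
   later 22 >>= fun b ->
   return (a + b))
    (fun sum -> seen := sum :: !seen)

let before = !seen          (* nothing has run yet: both steps are deferred *)
let () = run_pending ()
let after = !seen
```

The code reads as a straight-line sequence, but each bind is a callback registration; the result only appears once the loop has run the deferred steps.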

Orpc for RPC over HTTP

The reason we care about threads in Javascript is that we want to make a blocking RPC call to the server to retrieve a Sudoku game board, without hanging the browser. We'll use orpc to generate stubs for the client and server. In the client the call returns an Lwt thread, so you need to call bind to get the result. In the server it arrives as an ordinary procedure call.

To use orpc you write down the signature of the RPC interface, in Lwt and Sync forms for the client and server. Orpc checks that the two forms are compatible, and generates the stubs. Here's our interface (proto.ml):

module type Sync =
sig
  val get_board : unit -> int option array array
end

module type Lwt =
sig
  val get_board : unit -> int option array array Lwt.t
end
The get_board function returns a 9x9 array, each cell of which may contain None or Some k where k is 1 to 9. We can't capture all these constraints in the type, but we get more static checking than if we were passing JSON or XML.
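
The constraints that the type doesn't capture can still be checked at run time. A minimal sketch, using only the standard library (valid_board is a name invented here; it is not part of orpc or of the game code):

```ocaml
(* True if the board is 9x9 and every filled cell holds a number from 1 to 9. *)
let valid_board (board : int option array array) : bool =
  Array.length board = 9
  && Array.for_all
       (fun row ->
          Array.length row = 9
          && Array.for_all
               (function None -> true | Some k -> 1 <= k && k <= 9)
               row)
       board

let empty_board () : int option array array = Array.make_matrix 9 9 None
```

A client could run such a check on the result of get_board before filling in the UI; whether that is worth doing depends on how much the server is trusted.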

Generating the board

On the server, we implement a module that matches the Sync signature. (You can see that I didn't actually implement any Sudoku-generating code, but took some fixed examples from Gnome Sudoku.) Then there's some boilerplate to set up a Netplex HTTP server and register the module at the /sudoku path. It's pretty simple. The Proto_js_srv module contains stubs generated by orpc from proto.ml, and Orpc_js_server is part of the orpc library.

Using the board

The client is mostly unchanged from last time. There's a new button, "New game", that makes the RPC call, then fills in the board from the result.

let (>>=) = Lwt.(>>=)
The >>= operator is another name for bind. If you aren't using pa_monad (which we aren't here), it makes a sequence of binds easier to read.
module Server =
  Proto_js_clnt.Lwt(struct
    let with_client f = f (Orpc_js_client.create "/sudoku")
  end)
This sets up the RPC interface, so calls on the Server module become RPC calls to the server. The Proto_js_clnt module contains stubs generated from proto.ml, and Orpc_js_client is part of the orpc library. (In the actual source you'll see that I faked this out in order to host the running example on Google Code--there's no way to run an OCaml server, so I randomly choose a canned response.)
let get_board rows _ =
  ignore
    (Server.get_board () >>= fun board ->
      for i = 0 to 8 do
        for j = 0 to 8 do
          let cell = rows.(i).(j) in
          let style = cell#_get_style in
          style#_set_backgroundColor "#ffffff";
          match board.(i).(j) with
            | None ->
                cell#_set_value "";
                cell#_set_disabled false
            | Some n ->
                cell#_set_value (string_of_int n);
                cell#_set_disabled true
        done
      done;
      Lwt.return ());
  false
This is the event handler for the "New game" button. We call get_board, bind the result, then fill in the board. If there's a number in a cell we disable the input box so the player can't change it. Here's the full code.

Doing AJAX programming with orpc and Lwt really shows off the power of compiling OCaml to Javascript. While Google Web Toolkit has a similar RPC mechanism (that generates stubs from Java interfaces), it's much clumsier to use, because you're still working at the level of callbacks rather than threads. Maybe you could translate Lwt to Java, but it would be painfully verbose without type inference.

This monad stuff will come in handy again next time, when we'll revisit the problem of checking the Sudoku constraints on the board, using froc.