Wednesday, February 17, 2010

F# Futures

I have been using F# a great deal lately. Once I got past the syntax and back into thinking in terms of "functional decomposition" (as opposed to object-oriented decomposition), I was consistently surprised by how little code was required to solve a problem. In that respect it is a lot like Python, but the runtime is considerably better. Running on Mono it won't keep up with C++, but I would guess my productivity to be 2-3x higher in F#, which makes it appropriate for the bulk of the code.

One issue I ran into early on is that async blocks are nice, but don't work well for computations that return a value. What I was looking for was a way to opportunistically spawn off particular chunks of code to the thread pool and then retrieve the result at some join point. I have not yet found a way to do this with async blocks -- please let me know if I am mistaken.

The Future type below works very much like the Lazy<> type, except that it queues the function to run in the thread pool immediately. Reading the Value property returns the result, blocking until the computation completes if necessary. Exceptions thrown by the function are trapped and re-raised when Value is read.
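To make the contrast with Lazy<> concrete, here is a minimal sketch. EagerValue is a hypothetical, stripped-down stand-in for the Future type listed below (it always allocates its wait handle and is not the implementation from this post); it only illustrates the "start immediately, block on Value, re-raise on failure" behavior described above.

```fsharp
open System
open System.Threading

/// Hypothetical stand-in for the Future type described above: queues 'f'
/// on the thread pool at construction and blocks in Value until it is done.
type EagerValue<'t>(f : unit -> 't) =
    let finished = new ManualResetEvent(false)
    // Either the computed value or the exception that 'f' threw.
    let mutable outcome : Choice<'t, exn> option = None
    do
        ThreadPool.QueueUserWorkItem(WaitCallback(fun _ ->
            outcome <- Some (try Choice1Of2 (f ()) with e -> Choice2Of2 e)
            finished.Set() |> ignore)) |> ignore
    member this.Value =
        finished.WaitOne() |> ignore
        match outcome with
        | Some (Choice1Of2 v) -> v
        | Some (Choice2Of2 e) -> raise e
        | None -> failwith "outcome is always stored before the event is set"

// Lazy: nothing runs until Value is first requested.
let lazyRan = ref false
let deferred = lazy (lazyRan.Value <- true; 42)
let ranBeforeForce = lazyRan.Value
let forced = deferred.Value

// Eager: the function is already queued when the constructor returns.
let eager = EagerValue(fun () -> 6 * 7)
let answer = eager.Value

// A failure inside the function surfaces at the point where Value is read.
let failing = EagerValue<int>(fun () -> failwith "boom")
let message = try string failing.Value with e -> e.Message
```

The lazy value has not run at the point where ranBeforeForce is read, while the eager one may already be finished by the time its Value is requested.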

One example where this is useful is when several intermediate results need to be computed before a final synchronizing point:

    let a = parseLogFile (setName + ".log")
    let b = loadGpxDataSet (setName + ".gpx")
    let c = loadElevationData areaBounds
    let model = ConstructModel a b c



In this trivial example, a, b, and c do not depend on one another, so the three intermediate results could be computed in parallel. The Future type makes this straightforward:
    let a = Future( fun () -> parseLogFile (setName + ".log") )
    let b = Future( fun () -> loadGpxDataSet (setName + ".gpx") )
    let c = Future( fun () -> loadElevationData areaBounds )
    let model = ConstructModel a.Value b.Value c.Value
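
For anyone who wants to try the pattern without real log or GPX files, the following is a rough, self-contained sketch. Both spawn and slowLoad are hypothetical: spawn stands in for the Future type using a .NET Task, and slowLoad simply sleeps to imitate a slow loader. The elapsed time it prints depends on the machine and on how many thread pool threads are available.

```fsharp
open System
open System.Diagnostics
open System.Threading
open System.Threading.Tasks

// Hypothetical stand-in for the Future type: starts 'f' on the thread pool
// right away and returns a function that blocks until the result is ready.
// Unlike the Future in this post, a failure in 'f' would surface here
// wrapped in an AggregateException.
let spawn (f : unit -> 't) : unit -> 't =
    let task = Task.Factory.StartNew(Func<'t>(f))
    fun () -> task.Result

// Hypothetical slow loader standing in for parseLogFile and friends.
let slowLoad (name : string) =
    Thread.Sleep 200
    name.Length

let timer = Stopwatch.StartNew()
let a = spawn (fun () -> slowLoad "run.log")
let b = spawn (fun () -> slowLoad "run.gpx")
let c = spawn (fun () -> slowLoad "bounds")

// Join point: each call blocks only if its computation is still running.
let total = a () + b () + c ()
printfn "total = %d, elapsed = %d ms" total timer.ElapsedMilliseconds
```

When the pool can run all three at once, the elapsed time should be closer to the longest single load than to the sum of the three.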



Summary:
Platform: .Net / Mono
Language: F#



/// Provides basic functionality for parallelizing specific 
/// computations that produce a result to be used in the 
/// future.  Similar to async { } blocks but allows the main
/// thread to opportunistically spawn off computations 
/// instead of running the whole flow as an async block.
module Futures =
    open System
    open System.ComponentModel
    open System.Threading
    

    
    /// Executes a computation in a background worker and 
    /// synchronizes on the result return.  The computation 
    /// is started immediately and calling 'Value' blocks 
    /// until the result is ready.
    type Future<'t>(f : unit -> 't) =
        
        /// Result of the computation on normal exit.
        let mutable result :'t option = None
        
        
        /// Result if an exception was thrown.
        let mutable ext : Exception option = None
        
        let syncRoot = new Object()
        
        
        /// Pulse object used to wait until a result 
        /// is ready.  ensurePulse() is used so we don't 
        /// have to create the object if the result is 
        /// done before it's needed.
        let mutable pulse : ManualResetEvent = null
        let ensurePulse() = 
            lock syncRoot (fun () ->
                match pulse with 
                | null -> 
                    pulse <- new ManualResetEvent(false);
                | _ -> 
                    ()
                pulse)
            
        
        /// WARNING: Call only while a lock on syncRoot is 
        /// held.  Signals the wait handle if one has been 
        /// created.  Safe if called after 'pulse' is created 
        /// but before WaitOne is called.
        let notifyWaiters() = if pulse <> null then pulse.Set() |> ignore
        
        let work = new BackgroundWorker()
  
        
        /// On RunWorkerAsync(), run specified function and 
        /// store result.  All exceptions must be trapped.      
        do work.DoWork.Add( fun args ->
                                try 
                                    result <- Some( f() )
                                with e ->
                                    ext <- Some e
                                lock syncRoot ( fun () -> notifyWaiters()) )

        
        /// Start immediately / automatically.
        do work.RunWorkerAsync()
        
        
        /// Returns the value of the computation, blocking 
        /// if the result isn't ready yet.
        member t.Value =
            // If available, we can return it right away.
            match result with
            | Some x -> x
            | None when ext.IsSome -> raise (Option.get ext)
            | None ->
                let p = ensurePulse()
                
                
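                // Check again in case it changed while 
                // we were getting the wait object.  The 
                // exception is re-checked too: if the worker 
                // failed before 'pulse' existed, nothing 
                // would ever signal it.
                match result with
                | Some x -> x
                | None when ext.IsSome -> raise (Option.get ext)
                | None ->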
                    
                    // Lock-free is ok because if the pulse.Set() 
                    // method is called between when we
                    // checked 'result' and call WaitOne here, 
                    // WaitOne will return immediately.
                    p.WaitOne(1000000000) |> ignore
                    match result with
                    | Some x -> x
                    | None ->
                        if ext.IsSome then raise (Option.get ext)
                        else failwith "Future computation failed."
    
        
        /// Returns true if the computation is finished, false if not.
        member t.IsComplete =
            match result with
            | Some x -> true
            | None when Option.isSome ext -> true
            | None -> false
