Friday 22 July 2011

A Safe & Asynchronous Promise implementation explained - Part 1

Building safe, asynchronous, composable computations is notoriously difficult.
Yet a program that always terminates, handles failure scenarios and is easy to write, read and reason about should be the bread and butter of most programming activity.
Today I will present a Promise library I wrote for this purpose in my free time three years ago (in the old Scala 2.7 days).
While these days it may be better to use the Akka or Scalaz abstractions to follow the momentum, I still think a good explanation of its underlying architecture is valuable, as it will help most of you figure out how to build great functional abstractions with close-to-the-metal performance.

This first post introduces the Promise base class and some abstractions built on top of it.
The second post will show their lower-level implementation.
(This is an old library, so do not expect any 2.8+ features such as continuations; the intended reader has some Scala experience but is not necessarily at an advanced level.)

Safe, asynchronous and composable computations?

Let's explain the intent a bit further:
  • Composable: the code is built using combinators on top of Promises and/or other combinators (functional programming style).

  • Asynchronous: when making a URL request you don't want to waste a thread waiting for the answer; instead, you use composition to define the rest of the computation ahead of its execution.

  • Safe: no exception is thrown implicitly through your program; exceptions are caught and captured as values, and the types statically force you to deal with them (not the Java way).
(The Promise abstraction presented in this post integrates several concepts in one implementation; this was done for performance reasons but a cleaner separation could be achieved).


Gimme some food, please

OK, enough talk; I'll show you some code and explain it as we go.
abstract class Promise[+T] {
  def nowEither(): Option[Either[Throwable, T]]
  def foreachEither(f: Either[Throwable, T] => Unit): Unit
}

So here's the Promise abstract class; it has two methods:
  • nowEither: returns a snapshot of the Promise's value, as its signature Option[Either[Throwable, T]] reveals: the Option part indicates whether the computation has terminated, and the Either part holds either the successful value Right(v) or the error Left(err). In my opinion it is better practice to keep these concerns separated, each with its own meaning, rather than creating three new case classes to define the Promise state; it saves you from rewriting all the combinators Option and Either already have.

  • foreachEither: you pass it a function taking an Either[Throwable, T]; when the Promise is fulfilled, that function is called with its value (executing a side effect).
Promise is covariant in its type parameter T ([+T]), meaning that it plays well with subtyping (i.e. a Promise[Ferrari] can be used as a Promise[Car] if Ferrari is a subtype of Car).
We could have got rid of nowEither, as it is not strictly necessary, but then we could not implement some pragmatic debugging tools without introducing some overhead.
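To make the two-method contract concrete, here is a minimal sketch of an in-memory Promise. SimplePromise and its complete method are hypothetical names made up for this illustration; this is not the library's MutablePromise, whose real implementation is the subject of the next post. The sketch assumes that a callback registered after fulfilment runs immediately, and it shows the snapshot states of nowEither as well as covariance at work.

```scala
// Sketch only: SimplePromise and complete are hypothetical names, not the library's API.
abstract class Promise[+T] {
  def nowEither(): Option[Either[Throwable, T]]
  def foreachEither(f: Either[Throwable, T] => Unit): Unit
}

final class SimplePromise[T] extends Promise[T] {
  // Both fields are guarded by this object's lock.
  private var state: Option[Either[Throwable, T]] = None
  private var waiting: List[Either[Throwable, T] => Unit] = Nil

  def nowEither(): Option[Either[Throwable, T]] = synchronized { state }

  def foreachEither(f: Either[Throwable, T] => Unit): Unit = {
    // Not fulfilled yet: remember f. Already fulfilled: run f now, outside the lock.
    val ready = synchronized {
      if (state.isEmpty) { waiting = f :: waiting; None } else state
    }
    ready.foreach(f)
  }

  // Fulfils the promise; only the first call has an effect.
  def complete(value: Either[Throwable, T]): Unit = {
    val toRun = synchronized {
      if (state.isDefined) Nil
      else { state = Some(value); val cbs = waiting.reverse; waiting = Nil; cbs }
    }
    toRun.foreach(cb => cb(value))
  }
}

val p = new SimplePromise[Int]
var seen: List[Either[Throwable, Int]] = Nil
p.foreachEither(e => seen = e :: seen) // registered before fulfilment
val before = p.nowEither()             // None: not terminated yet
p.complete(Right(42))
p.foreachEither(e => seen = e :: seen) // registered after: runs immediately
val after = p.nowEither()              // Some(Right(42))

// Covariance: a Promise[Int] is also a Promise[Any].
val widened: Promise[Any] = p
```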

On top of this Promise definition we will build some implementations (Mutable, Asynchronous and Lazy, in the next post) and a lot of nice extensions and combinators to unleash their power (this post).

Here are the implicits that bring the Promise functionality into scope:
object PromiseImplicits {
  implicit def toMonadPlus[T](p: Promise[T]) = new PromiseIsMonadPlus(p)
  implicit def toAsync[T](p: Promise[T]) = new PromiseIsAsynchronous(p)
  implicit def toAbstractFailure[T](p: Promise[Either[Throwable, T]]) = new PromiseCanAbstractFailure(p)
  implicit def toExposeFailure[T](p: Promise[T]) = new PromiseCanExposeFailure(p)
  implicit def toEitherMonadPlus[T](p: Promise[T]) = new PromiseIsEitherMonadPlus(p)
  implicit def toCombinators[T](p: Promise[T]) = new PromiseCombinators(p)
}

object PromiseUnsafeImplicits {
  implicit def toUnsafe[T](p: Promise[T]) = new PromiseCanBeUnsafe(p)
}

We're defining two objects; one of them is called PromiseUnsafeImplicits because it provides some functions that compromise the safety of Promises but can be helpful during development.
(If you're not used to implicits, they are a must-know pattern and I invite you to read more about them: see Odersky's "Poor Man's Type Classes".)
Let's start with the unsafe one, so we can forget about it sooner:
final class PromiseCanBeUnsafe[+T](outer: Promise[T]) {

  final def getOrThrow(): T =
    outer.nowEither() match {
      case Some(Right(v)) => v
      case Some(Left(err)) => throw err
      case _ => throw Promise.undefined
    }  
  
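  final def waitUntil(delay: Long): Promise[T] = {
    outer.nowEither() // to trigger lazy promises before synchronizing
    // Wait on the promise itself rather than on this short-lived wrapper, which
    // nobody could ever notify; this assumes the concrete Promise implementation
    // calls notifyAll() on itself when it gets fulfilled.
    outer.synchronized {
      if (outer.nowEither().isEmpty) {
        outer.wait(delay) // could throw an InterruptedException, we're in unsafe land..
      }
    }
    outer
  }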
  
  final def waitBefore(delay: Long): Promise[T] = 
    waitUntil(delay).nowEither() match {
      case Some(_) => outer // fulfilled in time, with a value or an error
      case None => Promise.timeoutPromise // not ready yet
    }

  final def waitFulfilled(): Promise[T] =
    waitUntil(0)

  final def waitAndGetUntil(timeOut: Long): Option[T] = {
    waitUntil(timeOut).now()
  }

  final def waitAndGet() = waitFulfilled().now()
}


It takes an outer Promise as a parameter, which its methods use to provide new functionality; this is a common pattern for implicit extensions.
  • getOrThrow simply asks for the current value of the computation (nowEither) and pattern matches on it.
    If it's not fulfilled (the None case, handled by '_'), it throws the undefined exception defined by the Promise object (which implements the promise constructors and constants).
    If it is fulfilled, it returns the value if the Either is a Right (synonym of success), or throws the exception stored in the Left (the exception carries the reason for the failure, wherever it came from).
    So yes, it throws an exception, and that's why it's unsafe (and a smell of bad coding style).
    (In the same way, all the methods directly or indirectly using waitUntil are unsafe, as they block the current thread.)

  • You can see a 'now()' method call in 'waitAndGetUntil'; now() is provided implicitly by 'PromiseIsAsynchronous' and gives an instant view of the promise's current value (using nowEither under the hood). It lives in 'PromiseIsAsynchronous' because it is not unsafe: I would like programmers to be able to drop the PromiseUnsafeImplicits import, and feel confident they're not using unsafe features, while keeping this functionality.

  • 'waitFulfilled' and 'waitAndGet' block for an unbounded amount of time; otherwise they're equivalent to 'waitUntil' and 'waitAndGetUntil'.

  • 'waitBefore' returns the Promise if it is fulfilled within the given delay, or otherwise a timeout Promise defined in the Promise object; it is also blocking (an equivalent non-blocking solution has been implemented but is not part of this post, as it relies on another component).

Example:

def unsafeExample(prom : Promise[String]) {    
    val boundedProm = prom.waitBefore(56) // the promise if fulfilled within 56 ms, the timeout promise otherwise
    val myString = boundedProm.getOrThrow() // throws if it failed or timed out
    println("myString: " + myString)
  }
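The blocking helpers can also be expressed purely with foreachEither, without wait/notify: register a callback that releases a java.util.concurrent.CountDownLatch, then await the latch with a timeout. This is a hypothetical alternative sketch (UnsafeWait, Fulfilled and Never are names made up here), not the library's code, and it is just as blocking, and therefore just as unsafe, as the methods above. Because a latch stays open once counted down, a fulfilment that happens before the wait starts is not missed.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// The two-method contract from the post.
abstract class Promise[+T] {
  def nowEither(): Option[Either[Throwable, T]]
  def foreachEither(f: Either[Throwable, T] => Unit): Unit
}

object UnsafeWait {
  // Hypothetical helper, not the library's waitBefore: blocks the calling thread
  // for at most `millis` ms and returns the snapshot it then sees.
  // None means the promise was not fulfilled in time (the timeout case).
  def waitBefore[T](p: Promise[T], millis: Long): Option[Either[Throwable, T]] = {
    val latch = new CountDownLatch(1)
    p.foreachEither(_ => latch.countDown()) // released by whoever fulfils p
    latch.await(millis, TimeUnit.MILLISECONDS) // may throw InterruptedException: unsafe land
    p.nowEither()
  }
}

// Two trivial promises for the demo: one already fulfilled, one never fulfilled.
final class Fulfilled[T](v: Either[Throwable, T]) extends Promise[T] {
  def nowEither(): Option[Either[Throwable, T]] = Some(v)
  def foreachEither(f: Either[Throwable, T] => Unit): Unit = f(v)
}

object Never extends Promise[Nothing] {
  def nowEither(): Option[Either[Throwable, Nothing]] = None
  def foreachEither(f: Either[Throwable, Nothing] => Unit): Unit = ()
}

val immediate = UnsafeWait.waitBefore(new Fulfilled[String](Right("hi")), 1000)
val timedOut = UnsafeWait.waitBefore(Never, 50) // blocks about 50 ms, then None
```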
Again, all these usages must be banned from your code as much as possible; they're only here in case you want to do something unsafe during development.
This introduction was rough and not very interesting, but I had to tell you about these methods before you started thinking about them.
Now let's go back to a better world. :)

Promise is a Monad.

Monads are a useful concept from functional programming that Scala supports well.
Some well-known monads are List, Option and Either; being monadic means (quickly said) that they can be combined in consistent ways, and that you can use Scala's for-comprehension syntactic sugar with them.
There are plenty of papers on the subject, so I invite you to read more about them.
Before introducing the 'Promise Monad' (kind of), I must say this version is not usable as-is with for comprehensions because it does not follow the required naming convention; it has the functionality, but it is meant to be exposed later through dedicated wrapper methods so that the API feels more natural (you'll understand why later..).

final class PromiseIsEitherMonadPlus[+T](outer: Promise[T]) {

  final def mapEither[U](f: Either[Throwable, T] => Either[Throwable, U]): Promise[U] = {
    val prom = Promise[U]()
    outer.foreachEither { x => prom setPromiseEither f(x) }
    prom
  }

  final def flatMapEither[U](f: Either[Throwable, T] => Promise[U]): Promise[U] = {
    val prom = Promise[U]()
    outer.foreachEither { eith =>
      try {
        f(eith) foreachEither { prom setPromiseEither _ }
      } catch {
        case e => prom fail e
      }
    }
    prom
  }

  final def filterEither(pred: Either[Throwable, T] => Boolean): Promise[T] = {
    val prom = Promise[T]()
    outer.foreachEither { x => prom.setPromiseEither(if (pred(x)) x else Promise.leftUndefined) }
    prom
  }

}
Some of you may have understood that it's in fact a MonadPlus: it has a filterEither method, which implies the existence of a 'zero' or 'empty' element (Promise.leftUndefined) in the monad world. (If you read a Haskell introduction to monads, you may find that a MonadPlus introduces choice using mzero and mplus. Let's say they're equivalent for now.)
(In a normal MonadPlus the empty element is unique; here it is not, since any Left(err) is a failure, so we treat all exceptions as being equal, which is wrong, but we can live with it.)
So far so good; let's comment on those methods (imperatively..):
  • mapEither takes a function f, creates a Promise, calls foreachEither on the outer Promise (registering a callback that, once the outer is fulfilled, sets the newly created Promise to the value transformed by f) and immediately returns the new Promise.

  • filterEither follows the same path, but sets the new Promise to the outer's value when available, or to leftUndefined if the predicate is not true for this value.

  • flatMapEither is a little more involved but still quite understandable. Notice how we've carefully wrapped the evaluation of the function to catch a possible exception, and only then bind its value to the new Promise; this is how we bring safety to the API, turning thrown exceptions into Left(err) values with a fail call on the promise.

We'll build most features on top of Promises this way: by creating new Promise instances and returning them after 'registering computations on them'. It is very imperative, but in the end we'll have a declarative, usable API.
You may have noticed several things along the way:
  • We've used some new methods; where do fail and setPromiseEither come from? They are not part of the Promise definition. In fact, you've just seen a first use of a Promise implementation: a 'MutablePromise', which provides a way to create an empty Promise and set its value at a later time. MutablePromises are created via the 'Promise()' call. The API tends to hide it as soon as possible behind the Promise interface (it does not leak out of the function). When wrapping external libraries, this is also the path to follow.

  • You may have wondered why the abstraction does not leak in the mapEither implementation, as there's no try/catch! In fact, setPromiseEither takes a thunk of computation as a parameter rather than a value, deferring its actual evaluation to a safer place (surrounded by another try/catch).

  • Thanks to the Either representation, we're always dealing with the 'exceptional' case, Left(exception).
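Here is a self-contained sketch of those points, assuming a minimal, hypothetical MutablePromise whose setPromiseEither takes a by-name argument (the real MutablePromise is covered in the next post). mapEither needs no try/catch because f(x) is only evaluated inside setPromiseEither, whereas flatMapEither must guard the call to f itself. Unlike the post's catch-all, the sketch catches NonFatal exceptions only.

```scala
import scala.util.control.NonFatal

// Hypothetical, minimal thread-safe mutable promise (not the library's MutablePromise).
final class MutablePromise[T] {
  private var state: Option[Either[Throwable, T]] = None // guarded by `this`
  private var waiting: List[Either[Throwable, T] => Unit] = Nil

  def nowEither(): Option[Either[Throwable, T]] = synchronized { state }

  def foreachEither(f: Either[Throwable, T] => Unit): Unit = {
    val ready = synchronized {
      if (state.isEmpty) { waiting = f :: waiting; None } else state
    }
    ready.foreach(f) // already fulfilled: run f now, outside the lock
  }

  // By-name: the value is computed here, so an exception becomes Left(e).
  def setPromiseEither(v: => Either[Throwable, T]): Unit = {
    val value: Either[Throwable, T] = try v catch { case NonFatal(e) => Left(e) }
    val toRun = synchronized {
      if (state.isDefined) Nil // first write wins
      else { state = Some(value); val cbs = waiting.reverse; waiting = Nil; cbs }
    }
    toRun.foreach(cb => cb(value))
  }

  def fail(err: Throwable): Unit = setPromiseEither(Left(err))
}

object EitherOps {
  def mapEither[T, U](outer: MutablePromise[T])(f: Either[Throwable, T] => Either[Throwable, U]): MutablePromise[U] = {
    val prom = new MutablePromise[U]
    // No try/catch: f(x) is passed by name and evaluated inside setPromiseEither.
    outer.foreachEither(x => prom.setPromiseEither(f(x)))
    prom
  }

  def flatMapEither[T, U](outer: MutablePromise[T])(f: Either[Throwable, T] => MutablePromise[U]): MutablePromise[U] = {
    val prom = new MutablePromise[U]
    outer.foreachEither { x =>
      // f returns a promise, not a value, so its evaluation needs its own guard.
      try { f(x).foreachEither(y => prom.setPromiseEither(y)) }
      catch { case NonFatal(e) => prom.fail(e) }
    }
    prom
  }
}

val name = new MutablePromise[String]
val length = EitherOps.mapEither[String, Int](name)(_.map(_.length))
val mapBoom = EitherOps.mapEither[String, Int](name)(_ => throw new IllegalStateException("map boom"))
val flatBoom = EitherOps.flatMapEither[String, Int](name)(_ => throw new IllegalStateException("flatMap boom"))
name.setPromiseEither(Right("hello"))
```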
     

Example:

def mapEitherExample(fileNameProm : Promise[String], fileContent : String => Promise[String]) {
    
    // Not the code you're dreaming of..
    val lengthProm = 
      fileNameProm.flatMapEither{
       case Right(name) => fileContent(name)
       case Left(err) => Promise.failure(err)
      }.mapEither{
        case Right(v) => Right(v.length)
        case Left(l) => Left(l)
      }
    
    lengthProm.foreachEither {
      case Right(v) => println("successful text length is " + v)
      case Left(err) => println("we had an error " + err.toString());
    }
  }

Ok, but dealing with Either..

Indeed, it can become verbose and boring; I recall a great man saying: if you pattern match too much, you're missing a combinator.
So here comes PromiseIsMonadPlus, with our beloved monad methods:
final class PromiseIsMonadPlus[+T](private val outer: Promise[T]) {

  final def foreach(f: T => Unit): Unit =
    outer.foreachEither {
      case Right(x) => f(x)
      case _ =>
    }

  final def map[U](f: T => U): Promise[U] =
    outer.mapEither {
      case Right(x) => Right(f(x))
      case Left(err) => Left(err)
    }

  final def flatMap[U](f: T => Promise[U]): Promise[U] =
    outer.flatMapEither {
      case Right(v) => f(v)
      case _ => outer.asInstanceOf[Promise[U]] // we're abusing the compiler but this is safe
    }

  final def filter(pred: T => Boolean): Promise[T] =
    outer.filterEither {
      case Right(x) => pred(x)
      case _ => false
    }

  final def failure(): Promise[Throwable] =
    outer.mapEither {
      case Left(err) => Right(err)
      case _ => Promise.leftUndefined
    }

}
OK, this one is easy to explain: all it does is wrap the calls to PromiseIsEitherMonadPlus in order to expose methods that work on values unboxed from the Either.
A few things to explain here:
  • I've used 'outer.asInstanceOf[Promise[U]]' because constructing a new promise to hold the same exception has a cost, and as the computation has already been performed, there's no semantic difference in returning the outer promise; it saves computation for higher performance (it's a hack).

  • 'failure' does something interesting: it returns a promise that holds the failure as its value (Either[Throwable, Throwable]); this is useful when you want to deal with a computation's failure. (Note that subsequent failures of Promises built from it will not appear as values; they will be handled as usual in the Left part of the Either.)

  • As Either is already a monad, I could have used less pattern matching by relying on its Right/Left projections (see the scala.Either documentation for more information); I've deliberately kept the code easy to grasp with basic knowledge, as I know early Scala developers discover Either well after List and Option.
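A sketch of the same idea in current Scala, reusing a minimal hypothetical MutablePromise (not the library's). The implicit class mirrors PromiseIsMonadPlus; map and filter lean on Either's own map and filterOrElse (available since Scala 2.12) instead of pattern matching, and a NoSuchElementException stands in for Promise.leftUndefined. Having map and flatMap is enough for the for comprehension at the end to compile; fileContent is a made-up stand-in for the examples' parameter of the same name.

```scala
import scala.util.control.NonFatal

// Hypothetical, minimal thread-safe mutable promise (not the library's MutablePromise).
final class MutablePromise[T] {
  private var state: Option[Either[Throwable, T]] = None // guarded by `this`
  private var waiting: List[Either[Throwable, T] => Unit] = Nil

  def nowEither(): Option[Either[Throwable, T]] = synchronized { state }

  def foreachEither(f: Either[Throwable, T] => Unit): Unit = {
    val ready = synchronized {
      if (state.isEmpty) { waiting = f :: waiting; None } else state
    }
    ready.foreach(f) // already fulfilled: run f now, outside the lock
  }

  // By-name: the value is computed here, so an exception becomes Left(e).
  def setPromiseEither(v: => Either[Throwable, T]): Unit = {
    val value: Either[Throwable, T] = try v catch { case NonFatal(e) => Left(e) }
    val toRun = synchronized {
      if (state.isDefined) Nil // first write wins
      else { state = Some(value); val cbs = waiting.reverse; waiting = Nil; cbs }
    }
    toRun.foreach(cb => cb(value))
  }
}

object MonadSyntax {
  // Works on the success value only; a Left is forwarded untouched.
  implicit class PromiseIsMonadPlus[T](outer: MutablePromise[T]) {
    def map[U](f: T => U): MutablePromise[U] = {
      val prom = new MutablePromise[U]
      outer.foreachEither(e => prom.setPromiseEither(e.map(f))) // f throwing => Left
      prom
    }

    def flatMap[U](f: T => MutablePromise[U]): MutablePromise[U] = {
      val prom = new MutablePromise[U]
      outer.foreachEither {
        case Right(v) =>
          try { f(v).foreachEither(r => prom.setPromiseEither(r)) }
          catch { case NonFatal(err) => prom.setPromiseEither(Left(err)) }
        case Left(err) => prom.setPromiseEither(Left(err))
      }
      prom
    }

    // The MonadPlus "zero" here is a failed promise.
    def filter(pred: T => Boolean): MutablePromise[T] = {
      val prom = new MutablePromise[T]
      outer.foreachEither { e =>
        prom.setPromiseEither(e.filterOrElse(pred, new NoSuchElementException("filter")))
      }
      prom
    }
  }
}
import MonadSyntax._

def fileContent(name: String): MutablePromise[String] = {
  val p = new MutablePromise[String]
  p.setPromiseEither(Right("contents of " + name))
  p
}

val fileName = new MutablePromise[String]
val lengthProm = for {
  name    <- fileName
  content <- fileContent(name)
} yield content.length
val evenLength = fileName.filter(_.length % 2 == 0)

fileName.setPromiseEither(Right("a.txt"))
```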

Example:

def mapExample(fileNameProm : Promise[String], fileContent : String => Promise[String]) {
    
    val lengthProm = 
      fileNameProm.flatMap(fileContent).map{_.length}
    
    val lengthProm2 = 
      for {
        fileName <- fileNameProm
        content <- fileContent(fileName)
      } yield content.length
    
    lengthProm.foreachEither {
      case Left(err) => println("we had an error " + err.toString())
      case Right(v) => println("successful text length is " + v)
    }
      
  }

Way better! You can also see an example of a for comprehension (lengthProm2 is equivalent to lengthProm).

But sometimes I have to deal with it..

Perfect; let's introduce two useful extensions.
final class PromiseCanAbstractFailure[+T](outer: Promise[Either[Throwable, T]]) {
  def unliftEither: Promise[T] = {
    val prom = Promise[T]()
    outer.foreachEither {
      case Right(v) => prom setPromiseEither v
      case Left(err) => prom setPromiseEither Left(err)
    }
    prom
  }
}

final class PromiseCanExposeFailure[+T](outer: Promise[T]) {
  def liftEither: Promise[Either[Throwable, T]] = {
    val prom = Promise[Either[Throwable, T]]()
    outer.foreachEither { prom setPromise _ }
    prom
  }
}
They give you the ability to expose, or hide, the Either boxing; I told you we would have a more natural way to deal with the Promise in explicit Either form, and here it is.
Note the outer type of PromiseCanAbstractFailure: it can only be used on a Promise whose value type is already an Either. That's a way to use the power of implicits to provide functionality depending on the type.
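The round trip can be sketched as follows, again with a minimal hypothetical MutablePromise rather than the library's own. Note how unliftEither is only offered on promises whose value type is an Either, exactly as described above: the implicit class simply does not apply to other types.

```scala
import scala.util.control.NonFatal

// Hypothetical, minimal thread-safe mutable promise (not the library's MutablePromise).
final class MutablePromise[T] {
  private var state: Option[Either[Throwable, T]] = None // guarded by `this`
  private var waiting: List[Either[Throwable, T] => Unit] = Nil

  def nowEither(): Option[Either[Throwable, T]] = synchronized { state }

  def foreachEither(f: Either[Throwable, T] => Unit): Unit = {
    val ready = synchronized {
      if (state.isEmpty) { waiting = f :: waiting; None } else state
    }
    ready.foreach(f) // already fulfilled: run f now, outside the lock
  }

  // By-name: the value is computed here, so an exception becomes Left(e).
  def setPromiseEither(v: => Either[Throwable, T]): Unit = {
    val value: Either[Throwable, T] = try v catch { case NonFatal(e) => Left(e) }
    val toRun = synchronized {
      if (state.isDefined) Nil // first write wins
      else { state = Some(value); val cbs = waiting.reverse; waiting = Nil; cbs }
    }
    toRun.foreach(cb => cb(value))
  }
}

object FailureSyntax {
  implicit class PromiseCanExposeFailure[T](outer: MutablePromise[T]) {
    // Always succeeds: the outer Either (success or failure) becomes the value.
    def liftEither: MutablePromise[Either[Throwable, T]] = {
      val prom = new MutablePromise[Either[Throwable, T]]
      outer.foreachEither(e => prom.setPromiseEither(Right(e)))
      prom
    }
  }

  // Only applies when the value type is itself an Either.
  implicit class PromiseCanAbstractFailure[T](outer: MutablePromise[Either[Throwable, T]]) {
    def unliftEither: MutablePromise[T] = {
      val prom = new MutablePromise[T]
      outer.foreachEither {
        case Right(inner) => prom.setPromiseEither(inner) // inner Either becomes the state
        case Left(err)    => prom.setPromiseEither(Left(err))
      }
      prom
    }
  }
}
import FailureSyntax._

val content = new MutablePromise[Int]
val exposed = content.liftEither  // MutablePromise[Either[Throwable, Int]]
val back = exposed.unliftEither   // MutablePromise[Int] again
content.setPromiseEither(Left(new RuntimeException("no file")))
```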

Example:

def eitherMonadExample(fileNameProm : Promise[String], fileContent : String => Promise[String]) {
   
    val lengthProm = fileNameProm.flatMap(fileContent).map{_.length}
    val lengthPromEither = lengthProm.liftEither
    
    lengthPromEither.foreach{ // normal foreach (not foreachEither), we can now also use for comprehensions..
      case Left(err) => println("we had an error " + err.toString());
      case Right(v) => println("successful text length is " + v)
    }
    
    val lengthProm2 = lengthPromEither.unliftEither // back to Promise[Int]
 
  }



Asynchronicity.

In fact, a lot of asynchronicity has already been covered; it's at the heart of the Promise, as Promises rely on callbacks: each time we've called foreachEither (directly or indirectly), we've built asynchronous computations.
Here's the implementation of some additional asynchronicity methods for the Promises:
final class PromiseIsAsynchronous[+T](outer: Promise[T]) {

  def async: Promise[T] = {
    val prom = Promise.async[T]()
    outer.foreachEither { prom setPromiseEither _ }
    prom
  }
 
  final def now(): Option[T] =
    outer.nowEither() match {
      case Some(Right(v)) => Some(v)
      case _ => None
    }
  
}
async forwards the outer promise's value to a promise specifically built using Promise.async().
This constructor creates a promise and defers its execution using a ThreadPoolExecutor, bringing the ability to start computations asynchronously (more on this in the next post).
'now()' is the helper method we discussed at the beginning of this post; it provides a way to access the current success value without the Either boxing.
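The idea behind async can be sketched by fulfilling the forwarded promise from a task submitted to an ExecutorService, so that its callbacks run on a pool thread instead of the thread that fulfilled the source. AsyncSketch, MutablePromise and the explicit pool parameter are assumptions of this sketch; the library's Promise.async() manages its own ThreadPoolExecutor.

```scala
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import scala.util.control.NonFatal

// Hypothetical, minimal thread-safe mutable promise (not the library's MutablePromise).
final class MutablePromise[T] {
  private var state: Option[Either[Throwable, T]] = None // guarded by `this`
  private var waiting: List[Either[Throwable, T] => Unit] = Nil

  def nowEither(): Option[Either[Throwable, T]] = synchronized { state }

  def foreachEither(f: Either[Throwable, T] => Unit): Unit = {
    val ready = synchronized {
      if (state.isEmpty) { waiting = f :: waiting; None } else state
    }
    ready.foreach(f) // already fulfilled: run f now, outside the lock
  }

  // By-name: the value is computed here, so an exception becomes Left(e).
  def setPromiseEither(v: => Either[Throwable, T]): Unit = {
    val value: Either[Throwable, T] = try v catch { case NonFatal(e) => Left(e) }
    val toRun = synchronized {
      if (state.isDefined) Nil // first write wins
      else { state = Some(value); val cbs = waiting.reverse; waiting = Nil; cbs }
    }
    toRun.foreach(cb => cb(value))
  }
}

object AsyncSketch {
  // Forwards outer's value, but fulfils the new promise (and so runs its
  // callbacks) from a task executed by `pool`.
  def async[T](outer: MutablePromise[T], pool: ExecutorService): MutablePromise[T] = {
    val prom = new MutablePromise[T]
    outer.foreachEither(e => pool.execute(() => prom.setPromiseEither(e)))
    prom
  }

  // Same idea as the post's now(): the success value if there is one, else None.
  def now[T](p: MutablePromise[T]): Option[T] = p.nowEither() match {
    case Some(Right(v)) => Some(v)
    case _              => None
  }
}

val pool = Executors.newFixedThreadPool(2)
val source = new MutablePromise[String]
val moved = AsyncSketch.async(source, pool)
val callbackThread = new AtomicReference[String]("")
val done = new CountDownLatch(1)
moved.foreachEither { _ =>
  callbackThread.set(Thread.currentThread().getName)
  done.countDown()
}
source.setPromiseEither(Right("data"))
val finished = done.await(5, TimeUnit.SECONDS)
pool.shutdown()
```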

Example:

def asyncExample(fileNameProm : Promise[String], fileContent : String => Promise[String]) = {
    println("DEBUG: current value " + fileNameProm.now()) // during dev
    fileNameProm.async.flatMap(fileContent) // I am sure I will return ASAP and it will be executed asynchronously
  }


But where's my orElse stuff?

You'll recall we said Promise is a MonadPlus, so it's time to unleash some of its power.
Here are some combinators, among which stand our beloved orElse and an interesting variation:
final class PromiseCombinators[+T](outer: Promise[T]) {

  def orElse[U >: T](other: => Promise[U]): Promise[U] = {
    val prom = Promise[U]()
    outer.foreachEither {
      case r@Right(_) => prom.setPromiseEither(r)
      case _ =>
        try {
          other.foreachEither { prom setPromiseEither _ }
        } catch {
          case err: Throwable => prom.setPromiseEither(Left(err))
        }
    }
    prom
  }

  def &&[U](other: => Promise[U]): Promise[(T, U)] = {
    val prom = Promise[(T, U)]()
    outer.foreachEither {
      case Right(t) =>
        try {
          other.foreachEither {
            case Right(u) => prom.setPromise((t, u))
            case Left(err) => prom.setPromiseEither(Left(err))
          }
        } catch {
          case err => prom.setPromiseEither(Left(err)) // caused by 'other' evaluation
        }
      case Left(err) => prom.setPromiseEither(Left(err))
    }
    prom
  }

  def or[U >: T](other: Promise[U]): Promise[U] = {
    val prom = Promise[U]()
    val status = new java.util.concurrent.atomic.AtomicInteger(0) // 0 -> not set, 2 -> value set (right), 1 -> one error (left)

    val updateAsNeeded: Either[Throwable, U] => Unit = {
      case r: Right[_, _] =>
        if (status.getAndSet(2) < 2) { 
          prom.setPromiseEither(r)
        }
      case l: Left[_, _] =>
        if (status.getAndSet(1) == 1) { ///we only want to write if we are the second error
          prom.setPromiseEither(l)
        }
    }

    outer.foreachEither(updateAsNeeded)
    other.foreachEither(updateAsNeeded)

    prom
  }

}
Let's analyze that a bit.
orElse gives you the ability to build a Promise whose value is bound to a first promise if it succeeds, or to a second one otherwise (note that it can be chained; you can reduce a List of promises with it). orElse is based on building blocks already shown so far; the interesting new elements are:
  • U >: T, which means that U is a supertype of T. As our Promises are covariant, they play nicely with subtyping: if you use orElse with two Promises, the resulting promise's type will be the least upper bound of both types (ultimately Any).

  • The 'other' parameter is passed by name, as a thunk, meaning it is not evaluated yet (i.e. the Promise instance may not have been created); each reference to its name results in an evaluation, hence the 'call-by-name evaluation' naming (pun not intended :) ). This evaluation takes place inside a try/catch, which sets the failure if any.
The '&&' method builds a Promise out of two other Promises, combining their results in a pair using two pattern matches. Nothing new is introduced here.
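Here is a sketch of orElse and && on a minimal hypothetical MutablePromise. It is invariant, so the U >: T widening is left out. The demo reduces a list of candidates with orElse, checks that the by-name fallback is never evaluated when the first promise succeeds, and shows && pairing two successes or propagating the first failure; ok and failed are made-up helpers.

```scala
import scala.util.control.NonFatal

// Hypothetical, minimal thread-safe mutable promise (not the library's MutablePromise).
final class MutablePromise[T] {
  private var state: Option[Either[Throwable, T]] = None // guarded by `this`
  private var waiting: List[Either[Throwable, T] => Unit] = Nil

  def nowEither(): Option[Either[Throwable, T]] = synchronized { state }

  def foreachEither(f: Either[Throwable, T] => Unit): Unit = {
    val ready = synchronized {
      if (state.isEmpty) { waiting = f :: waiting; None } else state
    }
    ready.foreach(f) // already fulfilled: run f now, outside the lock
  }

  // By-name: the value is computed here, so an exception becomes Left(e).
  def setPromiseEither(v: => Either[Throwable, T]): Unit = {
    val value: Either[Throwable, T] = try v catch { case NonFatal(e) => Left(e) }
    val toRun = synchronized {
      if (state.isDefined) Nil // first write wins
      else { state = Some(value); val cbs = waiting.reverse; waiting = Nil; cbs }
    }
    toRun.foreach(cb => cb(value))
  }
}

object Combinators {
  // `other` is by-name: it is only evaluated if `outer` fails.
  def orElse[T](outer: MutablePromise[T], other: => MutablePromise[T]): MutablePromise[T] = {
    val prom = new MutablePromise[T]
    outer.foreachEither {
      case r @ Right(_) => prom.setPromiseEither(r)
      case Left(_) =>
        try { other.foreachEither(e => prom.setPromiseEither(e)) }
        catch { case NonFatal(err) => prom.setPromiseEither(Left(err)) }
    }
    prom
  }

  // Pairs both successes; `other` is only evaluated once `outer` has succeeded.
  def and[T, U](outer: MutablePromise[T], other: => MutablePromise[U]): MutablePromise[(T, U)] = {
    val prom = new MutablePromise[(T, U)]
    outer.foreachEither {
      case Right(t) =>
        try {
          other.foreachEither {
            case Right(u)  => prom.setPromiseEither(Right((t, u)))
            case Left(err) => prom.setPromiseEither(Left(err))
          }
        } catch { case NonFatal(err) => prom.setPromiseEither(Left(err)) }
      case Left(err) => prom.setPromiseEither(Left(err))
    }
    prom
  }
}

def ok[T](v: T): MutablePromise[T] = { val p = new MutablePromise[T]; p.setPromiseEither(Right(v)); p }
def failed[T](msg: String): MutablePromise[T] = {
  val p = new MutablePromise[T]; p.setPromiseEither(Left(new RuntimeException(msg))); p
}

val candidates = List(failed[String]("no fr"), ok("en"), ok("default"))
val content = candidates.reduceLeft((a, b) => Combinators.orElse(a, b))

var backupEvaluated = false
val primary = Combinators.orElse(ok("primary"), { backupEvaluated = true; ok("backup") })

val both = Combinators.and(ok(1), ok("one"))
val firstFailed = Combinators.and(failed[Int]("no id"), ok("never built"))
```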

The 'or' method is a bit more interesting and gives us an example of using the atomic package. Basically, or is the same as orElse but without any notion of ordering, and it requires both, potentially lazy, computations (orElse requires the second only if the first has failed).
'or' returns a Promise whose value equals the value of the first successful Promise.
So we want to get the first success, or the second failure. We do not want to block (lock-free), and there is no specific order.
We'll use an atomic integer to represent our status, with three possibilities:
  • 0, nothing has happened yet (initial state).
  • 1, an error happened.
  • 2, a success happened.
When a Promise succeeds, we atomically set the status to 2 while retrieving its previous value, to check whether we already had a success (getAndSet relies, where available, on a blazing fast, non-blocking processor instruction such as an atomic exchange or compare-and-swap, CAS). If there was no previous success, we can fulfill the new Promise; if the other Promise had already failed, the status would have been 1, and everything would be equally fine.
When a Promise fails, we set the status to 1 while checking whether it was already 1 (meaning the other Promise has failed); if so, we can safely fail the new Promise.
(Note that a good extension to the Promise would be a way to combine the two failing exceptions so that no information is lost. It could be done and would not be difficult; however, storing the first exception would require a volatile variable so that it is visible consistently from any thread (promises may be executed in different threads), as today's processors, mostly working in their caches at full speed, are not as synchronized with main memory as the uninitiated would guess.)
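The status protocol can be exercised in isolation with the sketch below, a hypothetical rendering of or on a minimal MutablePromise (not the library's). Events are fed in a fixed order so the expected outcome of each case is easy to follow: a single failure leaves the result pending, and only a second failure fails it.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NonFatal

// Hypothetical, minimal thread-safe mutable promise (not the library's MutablePromise).
final class MutablePromise[T] {
  private var state: Option[Either[Throwable, T]] = None // guarded by `this`
  private var waiting: List[Either[Throwable, T] => Unit] = Nil

  def nowEither(): Option[Either[Throwable, T]] = synchronized { state }

  def foreachEither(f: Either[Throwable, T] => Unit): Unit = {
    val ready = synchronized {
      if (state.isEmpty) { waiting = f :: waiting; None } else state
    }
    ready.foreach(f) // already fulfilled: run f now, outside the lock
  }

  // By-name: the value is computed here, so an exception becomes Left(e).
  def setPromiseEither(v: => Either[Throwable, T]): Unit = {
    val value: Either[Throwable, T] = try v catch { case NonFatal(e) => Left(e) }
    val toRun = synchronized {
      if (state.isDefined) Nil // first write wins
      else { state = Some(value); val cbs = waiting.reverse; waiting = Nil; cbs }
    }
    toRun.foreach(cb => cb(value))
  }
}

object Race {
  // First success wins; the result fails only once both inputs have failed
  // (with the second error, as in the post). Lock-free via AtomicInteger.
  def or[T](outer: MutablePromise[T], other: MutablePromise[T]): MutablePromise[T] = {
    val prom = new MutablePromise[T]
    val status = new AtomicInteger(0) // 0: nothing yet, 1: a failure seen, 2: a success seen
    val updateAsNeeded: Either[Throwable, T] => Unit = {
      case r @ Right(_) =>
        if (status.getAndSet(2) < 2) prom.setPromiseEither(r) // no earlier success
      case l @ Left(_) =>
        if (status.getAndSet(1) == 1) prom.setPromiseEither(l) // the other one failed too
    }
    outer.foreachEither(updateAsNeeded)
    other.foreachEither(updateAsNeeded)
    prom
  }
}

// Failure then success: still pending after the first failure.
val mirror = new MutablePromise[String]
val origin = new MutablePromise[String]
val page = Race.or(mirror, origin)
mirror.setPromiseEither(Left(new RuntimeException("mirror down")))
val afterOneFailure = page.nowEither()
origin.setPromiseEither(Right("<html/>"))

// Success then success: the first one wins.
val fast = new MutablePromise[Int]
val slow = new MutablePromise[Int]
val firstWins = Race.or(fast, slow)
fast.setPromiseEither(Right(1))
slow.setPromiseEither(Right(2))

// Failure then failure: the second error is reported.
val a = new MutablePromise[Int]
val b = new MutablePromise[Int]
val neither = Race.or(a, b)
a.setPromiseEither(Left(new RuntimeException("first")))
b.setPromiseEither(Left(new RuntimeException("second")))
```

Since each input reports exactly once, a failure arriving after a success (which resets the status to 1) is harmless: no further event can observe that state.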

Example:

def orElseExample(localizedContent : Promise[String], defaultContent : Promise[String]) = {
    val content = localizedContent orElse defaultContent
    
    val localizedAndDefault = localizedContent && defaultContent
    
    val firstDelivered = localizedContent or defaultContent
  }


OK, we've now covered a lot of functionality. We've also seen a way to implement concurrent atomic operations.
I hope this post was useful; I will write another post diving into the Promise implementations very soon.

You can find the GitHub project here (I've put everything in a single file to ease playing with it): https://github.com/sledorze/Promises

part two is here

This is my first post on this blog, so be kind: no trolling, useful feedback welcome, and share! :)

4 comments:

  1. Promises are also known as 'futures', I believe. The idea is to speculatively evaluate in parallel, IIRC

    It's interesting, but I can't help but think that all this is just working around the lack of light-weight threads in the JVM - with Haskell, you'd just define a function which runs something in one of the green threads and deposit the result in an MVar box.

  2. Indeed, it's known as a Future (like in Akka)!
    I named them Promise because it made sense to me (it can fail).

    You still need more work than a simple MVar to get the same functionality in Haskell, but yes, it's the building block.

    Thanks for the comment!

  3. Hi Stephane,

    It would be nice if you could detail the motivations you had to implement a different version of the Future-Object/Promise concept in Scala (e.g. improvements over the Scala/Akka implementation, differences, etc.)

    Thanks for this post!

    Francisco Pérez-Sorrosal

  4. Oh, sorry for the delay, Francisco; the reason was that Akka did not exist when I wrote it, and I needed it!
