Monday, 25 July 2011

A Safe & Asynchronous Promise implementation explained - Part 2

Hi!

This is the second post of a two-part series about the Promise implementation (first part here).

This second post covers the implementation details of MutablePromise, LazyPromise and AsyncPromise.

MutablePromise

This is the base implementation of Promises; it provides their registration and dispatch mechanisms.

So if you recall our Promise definition, it has two methods to implement:

abstract class Promise[+T] {
  def foreachEither(f: Either[Throwable, T] => Unit): Unit
  def nowEither(): Option[Either[Throwable, T]]
}


'nowEither' returns the current promise state.
'foreachEither' registers a closure that is called immediately if the value is already fulfilled, or later, once it becomes fulfilled.

In order to achieve good performance, we design a non-blocking system for registration and value access.
We'll use a ConcurrentLinkedQueue to store the callbacks to execute when the value is fulfilled, and a volatile field to store the actual value.

  private val _callBacks = new ConcurrentLinkedQueue[Either[Throwable, T] => Unit]
  @volatile
  protected var _value: Option[Either[Throwable, T]] = None

We also need two methods to set a successful value or a failure on the MutablePromise:

  final def set(x: => T) = setPromiseEither(Right(x))
  final def fail(ex: => Throwable) = setPromiseEither(Left(ex))

They wrap their respective parameter in an Either and forward it to the setPromiseEither we've already seen in the previous post.
(Note that both methods take their parameter as a thunk (call by name), and that the setPromiseEither parameter is also a thunk, so that any failure while evaluating them is caught by the setPromiseEither method.)

Here is its implementation:

final def setPromiseEither(ev: => Either[Throwable, _ <: T]) = {
    try {  
      _value = Some(ev) // evaluation could throw
    } catch {
      case e: Throwable => _value = Some(Left(e))
    }
    synchronized {
      notifyAll()
    }
    propagate()
    this
  }

'ev' is a call-by-name parameter so that we can catch any exception thrown while evaluating it.
If it throws, we set the Promise as failed with this new exception; otherwise, the promise is set with the value passed as parameter.
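
To make the by-name trick concrete outside of the library, here is a minimal sketch. `capture` and `ByNameSketch` are hypothetical names used only for illustration, not part of the Promise API. Because `ev` is evaluated inside the `try`, an exception thrown while computing the argument ends up as a `Left` instead of escaping to the caller.

```scala
object ByNameSketch {
  // Hypothetical helper: `ev` is by-name, so it is evaluated inside the try block.
  def capture[T](ev: => T): Either[Throwable, T] =
    try Right(ev) catch { case e: Throwable => Left(e) }

  def demo(): (Either[Throwable, Int], Boolean) = {
    val ok = capture(21 * 2)
    // The throw happens when `ev` is forced inside capture, not at the call site.
    val failed = capture[Int](throw new IllegalStateException("boom"))
    (ok, failed.isLeft)
  }
}
```

With a by-value parameter, the second call would throw before `capture` even started.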

After setting the value we notify any waiting threads, so they can make progress.
Note: this is only required in order to support the unsafe methods (waitUntil, etc.); in the desired non-blocking usage, we could completely get rid of this (and of nowEither in the Promise abstract class).

After waking up those threads, we propagate the value to all callbacks.

private def propagate(): Unit =
    nowEither() match {
      case Some(finalValue) =>
        @tailrec
        def internPropagate: Unit = {
          var finished = false;
          try {
            var elem = _callBacks.poll()
            while (elem != null) {
              elem.apply(finalValue)
              elem = _callBacks.poll()
            }
            finished = true;
          } catch {
            case e => e.printStackTrace
          }
          if (!finished)
            internPropagate // continue to consume the poll
        }
        internPropagate
      case _ =>
    }


propagate takes the current value of the promise.
If the value is not set, it does nothing.
If the value is set, it consumes the ConcurrentLinkedQueue using its atomic poll method.
Note that during the execution of this method, other threads may register new callbacks; as we use a ConcurrentLinkedQueue, no thread will block.
Each closure we poll out of the queue is executed inside a try/catch so that a failure does not prevent the remaining closures from executing.
This try/catch should never catch anything, as all our combinators take care of that, but it is useful when one uses the lower-level API to develop new ones.

The propagate method tries to consume all the callbacks from the queue; however, the queue is not guaranteed to be empty when the method exits, as another thread may have added a callback in the meantime.
The only contract is that all callbacks present in the queue at the time the method is called will be executed.
Consuming all callbacks, even those added to the queue while draining, is not really fair regarding global progress, but it is faster; this would need to be parametrized.

Now, let's see how newly registered callbacks are executed (if the value is fulfilled):

def foreachEither(f: Either[Throwable, T] => Unit): Unit = {
    _callBacks offer f
    propagate()
  }

We add the callback to the queue and call propagate.
From our contract, the propagate method consumes all callbacks present when called iff a value is set, so we're sure the callback will be executed either in this call or, in case the value is still undefined, by the call to propagate issued from setPromiseEither when the value is set (even on another thread).
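
The register-then-drain contract can be tried in isolation with a stripped-down stand-in. This is only a sketch: `MiniPromise` and `RegistrationSketch` are hypothetical names, the blocking support is left out, and failing callbacks are silently skipped instead of printing a stack trace.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.ListBuffer

// Simplified stand-in for MutablePromise (hypothetical, no wait/notify support).
class MiniPromise[T] {
  private val callbacks = new ConcurrentLinkedQueue[Either[Throwable, T] => Unit]
  @volatile private var value: Option[Either[Throwable, T]] = None

  def set(ev: => T): Unit = {
    value = Some(try Right(ev) catch { case e: Throwable => Left(e) })
    propagate()
  }

  def foreachEither(f: Either[Throwable, T] => Unit): Unit = {
    callbacks.offer(f) // register first...
    propagate()        // ...then drain, in case the value is already there
  }

  private def propagate(): Unit = value match {
    case Some(v) =>
      var elem = callbacks.poll()
      while (elem != null) {
        try elem(v) catch { case e: Throwable => () } // a failing callback is skipped
        elem = callbacks.poll()
      }
    case None => // not fulfilled yet: callbacks stay queued
  }
}

object RegistrationSketch {
  def demo(): List[String] = {
    val log = ListBuffer.empty[String]
    val p = new MiniPromise[Int]
    p.foreachEither(_ => log += "early")                             // queued, no value yet
    p.foreachEither(_ => throw new RuntimeException("bad callback")) // will fail when run
    p.set(42)                                                        // drains both callbacks
    p.foreachEither(v => log += s"late:$v")                          // runs immediately
    log.toList
  }
}
```

The callback registered before `set` and the one registered after it both run exactly once, and the throwing callback in between does not stop the drain.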

Ok, so now we have a working MutablePromise:

class MutablePromise[T] extends Promise[T] {
  private val _callBacks = new ConcurrentLinkedQueue[Either[Throwable, T] => Unit]
  
  @volatile // may be accessed from different threads.
  protected var _value: Option[Either[Throwable, T]] = None
  
  final def set(x: => T) = setPromiseEither(Right(x))
  final def fail(ex: => Throwable) = setPromiseEither(Left(ex))

  def nowEither(): Option[Either[Throwable, T]] = _value
  def foreachEither(f: Either[Throwable, T] => Unit): Unit = {
    _callBacks offer f
    propagate()
  }

  final def setPromiseEither(ev: => Either[Throwable, _ <: T]) = {
    try {  
      _value = Some(ev) // evaluation could throw
    } catch {
      case e: Throwable => _value = Some(Left(e))
    }
    synchronized {
      notifyAll()
    }
    propagate()
    this
  }

  private def propagate(): Unit =
    nowEither() match {
      case Some(finalValue) =>
        @tailrec
        def internPropagate: Unit = {
          var finished = false;
          try {
            var elem = _callBacks.poll()
            while (elem != null) {
              elem.apply(finalValue)
              elem = _callBacks.poll()
            }
            finished = true;
          } catch {
            case e => e.printStackTrace
          }
          if (!finished)
            internPropagate // continue to consume the poll
        }
        internPropagate
      case _ =>
    } 
}


Let's see how to implement the AsyncPromise:

final class AsyncPromise[T] extends MutablePromise[T] {
  override def foreachEither(f: Either[Throwable, T] => Unit): Unit =
    super.foreachEither {x => PromiseExec newSpawn f(x) }
}

We extend MutablePromise and override foreachEither to forward the execution of its parameter function to a thread pool using newSpawn, which registers a task to execute 'f(x)'.
'f(x)' is passed as a thunk and is actually executed by the task, not during this registration.
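
As a rough illustration of the same idea without the library, the sketch below wraps a callback so that calling it only schedules the real work on an executor. `spawned` and `AsyncCallbackSketch` are hypothetical names, and a plain `ExecutorService` stands in for `PromiseExec`, whose actual implementation is not shown in this post.

```scala
import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

object AsyncCallbackSketch {
  // Wraps a callback: invoking the result only submits a task that runs f(a) later.
  def spawned[A](pool: ExecutorService)(f: A => Unit): A => Unit =
    a => pool.execute(new Runnable { def run(): Unit = f(a) })

  def demo(): Boolean = {
    val pool = Executors.newSingleThreadExecutor()
    val done = new CountDownLatch(1)
    val callbackThread = new AtomicReference[Thread]()
    val callback = spawned[Int](pool) { _ =>
      callbackThread.set(Thread.currentThread()) // remember where we actually ran
      done.countDown()
    }
    callback(42) // returns right away, the body runs on the pool
    val finished = done.await(5, TimeUnit.SECONDS)
    pool.shutdown()
    finished && (callbackThread.get() ne Thread.currentThread())
  }
}
```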

Here's its usage in the PromiseIsAsynchronous extension:

final class PromiseIsAsynchronous[+T](outer: Promise[T]) {

  def async: Promise[T] = {
    val prom = Promise.async[T]() // returns an unbound AsyncPromise
    outer.foreachEither { prom setPromiseEither _ }
    prom
  }

  ...

}


Now it's time to review how we've achieved LazyPromise.
I've decided that a LazyPromise executes when its value is directly or indirectly required (makes sense).

A LazyPromise must take its pending execution as a parameter.

Here's its code:

final class LazyPromise[T](var lazyAction: () => Unit) extends MutablePromise[T] {

  final def doLazyActionIfRequiered() = {
    if (lazyAction != null) synchronized { // narrowing test
      if (lazyAction != null) {
        val toEvaluate = lazyAction
        lazyAction = null // we free the lazy thunk memory
        try {
          toEvaluate()
        } catch {
          case e => fail(e) // could be _value = Some(Left(e))
        }
      }
    }
  }

  override def nowEither(): Option[Either[Throwable, T]] = {
    doLazyActionIfRequiered() // the reason we've overridden nowEither: triggering the lazy computation.
    super.nowEither()
  }
}


So the LazyPromise extends MutablePromise, overriding its nowEither method.
The idea is to intercept all accesses to the value and, if required, execute the pending lazyAction.

The evaluation is rather simple. Note that we're doing a narrowing test to avoid any unnecessary synchronization, and that we're setting lazyAction to null; this both indicates that the lazyAction has been executed and frees the thunk memory (for the GC).
lazyAction does not need to be volatile: it is only modified inside the synchronized block, and the unsynchronized first test is just a shortcut, since at worst a stale non-null read costs one extra lock acquisition before the authoritative test.
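
The "run at most once" part can be isolated in a few lines. The following is a sketch only: `RunOnce` and `RunOnceSketch` are hypothetical names, and unlike LazyPromise it does no failure handling. It races eight threads on the same trigger and counts how many times the action really ran.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper isolating the narrowing test used by LazyPromise.
final class RunOnce(private var action: () => Unit) {
  def trigger(): Unit =
    if (action != null) synchronized { // cheap test outside the lock
      if (action != null) {            // authoritative test under the lock
        val toRun = action
        action = null                  // mark as done and release the closure
        toRun()
      }
    }
}

object RunOnceSketch {
  def demo(): Int = {
    val runs = new AtomicInteger(0)
    val once = new RunOnce(() => runs.incrementAndGet())
    val threads = (1 to 8).map(_ => new Thread(new Runnable { def run(): Unit = once.trigger() }))
    threads.foreach(_.start())
    threads.foreach(_.join())
    runs.get() // number of times the action was executed
  }
}
```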

Note that we're not forced to call fail(e) in case of failure, because performing the whole propagation is unnecessary; indeed, in the case of LazyPromise, doLazyActionIfRequiered is executed during the first call to nowEither, which happens when the first callback registration calls propagate.

The success case, which could happen asynchronously, is handled in the usual way, as you can see in the lazily method of the PromiseIsLazy extension:

final class PromiseIsLazy[+T](outer: => Promise[T]) {
  def lazily : Promise[T] = {
    lazy val prom: LazyPromise[T] = // lazy to solve initialisation order problem
      new LazyPromise[T](() => outer foreachEither (prom setPromiseEither _))
    prom
  }
}

The outer promise is passed as a thunk so that it is evaluated on demand.
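
The `lazy val` self-reference used in `lazily` can look odd at first, so here is a tiny sketch of the same shape. `Holder` and `LazySelfReferenceSketch` are hypothetical names; the only point is that the closure handed to the constructor mentions the very value being defined.

```scala
object LazySelfReferenceSketch {
  // Hypothetical holder: keeps a thunk that needs the holder itself.
  final class Holder(val action: () => String) { val name = "holder" }

  def demo(): String = {
    // `lazy` lets the closure refer to `h` inside h's own initializer;
    // the reference is only followed later, when action() is called.
    lazy val h: Holder = new Holder(() => "called back on " + h.name)
    h.action()
  }
}
```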

Here it is; this was the second and last post on this Promise implementation.
It is really far from perfect and would need some evolutions (like parametrizing executors) but I hope it was interesting to some of you.

The repo can be found on GitHub here.

As usual, comments are welcome :)

Friday, 22 July 2011

A Safe & Asynchronous Promise implementation explained - Part 1

Building safe, asynchronous, composable computations is notoriously difficult.
But a program that always terminates, handles failure scenarios and is easy to write, read and reason about should be the bread and butter of most programming activity.
Today I will present a Promise library I wrote for this purpose in my free time three years ago (in the old Scala 2.7 days).
While it may be better, these days, to use Akka or Scalaz abstractions to stay with the momentum, I still think it is valuable to provide a nice explanation of its underlying architecture, as it will help most of you figure out how to build great functional abstractions with close-to-the-metal performance.

This first post will introduce the Promise base class and some abstractions built out of it.
The second post will show you their lower-level implementation.
(This is an old library, so do not expect any 2.8+ features (like continuations); the expected reader is experienced in Scala but not necessarily at an advanced level.)

Safe, asynchronous and composable computations?

Let's explain the intent a bit further:
  • Composable: the code is built using combinators on top of the Promises and/or other combinators (functional programming style).
     
  • Asynchronous: when doing a URL request you don't want to waste a thread waiting for an answer, so basically this means that you use composition to define the rest of the computation ahead of execution.
     
  • Safe: no implicit exception throwing in your program; exceptions are caught and you have to deal with them, statically enforced (not the Java way).
(The Promise abstraction presented in this post integrates several concepts in one implementation; this was done for performance reasons, but a cleaner separation could be achieved.)


Gimme some food, please

OK, enough talk; I'll show you some code and explain as we go.
abstract class Promise[+T] {
  def nowEither(): Option[Either[Throwable, T]]
  def foreachEither(f: Either[Throwable, T] => Unit): Unit
}

So here's the Promise abstract class; it has two methods:
  • nowEither: it returns a snapshot of the Promise value, as its signature Option[Either[Throwable, T]] reveals; the Option part indicates whether the computation has terminated, the Either part encodes either the parametrized value Right(v) or the error Left(err). It is, in my opinion, a better practice to compose existing types that already carry meaning rather than creating three new case classes to define the Promise state. It saves you from rewriting all the combinators Option and Either already have.

  • foreachEither: you pass it a function taking an Either[Throwable, T], and it executes that function for its side effect with the value of type Either[Throwable, T] once fulfilled.
Promise is covariant in its type parameter T ([+T]), meaning that it will play well with subtyping (i.e. a Promise[Ferrari] can be used as a Promise[Car] iff Ferrari is a subtype of Car).
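
For readers who have not met variance annotations yet, here is the Ferrari/Car case as a small sketch. `Box` is a hypothetical covariant container standing in for `Promise[+T]`; the other names are just for illustration.

```scala
object CovarianceSketch {
  class Car { def wheels: Int = 4 }
  class Ferrari extends Car

  // Minimal covariant container standing in for Promise[+T] (hypothetical).
  final class Box[+T](val value: T)

  def wheelsOf(box: Box[Car]): Int = box.value.wheels

  def demo(): Int = {
    val ferrariBox: Box[Ferrari] = new Box(new Ferrari)
    wheelsOf(ferrariBox) // accepted because Box[Ferrari] is a subtype of Box[Car]
  }
}
```

Without the `+`, the call to `wheelsOf(ferrariBox)` would be rejected by the compiler.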
We could have gotten rid of nowEither as it is not strictly necessary, but we would not be able to implement some pragmatic debug tools without introducing some overhead.
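
As an example of the kind of small debug helper a snapshot makes possible, the sketch below turns the three states encoded by Option[Either[Throwable, T]] into a message. `describe` and `SnapshotSketch` are hypothetical names, not part of the library.

```scala
object SnapshotSketch {
  // The three states a snapshot can be in.
  def describe[T](snapshot: Option[Either[Throwable, T]]): String =
    snapshot match {
      case None            => "pending"
      case Some(Right(v))  => "succeeded with " + v
      case Some(Left(err)) => "failed with " + err.getMessage
    }

  def demo(): List[String] = List(
    describe[Int](None),
    describe(Some(Right(3))),
    describe[Int](Some(Left(new RuntimeException("boom"))))
  )
}
```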

With this Promise definition, we will build some implementations (Mutable, Asynchronous and Lazy, in the next post) and a lot of nice extensions / combinators to unleash their power (this post).

Here are some implicits built in order to bring the Promise functionalities into scope:
object PromiseImplicits {
  implicit def toMonadPlus[T](p: Promise[T]) = new PromiseIsMonadPlus(p)
  implicit def toAsync[T](p: Promise[T]) = new PromiseIsAsynchronous(p)
  implicit def toAbstractFailure[T](p: Promise[Either[Throwable, T]]) = new PromiseCanAbstractFailure(p)
  implicit def toExposeFailure[T](p: Promise[T]) = new PromiseCanExposeFailure(p)
  implicit def toEitherMonadPlus[T](p: Promise[T]) = new PromiseIsEitherMonadPlus(p)
  implicit def toCombinators[T](p: Promise[T]) = new PromiseCombinators(p)
}

object PromiseUnsafeImplicits {
  implicit def toUnsafe[T](p: Promise[T]) = new PromiseCanBeUnsafe(p)
}

We're defining two objects, one of them being called PromiseUnsafeImplicits because it provides some functions that compromise the safety of the promises but can be helpful when developing.
(If you're not used to implicits, it is a must-know pattern and I invite you to read more about it: Poor man's typeclass from Odersky.)
Let's start with the unsafe one, so we can forget about it sooner:
final class PromiseCanBeUnsafe[+T](outer: Promise[T]) {

  final def getOrThrow(): T =
    outer.nowEither() match {
      case Some(Right(v)) => v
      case Some(Left(err)) => throw err
      case _ => throw Promise.undefined
    }  
  
  final def waitUntil(delay: Long) = {
    outer.nowEither(); // to trigger lazyPromises before synchronizing
    outer.synchronized { // same monitor as the notifyAll() issued when the promise is set
      if (outer.nowEither().isEmpty) {
        outer.wait(delay) // could throw an InterruptedException, we're in unsafe land..
      }
    }
    outer
  }
  
  final def waitBefore(delay: Long): Promise[T] = 
    waitUntil(delay).nowEither() match { // nowEither, so that a promise that failed in time is returned as is
      case Some(_) => outer
      case None => Promise.timeoutPromise // not ready yet
    }

  final def waitFulfilled(): Promise[T] =
    waitUntil(0)

  final def waitAndGetUntil(timeOut: Long): Option[T] = {
    waitUntil(timeOut).now()
  }

  final def waitAndGet() = waitFulfilled().now()
}


It takes an outer Promise as a parameter, which its methods use to provide new functionality; this is a common pattern with implicit extensions.
  • getOrThrow simply asks for the current value of the computation (nowEither) and pattern matches on it.
    If it's not fulfilled (None case, handled by '_'), it throws an undefined exception defined by the Promise object (which implements promise constructors and constants).
    If it is fulfilled, it returns the value if the Either is a Right (synonym of success) or throws the exception stored in the Left (the exception carries the reason why it failed, wherever it came from).
    So, yes, it throws an exception, and that's why it's unsafe (and a smell of bad coding style).
    (In the same way, all methods directly or indirectly using waitUntil are unsafe as they block the current thread.)
     
  • You can see a 'now()' method call in 'waitAndGetUntil'; it is provided implicitly by 'PromiseIsAsynchronous' and gives an instant view of the promise's current value (using nowEither under the hood). It is kept in 'PromiseIsAsynchronous' because it is not unsafe, and I would like programmers to feel comfortable knowing they're not using unsafe features once they remove the PromiseUnsafeImplicits import, while keeping this functionality.
     
  • 'waitFulfilled' and 'waitAndGet' block for an unbounded amount of time; they're otherwise equivalent to 'waitUntil' and 'waitAndGetUntil'.
     
  • 'waitBefore' returns the Promise if it is fulfilled within a certain amount of time, or a timeout Promise defined in the Promise object otherwise; it is also blocking (an equivalent, non-blocking solution has been implemented but is not part of this post as it relies on another component).
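
If the wrapper-plus-implicit pattern is new to you, the sketch below applies it to plain Option instead of Promise. All names here (`OptionCanBeUnsafe`, `UnsafeImplicits`, `ExtensionSketch`) are hypothetical; it only shows how an unsafe method becomes available when, and only when, the matching implicits object is imported.

```scala
import scala.language.implicitConversions

object ExtensionSketch {
  // Hypothetical wrapper adding an unsafe method to Option without modifying it.
  final class OptionCanBeUnsafe[T](outer: Option[T]) {
    def getOrThrow(): T = outer match {
      case Some(v) => v
      case None    => throw new NoSuchElementException("undefined")
    }
  }

  object UnsafeImplicits {
    implicit def toUnsafe[T](o: Option[T]): OptionCanBeUnsafe[T] = new OptionCanBeUnsafe(o)
  }

  def demo(): Int = {
    import UnsafeImplicits._ // without this import, getOrThrow() does not compile
    Some(41).getOrThrow() + 1
  }
}
```

Dropping the import is then a cheap way to check that a piece of code does not rely on the unsafe methods.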

Example:

def unsafeExample(prom : Promise[String]) {    
    val boundedProm = prom.waitBefore(56) // the promise itself if fulfilled within the next 56 ms, a timeout promise otherwise
    val myString = boundedProm.getOrThrow()
    println("myString: " + myString)
  }
Again, all those usages must be banned from your code as much as possible; they're just here in case you want to do something unsafe during development.
This introduction was rough and not very interesting, but I had to tell you about those methods before you start thinking about them.
Now let's go back to a better world. :)

Promise is a Monad.

A useful concept from functional programming that Scala enables is the concept of Monad.
Some well-known Monads are List, Option, Either, etc.; they're all monadic, which means that you can use some nice ways to combine them consistently (roughly speaking) and also use Scala's for-comprehension syntactic sugar.
There are plenty of papers on the subject, so I invite you to read more about them.
Before introducing the 'Promise Monad' (kind of), I must say this version is not usable (in this form) with for comprehensions because it does not follow the right naming convention; in fact it has the functionality, but it is meant to be exposed later using a particular method so that the API feels more natural (you'll understand why later).

final class PromiseIsEitherMonadPlus[+T](outer: Promise[T]) {

  final def mapEither[U](f: Either[Throwable, T] => Either[Throwable, U]): Promise[U] = {
    val prom = Promise[U]()
    outer.foreachEither { x => prom setPromiseEither f(x) }
    prom
  }

  final def flatMapEither[U](f: Either[Throwable, T] => Promise[U]): Promise[U] = {
    val prom = Promise[U]()
    outer.foreachEither { eith =>
      try {
        f(eith) foreachEither { prom setPromiseEither _ }
      } catch {
        case e => prom fail e
      }
    }
    prom
  }

  final def filterEither(pred: Either[Throwable, T] => Boolean): Promise[T] = {
    val prom = Promise[T]()
    outer.foreachEither { x => prom.setPromiseEither(if (pred(x)) x else Promise.leftUndefined) }
    prom
  }

}
Some of you may have understood it's in fact a MonadPlus, because it has a filterEither method, which implies some 'null' or 'empty' element (Promise.leftUndefined) in the Monad world. (If you read a Haskell introduction to Monads, you may find that a MonadPlus introduces choice using mzero and mplus. Let's say they're equivalent for now.)
(In a normal Monad, empty is a singleton; here it is not, because we consider all exceptions as being equal, which is wrong, but we can live with it.)
So far so good, so let's comment on those methods (imperatively..):
  • mapEither takes a function, creates a Promise, calls foreachEither on the outer with a callback (which, once the outer is fulfilled, sets the newly created Promise with the value transformed by the parameter function f) and immediately returns the new Promise.
     
  • filterEither follows the same path but sets the Promise to the outer's value when available, or to leftUndefined if the predicate is not true for this value.
     
  • flatMapEither is a little more involved but is quite understandable. Notice how we've carefully wrapped the function evaluation to catch a possible exception before binding its result to the new Promise; that's how we bring safety to the API, transforming thrown exceptions into Left(err) values using a fail call on the promise.

We'll build most features atop Promises this way: by creating new Promise instances and returning them after 'registering computation on them'. It is very imperative, but in the end, we'll have a declarative, usable API.
You've noticed several things along the road:
  • We've used some new methods; 'where do the fail and setPromiseEither methods come from'? They are not part of the Promise definition. In fact, you've just seen a first use of an implementation of Promise: a 'MutablePromise', which provides a way to create an empty Promise and set its value at a later time. MutablePromises are created via the 'Promise()' call. The API tends to hide it as soon as possible behind the Promise interface (it does not leak from the function). When wrapping external libraries, this is also the path to follow.

  • You may have wondered why the abstraction does not leak in the mapEither implementation, as there's no try/catch! In fact, setPromiseEither takes a thunk of computation as parameter and not a value, deferring its actual evaluation to a safer place (surrounded by another try/catch).

  • Thanks to the Either representation, we're always dealing with the 'exceptional' case Left(exception).
     

Example:

def mapEitherExample(fileNameProm : Promise[String], fileContent : String => Promise[String]) {
    
    // Not the code you're dreaming of..
    val lengthProm = 
      fileNameProm.flatMapEither{
       case Right(name) => fileContent(name)
       case Left(err) => Promise.failure(err)
      }.mapEither{
        case Right(v) => Right(v.length)
        case Left(l) => Left(l)
      }
    
    lengthProm.foreachEither {
      case Right(v) => println("successful text length is " + v)
      case Left(err) => println("we had an error " + err.toString())
    }
  }

Ok, but dealing with Either..

Indeed, it could become verbose and boring; I recall a great man saying: if you pattern match too much, you're lacking a combinator..
So here comes PromiseIsMonadPlus with our beloved monad methods:
final class PromiseIsMonadPlus[+T](private val outer: Promise[T]) {

  final def foreach(f: T => Unit): Unit =
    outer.foreachEither {
      case Right(x) => f(x)
      case _ =>
    }

  final def map[U](f: T => U): Promise[U] =
    outer.mapEither {
      case Right(x) => Right(f(x))
      case Left(err) => Left(err)
    }

  final def flatMap[U](f: T => Promise[U]): Promise[U] =
    outer.flatMapEither {
      case Right(v) => f(v)
      case _ => outer.asInstanceOf[Promise[U]] // we're abusing the compiler but this is safe
    }

  final def filter(pred: T => Boolean): Promise[T] =
    outer.filterEither {
      case Right(x) => pred(x)
      case _ => false
    }

  final def failure(): Promise[Throwable] =
    outer.mapEither {
      case Left(err) => Right(err)
      case _ => Promise.leftUndefined
    }

}
OK, so this one will be easy to explain; all it does is wrap the calls to PromiseIsEitherMonadPlus in order to expose only methods involving values unboxed from Either.
Three things to explain here:
  • I've used 'outer.asInstanceOf[Promise[U]]' because constructing a new promise just to carry the same exception has a cost and, as the computation is already performed, there's no semantic difference in returning the outer itself, saving computation for higher performance (it's a hack).
     
  • 'failure' does something interesting: it returns a Promise[Throwable], i.e. a promise which holds the failure as its value (Either[Throwable, Throwable] under the hood); this is useful when you want to deal with a computation failure. (Note that subsequent failures of Promises built from it will not appear as values, as they are handled as usual in the Left part of the Either.)

  • As Either is already a Monad, I could have used less pattern matching, relying on its Right/Left projections (read the scala.Either doc for more information); I've deliberately kept the code easy to grasp with basic knowledge, as I know early Scala developers discover Either way after List / Option.
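
To give an idea of what the combinator-based version looks like, here is a sketch of the Right/Left match from `map` written with Either's own `map`. It assumes Scala 2.12 or later, where Either is right-biased; at the time of this library the equivalent would have gone through the right projection. `mapRight` and `EitherMapSketch` are hypothetical names.

```scala
object EitherMapSketch {
  // Same behaviour as matching on Right(x) / Left(err) by hand:
  // the function is applied to a Right, a Left is passed through untouched.
  def mapRight[T, U](e: Either[Throwable, T])(f: T => U): Either[Throwable, U] = e.map(f)

  def demo(): (Either[Throwable, Int], Boolean) = {
    val ok: Either[Throwable, String] = Right("abc")
    val ko: Either[Throwable, String] = Left(new RuntimeException("boom"))
    (mapRight(ok)(_.length), mapRight(ko)(_.length).isLeft)
  }
}
```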

Example:

def mapExample(fileNameProm : Promise[String], fileContent : String => Promise[String]) {
    
    val lengthProm = 
      fileNameProm.flatMap(fileContent).map{_.length}
    
    val lengthProm2 = 
      for {
        fileName <- fileNameProm
        content <- fileContent(fileName)
      } yield content.length
    
    lengthProm.foreachEither {
      case Left(err) => println("we had an error " + err.toString())
      case Right(v) => println("successful text length is " + v)
    }
      
  }

Way better! You can also see an example of a for comprehension (lengthProm2 is equivalent to lengthProm).
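
The equivalence between the two forms is not specific to Promise; the compiler rewrites a for comprehension into flatMap / map calls on whatever type provides them. Here is the same pair of expressions on Option, as a sketch with hypothetical names (`ForDesugarSketch`, `lengthOf`), where a Map lookup plays the role of `fileContent`.

```scala
object ForDesugarSketch {
  def lengthOf(name: Option[String], content: String => Option[String]): (Option[Int], Option[Int]) = {
    val explicit = name.flatMap(content).map(_.length)
    val sugared = for {
      n <- name
      c <- content(n)
    } yield c.length
    (explicit, sugared) // both are computed the same way
  }

  def demo(): Boolean = {
    val files = Map("a.txt" -> "hello")
    val (x, y) = lengthOf(Some("a.txt"), files.get)
    val (u, v) = lengthOf(Some("missing"), files.get)
    x == Some(5) && y == Some(5) && u == None && v == None
  }
}
```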

But sometimes I have to deal with it..

Perfect, let's introduce two useful implementations.
final class PromiseCanAbstractFailure[+T](outer: Promise[Either[Throwable, T]]) {
  def unliftEither: Promise[T] = {
    val prom = Promise[T]()
    outer.foreachEither {
      case Right(v) => prom setPromiseEither v
      case Left(err) => prom setPromiseEither Left(err)
    }
    prom
  }
}

final class PromiseCanExposeFailure[+T](outer: Promise[T]) {
  def liftEither: Promise[Either[Throwable, T]] = {
    val prom = Promise[Either[Throwable, T]]()
    outer.foreachEither { prom setPromise _ }
    prom
  }
}
They give you the ability to expose, or hide, the Either boxing; recall I told you we would have a more natural way to deal with the Promise in explicit Either form: here it is.
Note the outer type of PromiseCanAbstractFailure; it means that it can only be used on a Promise whose value type is already an Either. That's a way to use the power of implicits to provide functionality depending on the type.

Example:

def eitherMonadExample(fileNameProm : Promise[String], fileContent : String => Promise[String]) {
   
    val lengthProm = fileNameProm.flatMap(fileContent).map{_.length}
    val lengthPromEither = lengthProm.liftEither
    
    lengthPromEither.foreach{ // normal foreach (not foreachEither), we can now also use for comprehensions..
      case Left(err) => println("we had an error " + err.toString());
      case Right(v) => println("successful text length is " + v)
    }
    
    val lengthProm2 = lengthPromEither.unliftEither // back to Promise[Int]
 
  }



Asynchronicity.

In fact, a lot of asynchronicity has already been covered; it's at the heart of the promises, as they rely on 'callbacks', so each time we've called (directly or indirectly) foreachEither, we've built asynchronous computations.
Here's the implementation of some additional asynchronicity methods for the Promises:
final class PromiseIsAsynchronous[+T](outer: Promise[T]) {

  def async: Promise[T] = {
    val prom = Promise.async[T]()
    outer.foreachEither { prom setPromiseEither _ }
    prom
  }
 
  final def now(): Option[T] =
    outer.nowEither() match {
      case Some(Right(v)) => Some(v)
      case _ => None
    }
  
}
async is a forwarder to a promise specifically built using Promise.async().
This constructor creates a promise that defers the execution of its callbacks to a thread pool executor, bringing the ability to start computations asynchronously (more to come in the next post).
'now()' is the helper method we've discussed at the beginning of this post; it provides a way to access the current success value without the Either boxing.

Example:

def asyncExample(fileNameProm : Promise[String], fileContent : String => Promise[String]) = {
    println("DEBUG: current value " + fileNameProm.now()) // during dev
    fileNameProm.async.flatMap(fileContent) // I am sure I will return ASAP and it will be executed asynchronously
  }


But where's my orElse stuff?

OK, you recall we said Promise was a MonadPlus, so it's time to unleash some of its power..
Here are some combinators, among which stand our beloved orElse and an interesting variation..
final class PromiseCombinators[+T](outer: Promise[T]) {

  def orElse[U >: T](other: => Promise[U]): Promise[U] = {
    val prom = Promise[U]()
    outer.foreachEither {
      case r@Right(_) => prom.setPromiseEither(r)
      case _ =>
        try {
          other.foreachEither { prom setPromiseEither _ }
        } catch {
          case err: Throwable => prom.setPromiseEither(Left(err))
        }
    }
    prom
  }

  def &&[U](other: => Promise[U]): Promise[(T, U)] = {
    val prom = Promise[(T, U)]()
    outer.foreachEither {
      case Right(t) =>
        try {
          other.foreachEither {
            case Right(u) => prom.setPromise((t, u))
            case Left(err) => prom.setPromiseEither(Left(err))
          }
        } catch {
          case err => prom.setPromiseEither(Left(err)) // caused by 'other' evaluation
        }
      case Left(err) => prom.setPromiseEither(Left(err))
    }
    prom
  }

  def or[U >: T](other: Promise[U]): Promise[U] = {
    val prom = Promise[U]()
    val status = new java.util.concurrent.atomic.AtomicInteger(0) // 0 -> not set, 2 -> value set (right), 1 -> one error (left)

    val updateAsNeeded: Either[Throwable, U] => Unit = {
      case r: Right[_, _] =>
        if (status.getAndSet(2) < 2) { 
          prom.setPromiseEither(r)
        }
      case l: Left[_, _] =>
        if (status.getAndSet(1) == 1) { ///we only want to write if we are the second error
          prom.setPromiseEither(l)
        }
    }

    outer.foreachEither(updateAsNeeded)
    other.foreachEither(updateAsNeeded)

    prom
  }

}
Let's analyze that a bit.
orElse gives you the ability to build a Promise whose value will be bound to a first promise if it succeeds or to a second one otherwise (note that it can be chained; you can reduce a List of promises with it). orElse is based on building blocks already presented; the interesting new elements are:
  • U >: T, which means that U is defined as a supertype of T. You recall that, as our Promises are covariant, they play nicely with subtyping. So if you use orElse with two Promises, the resulting promise will have its type equal to the least upper bound of both types, ultimately Any.
     
  • the 'other' parameter is passed as a thunk, meaning it is not evaluated yet (i.e. the Promise instance may not have been created); each reference to its name results in an evaluation, hence the 'call by name' naming (pun not intended :) ). This evaluation takes place inside a try/catch which sets the failure, if any.
The '&&' method provides a way to build a Promise out of two other Promises, combining their results in a pair using two pattern matches. Nothing new is introduced here.
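
The effect of passing 'other' by name in orElse is easier to see on plain values. The sketch below is a synchronous analogue on Either, not the library's code; `OrElseSketch` and its members are hypothetical names. It checks that the fallback is not evaluated when the first value succeeds, and that a fallback which throws while being evaluated becomes a Left.

```scala
object OrElseSketch {
  // Hypothetical synchronous analogue of orElse on plain Either values.
  def orElse[T](first: Either[Throwable, T])(other: => Either[Throwable, T]): Either[Throwable, T] =
    first match {
      case r @ Right(_) => r
      case Left(_) =>
        try other catch { case e: Throwable => Left(e) } // evaluating `other` may itself throw
    }

  def demo(): (Either[Throwable, Int], Int, Boolean) = {
    var evaluations = 0
    def fallback: Either[Throwable, Int] = { evaluations += 1; Right(0) }
    val kept = orElse[Int](Right(1))(fallback) // fallback is never forced here
    val evaluationsAfterSuccess = evaluations
    val failed = orElse[Int](Left(new RuntimeException("first")))(throw new RuntimeException("second"))
    (kept, evaluationsAfterSuccess, failed.isLeft)
  }
}
```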

The 'or' method is a bit more interesting and gives us an example usage of the atomic package. Basically, or is the same as orElse but without any notion of ordering, and it requires both, potentially lazy, computations (orElse requires the second only if the first has failed).
'or' returns a Promise whose value is equal to the value of the first successful Promise.
So we want to get the first successful promise or the second failing one. We do not want to block (lock free) and have no specific order.
We'll use an atomic integer to represent our status with three possibilities:
  • 0, nothing has happened yet (initial state).
  • 1, an error happened.
  • 2, a success happened.
When a Promise succeeds, we atomically set the status to 2 while getting the previous value, to check whether we already had a success (getAndSet relies on the processor's atomic instructions, such as compare-and-swap, when available, so it is fast and non-blocking). If there was no previous success, we can fulfill the new Promise; if the other Promise had failed first, the previous status would have been 1, and everything would be equally fine.
When a Promise fails, we set the status to 1 while checking whether it was already 1 (the other Promise has failed); if that's true, then we can fail the new Promise safely.
(Note that a good extension would be a way to combine the two failing exceptions so as not to lose information. It could be done and would not be difficult; however, storing the first exception would require a volatile variable to be sure it is visible consistently from any thread (promises may be executed in different threads), as today's processors, mostly working in their caches at full speed, are not as synced with main memory as the uninitiated would guess.)
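
To convince yourself that the status scheme picks the right result whatever the arrival order, the decision logic can be pulled out and fed the four possible sequences. This is a sketch with hypothetical names (`Arbiter`, `shouldWrite`, `OrStatusSketch`); it only reports, for each incoming result, whether 'or' would write it to the new promise.

```scala
import java.util.concurrent.atomic.AtomicInteger

object OrStatusSketch {
  // Hypothetical extraction of the decision taken inside updateAsNeeded.
  final class Arbiter {
    private val status = new AtomicInteger(0) // 0 = nothing yet, 1 = one error, 2 = success
    def shouldWrite(result: Either[Throwable, Int]): Boolean = result match {
      case Right(_) => status.getAndSet(2) < 2  // first success wins
      case Left(_)  => status.getAndSet(1) == 1 // only the second error is written
    }
  }

  def decisions(results: Either[Throwable, Int]*): List[Boolean] = {
    val arbiter = new Arbiter
    results.toList.map(arbiter.shouldWrite)
  }

  def demo(): List[List[Boolean]] = {
    val ok: Either[Throwable, Int] = Right(1)
    val ko: Either[Throwable, Int] = Left(new RuntimeException("boom"))
    List(decisions(ok, ko), decisions(ko, ok), decisions(ko, ko), decisions(ok, ok))
  }
}
```

In every ordering exactly one of the two results is written. Note that the scheme relies on there being exactly two inputs: with a third one, a sequence such as success, error, success would write twice.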

Example:

def orElseExample(localizedContent : Promise[String], defaultContent : Promise[String]) = {
    val content = localizedContent orElse defaultContent
    
    val localizedAndDefault = localizedContent && defaultContent
    
    val firstDelivered = localizedContent or defaultContent
  }


OK, now we've covered a lot of functionality. We've also seen a way to implement concurrent atomic operations.
I hope this post was useful; I will write another post diving into the Promise implementations very soon.

You may find the GitHub project here; I've put everything in a single file to make it easy to play with: https://github.com/sledorze/Promises

Part two is here.

This is my first post on this blog, so be kind: no trolling, useful feedback welcome, and share! :)