Monday 25 July 2011

A Safe & Asynchronous Promise implementation explained - Part 2

Hi!

This is the second post of a two-part series about the Promise implementation (first part here).

This second post covers the implementation details of MutablePromise, LazyPromise and AsyncPromise.

MutablePromise

This is the base implementation of Promises; it provides their registration and dispatch mechanisms.

If you recall our Promise definition, it has two methods to implement:

abstract class Promise[+T] {
  def foreachEither(f: Either[Throwable, T] => Unit): Unit
  def nowEither(): Option[Either[Throwable, T]]
}


'nowEither' returns the current state of the promise.
'foreachEither' registers a closure that is called immediately if the value is already fulfilled, or later, once it becomes fulfilled.
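To make the contract of these two methods concrete, here is a minimal sketch using a promise that is already fulfilled when it is built. `Fulfilled` and `PromiseContractDemo` are hypothetical names made up for this illustration; they are not part of the library.

```scala
abstract class Promise[+T] {
  def foreachEither(f: Either[Throwable, T] => Unit): Unit
  def nowEither(): Option[Either[Throwable, T]]
}

// Hypothetical helper: a promise whose outcome is known at construction time.
final class Fulfilled[T](value: Either[Throwable, T]) extends Promise[T] {
  // The value is already there, so the closure runs immediately.
  def foreachEither(f: Either[Throwable, T] => Unit): Unit = f(value)
  def nowEither(): Option[Either[Throwable, T]] = Some(value)
}

object PromiseContractDemo {
  // The three states a caller can observe through nowEither.
  def describe[T](p: Promise[T]): String = p.nowEither() match {
    case None           => "pending"
    case Some(Right(v)) => "value: " + v
    case Some(Left(e))  => "failure: " + e.getMessage
  }
}
```

A real promise also has the "pending" state, which is exactly what MutablePromise adds.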

To achieve good performance, we designed a non-blocking system for registration and value access.
We use a ConcurrentLinkedQueue to store the callbacks to execute once the value is fulfilled, and a volatile field to store the actual value.

private val _callBacks = new ConcurrentLinkedQueue[Either[Throwable, T] => Unit]
@volatile
protected var _value: Option[Either[Throwable, T]] = None
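As a reminder of why the queue fits here, the sketch below registers two callbacks and drains them with `poll`, which returns null on an empty queue instead of blocking. `QueueDemo` is a hypothetical name used only for this example.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object QueueDemo {
  // Registers two callbacks, then drains them in FIFO order.
  def drain(value: Int): List[String] = {
    val callbacks = new ConcurrentLinkedQueue[Int => Unit]()
    var seen = List.empty[String]
    callbacks.offer(v => { seen = ("first saw " + v) :: seen })
    callbacks.offer(v => { seen = ("second saw " + v) :: seen })

    var elem = callbacks.poll() // null when the queue is empty, never blocks
    while (elem != null) {
      elem(value)
      elem = callbacks.poll()
    }
    seen.reverse
  }
}
```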

We also need two methods to set a successful value or a failure on the MutablePromise:

final def set(x: => T) = setPromiseEither(Right(x))
final def fail(ex: => Throwable) = setPromiseEither(Left(ex))

They wrap their respective parameter in an Either and forward it to the setPromiseEither method we've already seen in the previous post.
(Note that the parameters of both methods are thunks (call by name), and that the setPromiseEither parameter is also a thunk, so any failure while evaluating them is caught by the setPromiseEither method.)

Here is its implementation:

final def setPromiseEither(ev: => Either[Throwable, _ <: T]) = {
  try {
    _value = Some(ev) // evaluation could throw
  } catch {
    case e: Throwable => _value = Some(Left(e))
  }
  synchronized {
    notifyAll()
  }
  propagate()
  this
}

'ev' is a call-by-name parameter, so that we can catch any exception thrown while evaluating it.
If it throws, we set the Promise as failed with this new exception; otherwise, the promise is set with the value passed as parameter.
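Here is the thunk trick in isolation, as a small sketch. `ThunkDemo` and `capture` are hypothetical names; `capture` only reproduces the evaluation step of setPromiseEither, without storing or propagating anything.

```scala
object ThunkDemo {
  // `ev` is only evaluated inside the try, so a throwing argument becomes a Left.
  def capture[T](ev: => Either[Throwable, T]): Either[Throwable, T] =
    try ev catch { case e: Throwable => Left(e) }

  // Same shape as MutablePromise.set: `x` is still unevaluated when capture receives it.
  def set[T](x: => T): Either[Throwable, T] = capture(Right(x))
}
```

If `x` were a plain (by-value) parameter, the exception would be thrown at the call site, before `set` even starts.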

After setting the value, we notify any waiting threads so they can make progress.
Note: this is only required to support the unsafe, blocking methods (until, etc.); with the desired non-blocking usage, we could get rid of it completely (along with nowEither in the Promise abstract class).
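To show what the notifyAll call is paired with, here is a rough sketch of a blocking accessor built on wait/notifyAll. `Cell` and `awaitValue` are hypothetical names; the library's own blocking methods are not shown in this post and may well differ.

```scala
// Hypothetical, simplified holder: a volatile value plus a monitor to wait on.
final class Cell[T] {
  @volatile private var value: Option[T] = None

  def set(v: T): Unit = {
    value = Some(v)
    synchronized { notifyAll() } // wake up any thread parked in awaitValue
  }

  // Blocks until a value is present or the timeout expires.
  def awaitValue(timeoutMillis: Long): Option[T] = synchronized {
    val deadline = System.currentTimeMillis() + timeoutMillis
    var remaining = timeoutMillis
    while (value.isEmpty && remaining > 0) {
      wait(remaining) // releases the monitor while waiting
      remaining = deadline - System.currentTimeMillis()
    }
    value
  }
}
```

The loop re-checks the value after each wake-up, which also covers spurious wake-ups.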

After waking up those threads, we propagate the value to all callbacks.

private def propagate(): Unit =
  nowEither() match {
    case Some(finalValue) =>
      @tailrec
      def internPropagate(): Unit = {
        var finished = false
        try {
          var elem = _callBacks.poll()
          while (elem != null) {
            elem.apply(finalValue)
            elem = _callBacks.poll()
          }
          finished = true
        } catch {
          case e: Throwable => e.printStackTrace() // a failing callback must not stop the others
        }
        if (!finished)
          internPropagate() // continue to drain the queue
      }
      internPropagate()
    case _ => // no value yet: nothing to dispatch
  }


Propagate takes the current value of the promise.
If the value is not set, it does nothing.
If the value is set, it consumes the ConcurrentLinkedQueue using its atomic poll method.
Note that during the execution of this method, other threads may register new callbacks; as we use a ConcurrentLinkedQueue, no thread will block.
Each closure we poll out of the queue is executed inside a try/catch, so that a failure does not prevent the remaining closures from executing.
This try/catch should never catch anything, as all our combinators take care of that, but it is useful when one uses the lower-level API to develop new ones.

The propagate method tries to consume all the callbacks from the queue; however, the queue is not guaranteed to be empty when the method exits, since another thread may have added callbacks in the meantime.
The only contract is that all callbacks present in the queue at the time the method is called will be executed.
Consuming all callbacks, even those added to the queue while draining, is not really fair with regard to global progress, but it is faster; this should be made configurable.
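One possible way to make the draining policy configurable is to cap the number of callbacks a single call may run. This is only a sketch of the idea: `BoundedDrain`, `drain` and `max` are hypothetical names, and callbacks left in the queue would still need a later call to pick them up.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

object BoundedDrain {
  // Runs at most `max` callbacks and returns how many were executed.
  def drain[A](queue: ConcurrentLinkedQueue[A => Unit], value: A, max: Int): Int = {
    var executed = 0
    var elem: A => Unit = if (max > 0) queue.poll() else null
    while (elem != null) {
      // A failing callback is logged and does not stop the drain.
      try elem(value) catch { case e: Throwable => e.printStackTrace() }
      executed += 1
      elem = if (executed < max) queue.poll() else null
    }
    executed
  }
}
```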

Now, let's see how newly registered callbacks are executed (if the value is fulfilled):

def foreachEither(f: Either[Throwable, T] => Unit): Unit = {
  _callBacks offer f
  propagate()
}

We add the callback to the queue and call propagate.
From our contract, the propagate method consumes all callbacks present when it is called if and only if a value is set. So we are sure the callback will be executed either in this call or, if the value is still undefined, by the call to propagate issued from setPromiseEither when the value is set (possibly on another thread).

Ok, so now we have a working MutablePromise:

import java.util.concurrent.ConcurrentLinkedQueue
import scala.annotation.tailrec

class MutablePromise[T] extends Promise[T] {
  private val _callBacks = new ConcurrentLinkedQueue[Either[Throwable, T] => Unit]

  @volatile // may be accessed from different threads.
  protected var _value: Option[Either[Throwable, T]] = None

  final def set(x: => T) = setPromiseEither(Right(x))
  final def fail(ex: => Throwable) = setPromiseEither(Left(ex))

  def nowEither(): Option[Either[Throwable, T]] = _value
  def foreachEither(f: Either[Throwable, T] => Unit): Unit = {
    _callBacks offer f
    propagate()
  }

  final def setPromiseEither(ev: => Either[Throwable, _ <: T]) = {
    try {
      _value = Some(ev) // evaluation could throw
    } catch {
      case e: Throwable => _value = Some(Left(e))
    }
    synchronized {
      notifyAll()
    }
    propagate()
    this
  }

  private def propagate(): Unit =
    nowEither() match {
      case Some(finalValue) =>
        @tailrec
        def internPropagate(): Unit = {
          var finished = false
          try {
            var elem = _callBacks.poll()
            while (elem != null) {
              elem.apply(finalValue)
              elem = _callBacks.poll()
            }
            finished = true
          } catch {
            case e: Throwable => e.printStackTrace()
          }
          if (!finished)
            internPropagate() // continue to drain the queue
        }
        internPropagate()
      case _ =>
    }
}
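To check the registration contract described above, here is a small single-threaded sketch. It uses a condensed copy of MutablePromise (no wait/notify, no fail) so that it stands on its own; `RegistrationDemo` is a hypothetical name.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.ListBuffer

// Condensed copy of the class above, kept only for this illustration.
class MutablePromise[T] {
  private val callbacks = new ConcurrentLinkedQueue[Either[Throwable, T] => Unit]
  @volatile private var value: Option[Either[Throwable, T]] = None

  def nowEither(): Option[Either[Throwable, T]] = value

  def foreachEither(f: Either[Throwable, T] => Unit): Unit = {
    callbacks.offer(f)
    propagate()
  }

  def setPromiseEither(ev: => Either[Throwable, T]): this.type = {
    value = try Some(ev) catch { case e: Throwable => Some(Left(e)) }
    propagate()
    this
  }

  def set(x: => T): this.type = setPromiseEither(Right(x))

  // Drains the queue only when a value is present.
  private def propagate(): Unit = nowEither().foreach { v =>
    var cb = callbacks.poll()
    while (cb != null) {
      try cb(v) catch { case e: Throwable => e.printStackTrace() }
      cb = callbacks.poll()
    }
  }
}

object RegistrationDemo {
  def run(): List[String] = {
    val log = ListBuffer.empty[String]
    val p = new MutablePromise[Int]
    p.foreachEither(v => log += "early: " + v) // queued, no value yet
    p.set(42)                                  // runs the queued callback
    p.foreachEither(v => log += "late: " + v)  // value is set, runs immediately
    log.toList
  }
}
```

Calling `RegistrationDemo.run()` returns `List("early: Right(42)", "late: Right(42)")`: the early callback is run by the propagate call issued from set, the late one by its own registration.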


Let's see how to implement the AsyncPromise:

final class AsyncPromise[T] extends MutablePromise[T] {
  override def foreachEither(f: Either[Throwable, T] => Unit): Unit =
    super.foreachEither {x => PromiseExec newSpawn f(x) }
}

We extend MutablePromise and override foreachEither to forward the execution of its parameter function to a thread pool, using newSpawn, which registers a task to execute 'f(x)'.
'f(x)' is passed as a thunk and is actually executed by the task, not during this registration.
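PromiseExec itself is not shown in this post, so the following is only a guess at its shape: a hypothetical stand-in with a by-name `newSpawn` backed by a daemon thread pool, plus the same wrapping as in AsyncPromise.foreachEither. `AsyncDemo` and the thread name "promise-exec" are made up for the example.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for PromiseExec, not the library's actual code.
object PromiseExec {
  private val pool = Executors.newCachedThreadPool(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "promise-exec")
      t.setDaemon(true) // do not keep the JVM alive for pending callbacks
      t
    }
  })

  // `body` is a thunk: it is evaluated by the pooled thread, not by the caller.
  def newSpawn(body: => Unit): Unit =
    pool.execute(new Runnable { def run(): Unit = body })
}

object AsyncDemo {
  // Same wrapping as in AsyncPromise.foreachEither.
  def async[A](f: A => Unit): A => Unit = x => PromiseExec.newSpawn(f(x))

  // Returns the name of the thread that actually ran the callback.
  def callbackThread(): String = {
    val done = new CountDownLatch(1)
    val name = new AtomicReference[String]("")
    val callback = async[Int] { _ =>
      name.set(Thread.currentThread().getName)
      done.countDown()
    }
    callback(1)
    done.await(5, TimeUnit.SECONDS)
    name.get()
  }
}
```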

Here's how it is used in the PromiseIsAsynchronous extension:

final class PromiseIsAsynchronous[+T](outer: Promise[T]) {

  def async: Promise[T] = {
    val prom = Promise.async[T]() // returns an unbound AsyncPromise
    outer.foreachEither { prom setPromiseEither _ }
    prom
  }

  ...

}


Now it's time to review how we've achieved LazyPromise.
I've decided that a LazyPromise executes when its value is directly or indirectly required (which makes sense).

A LazyPromise must take its pending execution as a parameter.

Here's its code:

final class LazyPromise[T](var lazyAction: () => Unit) extends MutablePromise[T] {

  final def doLazyActionIfRequiered() = {
    if (lazyAction != null) synchronized { // narrowing test
      if (lazyAction != null) {
        val toEvaluate = lazyAction
        lazyAction = null // we free the lazy thunk memory
        try {
          toEvaluate()
        } catch {
          case e: Throwable => fail(e) // could be _value = Some(Left(e))
        }
      }
    }
  }

  override def nowEither(): Option[Either[Throwable, T]] = {
    doLazyActionIfRequiered() // the reason we've overridden nowEither: triggering the lazy computation.
    super.nowEither()
  }
}


So the LazyPromise extends MutablePromise, overriding its nowEither method.
The idea is to intercept every access to the value and, if required, execute the pending lazyAction.

The evaluation is rather simple. Note that we do a narrowing test to avoid any unnecessary synchronization, and that we set lazyAction to null; this both indicates that the lazyAction has been executed and frees the thunk memory (for the GC).
lazyAction does not need to be volatile: it is only modified inside the synchronized block, and the unsynchronized first test is re-checked there, so a stale read at worst costs one extra synchronization.
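The narrowing test can be exercised on its own. The sketch below keeps only that part of LazyPromise and hammers it from several threads to check that the action runs once; `RunOnce` and `RunOnceDemo` are hypothetical names.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical reduction of LazyPromise to its run-once logic.
final class RunOnce(private var action: () => Unit) {
  def runIfRequired(): Unit =
    if (action != null) synchronized { // cheap first test, no lock taken
      if (action != null) {            // re-check under the lock
        val toRun = action
        action = null // marks the action as consumed and lets the closure be collected
        toRun()
      }
    }
}

object RunOnceDemo {
  // Calls runIfRequired from four threads and returns how often the action ran.
  def count(): Int = {
    val counter = new AtomicInteger(0)
    val once = new RunOnce(() => counter.incrementAndGet())
    val threads = (1 to 4).map { _ =>
      new Thread(new Runnable { def run(): Unit = once.runIfRequired() })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    counter.get()
  }
}
```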

Note that we are not forced to call fail(e) in case of failure, because performing the whole propagation is unnecessary: in a LazyPromise, doLazyActionIfRequiered runs during the first call to nowEither, which happens when the first callback registration calls propagate, and that same propagate call then dispatches the failure.

The success case, which can happen asynchronously, is handled in the usual way, as you can see in the implementation of the lazily method in the PromiseIsLazy extension:

final class PromiseIsLazy[+T](outer: => Promise[T]) {
  def lazily : Promise[T] = {
    lazy val prom: LazyPromise[T] = // lazy to solve initialisation order problem
      new LazyPromise[T](() => outer foreachEither (prom setPromiseEither _))
    prom
  }
}

The outer promise is passed as a thunk so that it is evaluated on demand.
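To see the on-demand evaluation at work, here is a condensed, self-contained sketch of the whole chain. The classes are simplified copies (no Promise base class, no wait/notify), `lazily` is written as a plain function rather than an extension, and `LazilyDemo` is a hypothetical name.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.ListBuffer

class MutablePromise[T] {
  private val callbacks = new ConcurrentLinkedQueue[Either[Throwable, T] => Unit]
  @volatile private var value: Option[Either[Throwable, T]] = None
  def nowEither(): Option[Either[Throwable, T]] = value
  def foreachEither(f: Either[Throwable, T] => Unit): Unit = {
    callbacks.offer(f)
    propagate()
  }
  def setPromiseEither(ev: => Either[Throwable, T]): this.type = {
    value = try Some(ev) catch { case e: Throwable => Some(Left(e)) }
    propagate()
    this
  }
  // Goes through nowEither(), so a LazyPromise is triggered from here.
  private def propagate(): Unit = nowEither().foreach { v =>
    var cb = callbacks.poll()
    while (cb != null) { cb(v); cb = callbacks.poll() }
  }
}

final class LazyPromise[T](private var lazyAction: () => Unit) extends MutablePromise[T] {
  private def doLazyActionIfRequired(): Unit =
    if (lazyAction != null) synchronized {
      if (lazyAction != null) {
        val toEvaluate = lazyAction
        lazyAction = null
        try toEvaluate() catch { case e: Throwable => { setPromiseEither(Left(e)); () } }
      }
    }
  override def nowEither(): Option[Either[Throwable, T]] = {
    doLazyActionIfRequired()
    super.nowEither()
  }
}

object LazilyDemo {
  def lazily[T](outer: => MutablePromise[T]): LazyPromise[T] = {
    lazy val prom: LazyPromise[T] = // lazy so the closure can refer to prom
      new LazyPromise[T](() => outer.foreachEither(e => prom.setPromiseEither(e)))
    prom
  }

  def run(): List[String] = {
    val log = ListBuffer.empty[String]
    def source: MutablePromise[Int] = {
      log += "outer evaluated"
      val p = new MutablePromise[Int]
      p.setPromiseEither(Right(42))
    }
    val lazyP = lazily(source) // nothing is evaluated here
    log += "lazily returned"
    lazyP.foreachEither(v => log += "callback: " + v) // first access triggers everything
    log.toList
  }
}
```

`LazilyDemo.run()` returns `List("lazily returned", "outer evaluated", "callback: Right(42)")`, which shows that the outer promise is only built when the first callback is registered.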

That's it; this concludes the second and last post on this Promise implementation.
It is really far from perfect and would need some evolution (like parametrizing executors), but I hope it was interesting to some of you.

The repo can be found on GitHub here.

As usual comments are welcome :)
