Monday, June 23, 2008

Playing around with parallel maps in Scala

Hacking around with various programming languages is fun. Lots of people have been advocating polyglotism for quite some time now, and I am a firm believer in this mantra (heck, isn't there a programming language by that very name? Yes, and it runs on the JVM too). Irrespective of whether you use it in your next project or not, learning a new language always gives you yet another orthogonal line of thinking for the problem at hand. But I digress.

Over the last couple of days I have been hacking around with a few implementations of a parallel map in Scala. The one suggested in Scala By Example is based on the actor-per-thread model and, for obvious reasons, does not scale once your system maxes out its ability to spawn new threads. Scala's event-based actors are designed so that receive is not modeled as a wait on a blocked thread, but as a closure that captures the rest of the actor's computation. The basic premise is that we get lots of scalability and the feel of threadless concurrency on the JVM. In reality, Scala implements a unified actor model, in which actors are executed through a scheduler and a pool of worker threads, at a much finer granularity of concurrency than the thread-per-actor model.

Here is an implementation along the lines of Joe Armstrong's pmap, which delivered a near-linear speedup on 7 CPUs ..

import scala.actors.Future

class Mapper[A, B](l: List[A], f: A => B) {
  def pmap = {
    gather(
      for (idx <- (0 until l.length).toList) yield {
        scala.actors.Futures.future {
          f(l(idx))
        }
      }
    )
  }

  def gather(mappers: List[Future[B]]): List[B] = {
    def gatherA(mappers: List[Future[B]], result: List[B]): List[B] = mappers match {
      case m :: ms => gatherA(ms, m() :: result)
      case Nil => result
    }
    gatherA(mappers, Nil).reverse
  }
}

A purely functional implementation. The gather() method is tail recursive, which the Scala compiler optimizes into a loop. We all know that functions passed to map are not supposed to have any side effects - right? Hence we can parallelize the invocations across all members of the list, and the above implementation does exactly that. Also, gather() should return a new list in the same order as the original list. The above implementation preserves that as well.
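For readers trying this today: scala.actors and its Futures are long gone from the standard library. The same fire-then-gather pattern can be sketched with scala.concurrent.Future instead - a modern rendition of the idea, not the API used in this post:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object GatherSketch {
  // Fire one future per element, then join them in the original order,
  // so the result list lines up with the input list.
  def pmap[A, B](l: List[A])(f: A => B): List[B] = {
    val futures = l.map(x => Future(f(x)))               // fire
    futures.map(fut => Await.result(fut, Duration.Inf))  // gather, in order
  }
}
```

The semantics are the same as the actor-based version above: one task per element, joined in input order.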

Can I improve the above implementation? In what way? It is functional, and hence composable, side-effect-free, and it enjoys the implicit parallelism offered by the pure functional paradigm.

But how much purity is too pure, and how much is pragmatic enough?

In the above implementation, the target list is constructed sequentially, by gathering all results in the order of the original list. And the gathering only starts after all of the generation tasks have been initiated. Ideally we could get around this bottleneck by interleaving the generation of the target elements with the construction of the target list. So let us introduce some impurity, in the form of a mutable array that is updated concurrently while the functions are being invoked on the elements of the original list ..

class Mapper[A, B](l: List[A], f: A => B) {
  def pmap = {
    val buffer = new Array[B](l.length)
    val mappers =
      for (idx <- (0 until l.length).toList) yield {
        scala.actors.Futures.future {
          buffer(idx) = f(l(idx))
        }
      }
    for (mapper <- mappers) mapper()
    buffer
  }
}

With multicores, the latter implementation should perform more efficiently as a near drop-in replacement for Scala's List.map (modulo the fact that it returns an Array rather than a List).
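The impure variant translates to the current standard library just as directly. Here is a sketch (again using scala.concurrent.Future rather than the 2008-era actors API): each future writes its result straight into its own slot of a pre-sized array, so no ordered cons-and-reverse pass is needed at the end:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

object BufferSketch {
  // Each future owns exactly one index of the buffer, so the concurrent
  // writes never race with one another.
  def pmap[A, B: scala.reflect.ClassTag](l: List[A])(f: A => B): Array[B] = {
    val buffer = new Array[B](l.length)
    val writers = l.zipWithIndex.map { case (x, idx) =>
      Future { buffer(idx) = f(x) }
    }
    writers.foreach(w => Await.result(w, Duration.Inf))  // join all writers
    buffer
  }
}
```

The ClassTag context bound is a later-Scala requirement for instantiating a generic Array; it has no counterpart in the original post.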

Monday, June 09, 2008

Targeting BEAM for extreme reliability

Some time back I got to see Reia, a Python/Ruby-esque language targeting the Erlang virtual machine (BEAM) and its high performance native compiler (HiPE). Well before discovering Reia, I had exchanged a couple of tweets with Michael Feathers, expressing my surprise that not many languages target BEAM as their runtime. BEAM provides an ecosystem that offers phenomenal scalability with concurrent processes and distributed programming, which is really difficult (if not impossible) to replicate in any other virtual machine in active development today. After all, it is much easier to dress up Erlang with a nicer syntax than to implement its guarantees of reliability and extreme concurrency in your favorite language's runtime.

And Reia does precisely that! The following is from Reia's wiki pages, describing the philosophy behind its design ..
Reia combines the strengths of existing language concepts. The goal isn't to invent new ones, but to provide a mix of existing approaches which provides an easy and painless approach to writing concurrent programs. To that end, Reia feels like Python with a dash of Ruby, and the concurrent programming ideas behind Erlang added to the mix.

Reia also leverages the highly mature Erlang VM and OTP framework. These both provide a bulletproof foundation for concurrent systems which has been highly optimized over the years, particularly in regard to SMP systems. There's no need to build a new VM when an awesome one like BEAM is available.

Bunking the single assignment semantics ..

This is something I don't like about Reia. From the FAQ .. "Reia does away with many of the more obtuse semantics in Erlang, including single assignment." Single assignment in Erlang is all about extreme reliability: it gives developers a lower cost of fixing bugs in highly distributed systems with hot code swapping and transparent failover. The code that I write today in Reia may not be deployed in a distributed architecture right now. But it has to be safe enough to interoperate with Erlang modules that already run in the same virtual machine as part of an extremely reliable architecture. This is the basic idea behind cooperative polyglotism - modules written in various languages interoperating seamlessly within the confines of the host VM.

Erlang guarantees immutability at the language level. Once you allow destructive assignment, you are no longer playing to the strengths of the ecosystem. A similar logic holds for Scala actors built on top of the Java Virtual Machine. Because the language does not guarantee immutability, you can always send mutable data between actors, which can lead to race conditions. And as Yariv explains here, this leaves you just where you started: having to ensure synchronized access to shared memory. Because neither Java nor Scala ensures language-level immutability, you cannot safely take advantage of the numerous existing Java libraries while implementing your actor-based concurrent system in Scala. Without single assignment enforced, systems written in Reia may face the same consequence while interoperating with Erlang modules.
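The hazard is easy to see even without spinning up actors: on the JVM, "sending" an object is just sharing a reference, so the sender can keep mutating a message after the receiver has it. A minimal illustration (plain references standing in for an actor send):

```scala
import scala.collection.mutable.ArrayBuffer

object MutableMessage {
  // A mutable "message" ..
  val msg = ArrayBuffer(1, 2, 3)

  // .. "sent" to another actor: the receiver holds the same reference.
  val received = msg

  // The sender mutates after the send; the receiver observes the change.
  msg += 4
}
```

With Erlang's single assignment and copying message passing, the receiver would have seen a stable snapshot; here, both sides see shared mutable state, which is exactly what forces you back to locks.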

Monday, June 02, 2008

Java to Scala - Smaller Inheritance hierarchies with Structural Typing

I was going through a not-so-recent Java code base that contained the following structure for modeling the employee hierarchy of an organization. This looks quite representative of idiomatic Java being used to model a polymorphic hierarchy for designing a payroll generation application.

public interface Salaried {
  int salary();
}

public class Employee implements Salaried {
  //..
  //.. other methods
  @Override
  public int salary() {
    // implementation
  }
}

public class WageWorker implements Salaried {
  //..
  //.. other methods
  @Override
  public int salary() {
    // implementation
  }
}

public class Contractor implements Salaried {
  //..
  //.. other methods
  @Override
  public int salary() {
    // implementation
  }
}

And the payroll generation class (simplified for brevity ..) that actually needs the subtype polymorphism between the various concrete implementations of the Salaried interface.

public class Payroll {
  public int makeSalarySheet(List<Salaried> emps) {
    int total = 0;
    for (Salaried s : emps) {
      total += s.salary();
    }
    return total;
  }
}

While implementing this in Java, have you ever wondered whether public inheritance is the best way to model such a scenario? After all, in the above class hierarchy, the classes Employee, WageWorker and Contractor do not have *anything* in common except the fact that all of them are salaried persons, and subtype polymorphism has to be modeled *only* for the purpose of generating paysheets for all of them through a single API. In other words, we are coupling the entire class hierarchy through a compile-time static relationship only to unify a single commonality in behavior.

Public inheritance has frequently been under fire, mainly because of the coupling it induces between the base and derived classes. Experts say inheritance breaks encapsulation, and regard it as the second strongest relationship between classes (next only to the friend classes of C++). Interface-driven programming has its advantages in promoting loose coupling between the contracts it exposes and their concrete implementations. But interfaces in Java also pose problems when it comes to the evolution of an API - once an interface is published, it is not possible to change it without breaking client code. No wonder we find design patterns like Extension Object, or strict guidelines for evolving abstractions being enforced in big projects like Eclipse.

Finer Grained Polymorphism

Structural typing offers the ability to restrict the scope of polymorphism to only the subset of behaviors that needs to be common between the classes. Just as in duck typing, commonality of abstractions does not mean that the classes belong to one common type, only that they respond to a common set of messages. Scala's implementation of structural typing offers the best of both worlds - compile-time checked duck typing. Hence we have a nice way to unify certain behaviors of otherwise unrelated classes. The classes need not be related through a static compile-time subtyping relationship in order to be processed polymorphically over a certain set of behaviors. As an example, I tried modeling the above application using Scala's structural typing ..

case class Employee(id: Int) { def salary: Int = //.. }
case class DailyWorker(id: Int) { def salary: Int = //.. }
case class Contractor(id: Int) { def salary: Int = //.. }

class Payroll {
  def makeSalarySheet(emps: List[{ def salary: Int }]) = {
    (0 /: emps)(_ + _.salary)
  }
}

val l = List[{ def salary: Int }](DailyWorker(1), Employee(2), Employee(1), Contractor(9))
val p = new Payroll
println(p.makeSalarySheet(l))

The commonality in behavior between the above classes is the method salary, which is used only in makeSalarySheet for generating the payroll. We can capture this commonality in an anonymous type that declares a method with the same signature. All classes that implement a method salary returning an Int are structurally conformant to the anonymous type { def salary: Int }. And of course we can use this anonymous type as a type parameter of a Scala List. In the snippet above, makeSalarySheet accepts such a List as its parameter, which can include all the types of workers defined above.
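The compile-time checking is the interesting part: a class with a conforming salary method participates with no declared interface at all, while one without it is rejected at compile time. A minimal sketch in more recent Scala (the reflectiveCalls feature import is required nowadays; the class names are made up for illustration):

```scala
import scala.language.reflectiveCalls

object StructuralDemo {
  type HasSalary = { def salary: Int }

  // Two unrelated classes; no shared supertype, only a matching method.
  class Freelancer { def salary: Int = 500 }
  class Intern     { def salary: Int = 100 }

  def total(xs: List[HasSalary]): Int = xs.map(_.salary).sum

  // A class without salary, e.g. `class Robot { def beep(): Unit = () }`,
  // would fail to type-check as a HasSalary - duck typing, but checked.
}
```

Note that structural member access is implemented with reflection on the JVM, which is why the feature sits behind an explicit import in later Scala versions.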

Actually it gets even better with Scala. Suppose that in the above model, the name salary is not meaningful for a DailyWorker, whose earnings are known in standard business terminology as a wage. So let us assume that DailyWorker is instead defined as ..

case class DailyWorker(id: Int) { def wage: Int = //.. }

Obviously the above scheme no longer works, and the unfortunate DailyWorker falls outside the set of types that qualify for payroll generation.

In Scala we can use an implicit conversion - I call it the Smart Adapter Pattern. We define a conversion function that automatically converts a wage into a salary, instructing the compiler to adapt the wage method to the salary method ..

case class Salaried(salary: Int)

implicit def wageToSalary(in: { def wage: Int }) = Salaried(in.wage)
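The adapter can be exercised standalone. Here is a self-contained sketch in later Scala, where the implicitConversions and reflectiveCalls feature imports are required (the DailyWorker here is a hypothetical stand-in with a fixed wage):

```scala
import scala.language.{implicitConversions, reflectiveCalls}

object SmartAdapterDemo {
  case class Salaried(salary: Int)

  // Any object with an Int-returning wage method can pose as Salaried.
  implicit def wageToSalary(in: { def wage: Int }): Salaried = Salaried(in.wage)

  class DailyWorker { def wage: Int = 100 }

  // The type ascription triggers the implicit view from wage to salary.
  val adapted: Salaried = new DailyWorker
}
```

The conversion fires silently at compile time: anywhere a Salaried is expected, an object exposing wage is adapted on the fly.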

The makeSalarySheet API now changes accordingly, to process a List of objects that either implement an Int-returning salary method or can be implicitly converted to one with the same contract. This is indicated by <% and is known as a view bound in Scala. Here is the implementation of Payroll incorporating this modification ..

class Payroll {
  def makeSalarySheet[T <% { def salary: Int }](emps: List[T]) = {
    (0 /: emps)(_ + _.salary)
  }
}

Of course the rest of the program remains the same, since all the conversions and implicit magic happen in the compiler .. and we can still process all the objects polymorphically, even with a different method name for DailyWorker. Here is the complete source ..

case class Employee(id: Int) { def salary: Int = //.. }
case class DailyWorker(id: Int) { def wage: Int = //.. }
case class Contractor(id: Int) { def salary: Int = //.. }

case class Salaried(salary: Int)

implicit def wageToSalary(in: { def wage: Int }) = Salaried(in.wage)

class Payroll {
  def makeSalarySheet[T <% { def salary: Int }](emps: List[T]) = {
    (0 /: emps)(_ + _.salary)
  }
}

val l = List[{ def salary: Int }](DailyWorker(1), Employee(2), Employee(1), Contractor(9))
val p = new Payroll
println(p.makeSalarySheet(l))

With structural typing, we can afford to be more conservative with public inheritance. Inheritance should be used *only* to model a true subtype relationship between classes (aka LSP). Inheritance definitely has lots of uses; we only need the judgement not to misuse it. It is a strong relationship, and, as the experts say, always implement the least strong relationship that correctly models your problem domain.