Monday, July 14, 2008

Scaling out messaging applications with Scala Actors and AMQP

We have been sandboxing various alternatives towards scaling out Java/JMS based services that we implemented for our clients quite some time back. The services that form the stack comprise of dispatchers and routers meant to handle heavy load and perform heavy duty processing on a huge number of trade messages streaming in from the front and middle offices. I have been exploring lots of options including some of the standard ones like grid based distribution of processing and some wildly insane options like using Erlang/OTP. I knew Erlang/OTP is a difficult sell to a client, though we got some amazing results using OTP and mnesia over a clustered intranet. I was also looking at clustered Scala actors as another option, but I guess the Terracotta implementation is not yet production ready and would be a more difficult sell to clients.

Over the last week, I happened to have a look at RabbitMQ. It is based on Erlang/OTP and implements AMQP, a protocol, built on open standards, designed by the financial industry to replace their existing proprietary message queues. AMQP based implementations create full functional interoperability between conforming clients and messaging middleware servers (also called "brokers").

People have been talking about messaging as the front-runners in implementing enterprise architectures. And RabbitMQ is based on the best of the implementations, that, messaging over a set of clustered nodes, has to offer. Erlang's shared-nothing process hierarchies, extreme pattern matching capabilities and wonderful bit comprehension based binary data handling implement high performance reliable messaging with almost obscene scalability. And with RabbitMQ, the wonderful part is that you can hide your Erlang machine completely from your application programmers, who can still write Java classes to blast bits across the wire using typical message queueing programming paradigms.

AMQP (and hence RabbitMQ) defines the set of messaging capabilities as a set of components that route and store messages and a wire level protocol that defines the interaction between the client and the messaging services. The application has to define the producers and consumers for the messaging engines. And in order to ensure linear scalability, these components also need to be scalable enough to feed the Erlang exchange with enough bits to chew on and harness the full power of OTP.

Linear scalability on the JVM .. enter Scala actors ..

Over the weekend I had great fun with Scala actors churning out messages over AMQP endpoints, serializing trade objects at one end, and dispatching them to the subscribers at the other end for doing necessary trade enrichment calculations. Here are some snippets (simplified for brevity) of the quick and dirty prototype that I cooked up. Incidentally Lift contains actor-style APIs that allows you to send and receive messages from an AMQP broker, and the following prototype uses the same interfaces ..

The class TradeDispatcher is a Scala actor that listens as an AMQP message endpoint. It manages a list of subscribers to the trade message and also sends AMQP messages coming in to the queue/exchange to the list of observers.


// message for adding observers
case class AddListener(a: Actor)

// The trade object that needs to be serialized
@serializable
case class Trade(ref: String, security: String, var value: Int)

case class TradeMessage(message: Trade)

// The dispatcher that listens over the AMQP message endpoint
class TradeDispatcher(cf: ConnectionFactory, host: String, port: Int)
    extends Actor {

  val conn = cf.newConnection(host, port)
  val channel = conn.createChannel()
  val ticket = channel.accessRequest("/data")

  // set up exchange and queue
  channel.exchangeDeclare(ticket, "mult", "direct")
  channel.queueDeclare(ticket, "mult_queue")
  channel.queueBind(ticket, "mult_queue", "mult", "routeroute")

  // register consumer
  channel.basicConsume(ticket, "mult_queue", false, new TradeValueCalculator(channel, this))

  def act = loop(Nil)

  def loop(as: List[Actor]) {
    react {
    case AddListener(a) => loop(:: as)
    case msg@TradeMessage(t) => as.foreach(! msg); loop(as)
    case _ => loop(as)
    }
  }
}



and here is an actor that gets messages from upstream and publishes them to the AMQP exchange ..


class TradeMessageGenerator(cf: ConnectionFactory, host: String,
         port: Int, exchange: String, routingKey: String) extends Actor {

  val conn = cf.newConnection(host, port)
  val channel = conn.createChannel()
  val ticket = channel.accessRequest("/data")

  def send(msg: Trade) {

    val bytes = new ByteArrayOutputStream
    val store = new ObjectOutputStream(bytes)
    store.writeObject(msg)
    store.close

    // publish to exchange
    channel.basicPublish(ticket, exchange, routingKey, null, bytes.toByteArray)
  }

  def act = loop

  def loop {
    react {
      case TradeMessage(msg: Trade) => send(msg); loop
    }
  }
}



The next step is to design the consumer that reads from the exchange and does some business processing. Here the consumer (TradeValueCalculator) does valuation of the trade and has already been registered with the dispatcher above. Then it passes the message back to the dispatcher for relaying to the interested observers. Note that the TradeDispatcher has already passed itself as the actor while registering the object TradeValueCalculator as consumer callback in the snippet above (class TradeDispatcher).


class TradeValueCalculator(channel: Channel, a: Actor)
    extends DefaultConsumer(channel) {

  override def handleDelivery(tag: String, env: Envelope,
               props: AMQP.BasicProperties, body: Array[byte]) {

    val routingKey = env.getRoutingKey
    val contentType = props.contentType
    val deliveryTag = env.getDeliveryTag
    val in = new ObjectInputStream(new ByteArrayInputStream(body))

    // deserialize
    var t = in.readObject.asInstanceOf[Trade]

    // invoke business processing logic
    t.value = computeTradeValue(...)

    // send back to dispatcher for further relay to
    // interested observers
    a ! TradeMessage(t)

    channel.basicAck(deliveryTag, false)
  }
}



I have not yet done any serious benchmarking. But the implementation, on its face, looks wicked cool. Erlang at the backend, for high performance reliable messaging being throttled out by Scala actors in the application layer. Every actor opens up a new channel - the channel-per-thread model that AMQP encourages for multi-threaded client applications and scales so well in RabbitMQ.

Integrating the above classes into a small application prototype is not that difficult. Here is a small service class that uses the above framework classes to have the scala actors flying to talk to RabbitMQ ..


class SampleTradeListener {
  val params = new ConnectionParameters
  params.setUsername("guest")
  params.setPassword("guest")
  params.setVirtualHost("/")
  params.setRequestedHeartbeat(0)

  val factory = new ConnectionFactory(params)
  val amqp = new TradeDispatcher(factory, "localhost", 5672)
  amqp.start

  class TradeListener extends Actor {
    def act = {
      react {
      case msg@TradeMessage(contents: Trade) =>
        println("received trade: " + msg.message); act
      }
    }
  }
  val tradeListener = new TradeListener()
  tradeListener.start
  amqp ! AddListener(tradeListener)
}



Instantiate the above SampleTradeListener class and write a sample message generator facade that sends trdae messages to the consumer TradeMessageGenerator designed above.

Meanwhile here are some other related thoughts behind an AMQP based architecture ..

  • One of the areas which always makes me sceptical about linear scalability of applications is the single point of dependency on the relational database store.

  • If the application involves heavy relational database processing, does it make sense to make use of mnesia's easy distribution and fault tolerance capabilities to achieve overall scalability of the application ? Harness the power of durable and reliable transacted message processing of RabbitMQ and integrate RDBMS storage through write-behind log of AMQP activities.

  • Financial services solutions like back office systems typically need to talk to lots of external systems like Clearing Corporations, Stock Exchanges, Custodians etc. I think AMQP is more meaningful when you have the end points under your control and operate over a low latency, high bandwidth wire. When we talk about messaging over the internet (high latency, low bandwidth), possibly XMPP or Atompub is a better option. RabbitMQ has also released an XMPP gateway, for exposing a RabbitMQ instance to the global XMPP network through an ejabberd extension module. Looks like it's going to be messaging all the way down and up.

5 comments:

Carlos Quiroz said...

Hi

Nice article, I've been looking for alternatives to our activemq based solutions and RabbitMQ seems something really worth to look at.

ActiveMQ is limited by the inherent thread limitations of java and so, though it can process high volumes of messages, it is limted to the amount of simultaneous client.

Would the same limitations apply to your Scala actors? Would that limit the client's scalability?

Unknown said...

@carlos: RabbitMQ is implemented in Erlang and has all the wonderful scalability characteristics that OTP offers viz. seamless distribution, fault tolerance, process supervisors etc. I suggested using Scala actors at the application layer, since Scala actors scale much more than native Java threads. Scala actors are event based and use much more finer grained concurrency techniques. It is not thread-per-actor model and hence scales better.

Anonymous said...

I'm not totally clear that Scala would use one thread per actor if you are doing IO. Could you comment on that? If you have blocking IO I cannot see a way that on thread per actor could work.

That of course doesn't invalidate the actor model at all, is just a limitation of the underlying java platform. I guess you can use NIO in any case.

Unknown said...

@carlos: Scala uses 2 types of actors - one based on OS threads and the other based on class abstractions (event based actors). The thread based ones map 1:1 to OS threads, are heavyweight, do not scale. While the event based ones are lightweight and can be spawned in millions much like Erlang processes. Please have a look at http://lamp.epfl.ch/~phaller/doc/haller07actorsunify.pdf for details of implementation.

But the summary is that, event-based actors are executed in a thread pool, which gets automatically resized in case all threads are blocked due to blocking operations. Hence there is a chance that the thread pool cannot be resized due to system limitations, while all actors call blocking operations.

Ittay Dror said...

can you call loop recursively like you do? it's a tail recursion, but does scala actually optimizes it away? i thought you were supposed to use Actor#loop