Concurrency: Five Ways

Here’s a look at five different concurrency models in Ruby, some whose Ruby implementation is entirely hypothetical, and others which I have implemented in some form or another in the past. We’ll be using a simple threadsafe queue with two operations (get and put) as our example.

My purpose here is not to demonstrate their relative advantages or disadvantages — this particular example is overly generous to mutexes and a little unfair to transactions — but rather to convey the general idea of how each of these concurrency techniques is used in practice.

For concurrency models which do not inherently involve queueing, queues will be represented as linked lists made of two-element arrays (in lieu of a built-in Pair class). So, for a list cell, the first element ([0]) will be the value of that list cell, and the second element ([1]) will be a reference to the next cell in the list.
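Concretely, a two-cell list holding the values 1 and then 2 looks like this (plain Ruby, just to fix the representation in your head):

```ruby
# Two list cells, each a two-element array standing in for a Pair.
second = [ 2, nil ]      # last cell: value 2, no next cell
first  = [ 1, second ]   # first cell: value 1, next cell is `second`

first[0]     # => 1       (value of the first cell)
first[1]     # => second  (reference to the next cell)
first[1][0]  # => 2       (value of the second cell)
```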

Mutexes and Condition Variables (from Ruby stdlib)

require 'thread'

class MutexQueue
  def initialize
    @lock = Mutex.new
    @ready = ConditionVariable.new
    @head = nil
    @tail = nil
  end

  def put( obj )
    pair = [ obj, nil ]

    @lock.synchronize do
      tail = @tail
      if tail
        tail[1] = pair
        @tail = pair
      else
        @head = @tail = pair
      end
      # Signal on every put, not only when the queue was empty; otherwise
      # a second waiting getter can sleep forever while items remain.
      @ready.signal
    end
  end

  def get
    @lock.synchronize do
      @ready.wait @lock until @head
      head = @head
      rest = head[1]
      if rest
        @head = rest
      else
        @head = @tail = nil
      end
      head[0]
    end
  end
end

This is how you have to do it today.

Futures (after Alice)

require 'concurrent/futures'

include Concurrent::Futures

class FutureQueue
  def initialize
    promise = Promise.new
    @tail = Ref.new promise
    @head = Ref.new promise.future
  end

  def put( obj )
    new_promise = Promise.new
    promise = @tail.exchange new_promise
    promise.fulfill [ obj, new_promise.future ]
  end

  def get
    @head.modify { |pair| pair[1] }[0]
  end
end

Futures act as placeholders and transparent proxies for a pending result. Trying to do anything with a future extracted from a Promise blocks until the promise is fulfilled, at which point the future pretends to be the object passed to Promise#fulfill.

Importantly, Ref#modify sets the value of a ref to a future for the result of the given block (which is passed the old value of the ref) and also returns the old value of the ref.
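Since 'concurrent/futures' is imagined, here is a rough lock-based sketch of what Promise, Future, and Ref might look like. Every name and signature below is an assumption reverse-engineered from the usage above, not a real library:

```ruby
require 'thread'

# Sketch only: a Promise that can be fulfilled once, handing out
# blocking Futures in the meantime.
class Promise
  def initialize
    @lock = Mutex.new
    @cond = ConditionVariable.new
    @fulfilled = false
  end

  def fulfill( value )
    @lock.synchronize do
      @value = value
      @fulfilled = true
      @cond.broadcast
    end
  end

  def future
    Future.new self
  end

  # Block until fulfilled, then return the value.
  def value
    @lock.synchronize do
      @cond.wait @lock until @fulfilled
      @value
    end
  end
end

# Transparent proxy: any method call blocks until the promise is
# fulfilled, then is forwarded to the fulfilled value.
class Future < BasicObject
  def initialize( promise )
    @promise = promise
  end

  def method_missing( name, *args, &block )
    @promise.value.__send__( name, *args, &block )
  end
end

class Ref
  def initialize( value )
    @lock = Mutex.new
    @value = value
  end

  # Atomically swap in a new value, returning the old one.
  def exchange( new_value )
    @lock.synchronize do
      old = @value
      @value = new_value
      old
    end
  end

  # Replace the value with a future for the block's result (the block
  # receives the old value); return the old value.
  def modify( &block )
    promise = Promise.new
    old = exchange promise.future
    promise.fulfill( block.call( old ) )
    old
  end
end
```

With semantics like these, put fulfills the promise at the tail while get chases the chain of futures from the head, blocking only when it runs ahead of the producers.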

Software Transactional Memory (after Harris et al.)

require 'concurrent/stm'

include Concurrent::STM

class STMQueue
  def initialize
    @head = Ref.new nil
    @tail = Ref.new nil
  end

  def put( obj )
    pair = [ obj, nil ]

    atomic do
      tail = @tail.value
      if tail
        tail[1] = pair
        @tail.value = pair
      else
        @head.value = @tail.value = pair
      end
    end
  end

  def get
    atomic do
      head = @head.value
      atomic_retry unless head
      rest = head[1]
      if rest
        @head.value = rest
      else
        @head.value = @tail.value = nil
      end
      head[0]
    end
  end
end

Note that atomic_retry blocks until another thread updates one of the transactional objects read anywhere up to the point where the transaction was retried (in this particular case, just @head). That prevents the transaction from actually retrying until something has changed which might allow it to make more progress than before.
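The blocking behaviour can be emulated crudely with one global lock and a condition variable. This is not STM — there is no logging, rollback, or per-ref wakeup, and it isn't reentrant — but it is enough to see a retried transaction sleep until another transaction commits. All names here are invented for the sketch:

```ruby
require 'thread'

# Crude emulation of atomic/atomic_retry: one global lock stands in
# for the whole transactional machinery.
STM_LOCK = Mutex.new
STM_COND = ConditionVariable.new

class RetrySignal < StandardError; end

def atomic
  STM_LOCK.synchronize do
    begin
      result = yield
      STM_COND.broadcast     # a successful "commit" may unblock retriers
      return result
    rescue RetrySignal
      STM_COND.wait STM_LOCK # sleep until some other transaction commits
      retry
    end
  end
end

def atomic_retry
  raise RetrySignal
end

# Refs degenerate to plain mutable cells; the global lock makes
# access within atomic blocks exclusive.
class Ref
  attr_accessor :value
  def initialize( value )
    @value = value
  end
end
```

A real implementation would wake the transaction only when a ref it actually read changes, rather than broadcasting on every commit.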

Actors (after Scala)

require 'concurrent/actors'

include Concurrent::Actors

class ActorQueue < Actor
  Put = Message.new :obj
  Get = Message.new :origin

  def initialize
    super do |recurse|
      actor_receive Get do |get|
        actor_receive Put do |put|
          get.origin.actor_send( put )
          recurse.call
        end
      end
    end
  end

  def put( obj )
    actor_send( Put.new( obj ) )
  end

  def get( &cont )
    actor_send( Get.new( Actor.current ) )
    actor_receive( Put ) { |put| cont.call put.obj }
  end
end

Note that if this actor implementation used “heavyweight actors” (one thread per actor), or took advantage of Ruby’s callcc, the explicit continuation passing would be unnecessary; as it is, every actor_receive unrolls the stack back to the message loop, taking an explicit continuation as a block, just as in Scala.
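To make the heavyweight flavour concrete, here is a sketch using one thread per actor and a stdlib Queue as its mailbox; a blocking receive replaces the continuation passing. The message and class names are invented for illustration:

```ruby
require 'thread'

# Heavyweight-actor sketch: one thread per actor, a stdlib Queue as
# the mailbox, and a private reply channel per get.
Put = Struct.new( :obj )
Get = Struct.new( :reply_to )

class HeavyActorQueue
  def initialize
    @mailbox = Queue.new
    Thread.new { run }
  end

  def put( obj )
    @mailbox << Put.new( obj )
  end

  def get
    reply = Queue.new             # private channel for the response
    @mailbox << Get.new( reply )
    reply.pop                     # block until the actor replies
  end

  private

  # The actor loop: pair each Get with a Put, buffering whichever kind
  # of message arrives before its partner.
  def run
    pending_gets, pending_puts = [], []
    loop do
      case msg = @mailbox.pop
      when Put then pending_puts << msg
      when Get then pending_gets << msg
      end
      until pending_gets.empty? || pending_puts.empty?
        pending_gets.shift.reply_to << pending_puts.shift.obj
      end
    end
  end
end
```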

It’s not uncommon for real actor code to wait on more than one type of message at a time, with a different continuation for each pattern. The API shown here accepts multiple arguments to actor_receive, but that still means an extra round of pattern matching in the body of the continuation.

Perhaps there’s a better way of doing actors in Ruby.

Joins (after Polyphonic C#)

require 'concurrent/joins'

include Concurrent::Joins

class JoinQueue < Join
  async :put
  sync :get, :arity => 0

  chord :get, :put do |obj|
    obj
  end
end

Yes, really.

The idea is that you define method bodies responding to particular combinations of methods — chords — which may be called from various threads. sync methods wait for a matching chord to be invoked. async methods are non-blocking, simply recording the fact that they were called. Calls to both method types will be queued if they go unmatched, hence queued calls to :put form the basis of our queue.

Since :get is synchronous, the result of the block becomes its return value. Both synchronous and asynchronous methods can contribute chord arguments, but since here :get has no arguments (arity zero), only :put contributes an argument to the block.
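Under the hood, a join runtime has to do bookkeeping much like the mutex version at the top of this article. Here is a sketch of how this particular get/put chord might desugar, assuming nothing beyond the standard library (the class name is invented):

```ruby
require 'thread'

# How the get/put chord might desugar: async put records its argument
# and returns; sync get blocks until a queued put completes the chord.
class DesugaredJoinQueue
  def initialize
    @lock = Mutex.new
    @ready = ConditionVariable.new
    @pending_puts = []
  end

  # async :put — record the call and return immediately
  def put( obj )
    @lock.synchronize do
      @pending_puts << obj
      @ready.signal             # a chord may now be matchable
    end
  end

  # sync :get — block until the get/put chord matches
  def get
    @lock.synchronize do
      @ready.wait @lock while @pending_puts.empty?
      @pending_puts.shift       # the chord body: return put's argument
    end
  end
end
```

The appeal of joins is that the runtime writes this boilerplate for you; the chord declaration is the whole program.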

Conclusion

Notice that a common feature of all of these models is not only the ability to do something atomically, but also the ability to perform an (atomic) blocking wait for some condition to be satisfied. Both ingredients are necessary for a successful concurrency model.

Hopefully this gives you a basic idea of what it would be like to program with these concurrency models in Ruby. The 'concurrent/*' libraries don’t really exist yet, but maybe someday…

Addendum

As noted in the introduction, comparing the advantages and disadvantages of the various models solely on the basis of the examples above can be a bit misleading. Here’s a follow-on example which illustrates what I mean:

Problem:

Given the following routine:

def get_two( a, b )
  [ a.get, b.get ]
end

How can you modify the code to ensure that get_two will never consume a value from one queue without consuming a value from the other?

Solution for STM:

Rewrite get_two as follows:

def get_two( a, b )
  atomic { [ a.get, b.get ] }
end

Solution for the rest:

Exercise for the reader…