Concurrency: Five Ways
Here’s a look at five different concurrency models in Ruby. For some of them
the Ruby implementation is entirely hypothetical; others I have implemented
in some form or another in the past. We’ll be using a simple thread-safe
queue with two operations (get and put) as our example.
My purpose here is not to demonstrate their relative advantages or disadvantages—this particular example is overly generous to mutexes and a little unfair to transactions—but rather to convey the general idea of how each of these concurrency techniques is used in practice.
For concurrency models which do not inherently involve queueing, queues will
be represented as linked lists made of two-element arrays (in lieu of a
built-in Pair class). So, for a list cell, the first element ([0]) will
be the value of that list cell, and the second element ([1]) will be a
reference to the next cell in the list.
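As a quick illustration of that representation, here is a minimal sketch of a three-cell list and a traversal. The helper name list_values is made up for this example and is not used elsewhere in the article.

```ruby
# A list cell is a two-element array: [ value, next_cell ].
third  = [ :c, nil ]      # last cell: no successor
second = [ :b, third ]
first  = [ :a, second ]

# Hypothetical helper: walk the list from a given cell, collecting values.
def list_values( cell )
  values = []
  while cell
    values << cell[0]   # [0] holds the value of this cell
    cell = cell[1]      # [1] holds the next cell (nil at the end)
  end
  values
end

values = list_values( first )
```

Appending to such a list means setting [1] on the current last cell, which is why the queues below keep a reference to the tail as well as the head.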
Mutexes and Condition Variables (from Ruby stdlib)
require 'thread'

class MutexQueue
  def initialize
    @lock = Mutex.new
    @ready = ConditionVariable.new
    @head = nil
    @tail = nil
  end

  def put( obj )
    pair = [ obj, nil ]
    @lock.synchronize do
      tail = @tail
      if tail
        tail[1] = pair
        @tail = pair
      else
        @head = @tail = pair
      end
      # Signal on every put. Signalling only when the queue was empty can
      # leave a second waiting consumer asleep while an item is available.
      @ready.signal
    end
  end

  def get
    @lock.synchronize do
      # Re-check the condition after every wakeup.
      @ready.wait @lock until @head
      head = @head
      rest = head[1]
      if rest
        @head = rest
      else
        @head = @tail = nil
      end
      head[0]
    end
  end
end
This is how you have to do it today.
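For comparison, Ruby also ships a built-in Queue class whose pop blocks while the queue is empty, so in everyday code the mutex and condition variable are usually hidden behind it. The sketch below wraps it in the same get/put interface; the StdlibQueue name is made up for this example.

```ruby
require 'thread'  # harmless on modern Rubies, where Queue is built in

# Hypothetical adapter giving the built-in Queue the get/put interface.
class StdlibQueue
  def initialize
    @queue = Queue.new
  end

  def put( obj )
    @queue.push( obj )
    self
  end

  def get
    @queue.pop   # blocks the calling thread while the queue is empty
  end
end

queue = StdlibQueue.new
consumer = Thread.new { 3.times.map { queue.get } }
[ :a, :b, :c ].each { |item| queue.put( item ) }
received = consumer.value   # joins the thread and returns the block's result
```

With a single consumer the items come out in the order they were put.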
Futures (after Alice)
require 'concurrent/futures'
include Concurrent::Futures

class FutureQueue
  def initialize
    promise = Promise.new
    @tail = Ref.new promise
    @head = Ref.new promise.future
  end

  def put( obj )
    new_promise = Promise.new
    promise = @tail.exchange new_promise
    promise.fulfill [ obj, new_promise.future ]
  end

  def get
    @head.modify { |pair| pair[1] }[0]
  end
end
Futures act as placeholders and transparent proxies for a pending result.
Trying to do anything with a future extracted from a Promise blocks until
the promise is fulfilled, at which point the future pretends to be the
object passed to Promise#fulfill.
Importantly, Ref#modify sets the value of a ref to a future for the
result of the given block (which is passed the old value of the ref) and
also returns the old value of the ref.
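To make those semantics concrete, here is a rough sketch of how Promise, Future and Ref could be built on a mutex and a condition variable. Everything in the FuturesSketch module is an assumption made for illustration, since the library above is hypothetical. In particular, this Ref#modify runs the block in the calling thread rather than lazily, which is enough for the queue but may not match a real implementation.

```ruby
# Hypothetical stand-ins for the Promise, Future and Ref classes used above.
module FuturesSketch
  class Promise
    def initialize
      @lock = Mutex.new
      @done = ConditionVariable.new
      @fulfilled = false
      @value = nil
    end

    def fulfill( value )
      @lock.synchronize do
        raise 'promise already fulfilled' if @fulfilled
        @value = value
        @fulfilled = true
        @done.broadcast
      end
    end

    # Block until the promise is fulfilled, then return its value.
    def await
      @lock.synchronize do
        @done.wait( @lock ) until @fulfilled
        @value
      end
    end

    def future
      Future.new( self )
    end
  end

  # Transparent proxy: forwards (almost) every call to the promised value.
  class Future < BasicObject
    def initialize( promise )
      @promise = promise
    end

    def method_missing( name, *args, &block )
      @promise.await.__send__( name, *args, &block )
    end
  end

  class Ref
    def initialize( value )
      @lock = Mutex.new
      @value = value
    end

    # Atomically swap in a new value and return the old one.
    def exchange( new_value )
      @lock.synchronize do
        old = @value
        @value = new_value
        old
      end
    end

    # Install a future for the block's result, then return the old value.
    def modify
      promise = Promise.new
      old = exchange( promise.future )
      promise.fulfill( yield( old ) )
      old
    end
  end

  # Same queue as above, built on the stand-ins.
  class FutureQueue
    def initialize
      promise = Promise.new
      @tail = Ref.new promise
      @head = Ref.new promise.future
    end

    def put( obj )
      new_promise = Promise.new
      promise = @tail.exchange new_promise
      promise.fulfill [ obj, new_promise.future ]
    end

    def get
      @head.modify { |pair| pair[1] }[0]
    end
  end
end

queue = FuturesSketch::FutureQueue.new
consumer = Thread.new { [ queue.get, queue.get ] }  # blocks until values arrive
queue.put( :x )
queue.put( :y )
received = consumer.value
```

Because each get swaps its own future into @head before touching the old one, concurrent consumers line up behind one another in the order of their exchanges.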
Software Transactional Memory (after Harris et al.)
require 'concurrent/stm'
include Concurrent::STM

class STMQueue
  def initialize
    @head = Ref.new nil
    @tail = Ref.new nil
  end

  def put( obj )
    pair = [ obj, nil ]
    atomic do
      tail = @tail.value
      if tail
        tail[1] = pair   # cells are plain arrays, so link via [1]
        @tail.value = pair
      else
        @head.value = @tail.value = pair
      end
    end
  end

  def get
    atomic do
      head = @head.value
      atomic_retry unless head   # block until the queue is non-empty
      rest = head[1]
      if rest
        @head.value = rest
      else
        @head.value = @tail.value = nil
      end
      head[0]
    end
  end
end
Note that atomic_retry blocks until another thread updates one of the
transactional objects read anywhere up to the point where the transaction
was retried (in this particular case, just @head). That
prevents the transaction from actually retrying until something has changed
which might allow it to make more progress than before.
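The blocking and rollback behaviour of atomic_retry can be imitated with a single global lock. The sketch below is only that: an imitation under stated assumptions, not real STM. The STMSketch module and all its internals are made up here. It serialises all transactions, wakes retrying transactions after any committed write (rather than only writes to the refs they read), and rolls back only on retry, not on ordinary exceptions.

```ruby
# Hypothetical global-lock stand-in for the atomic / atomic_retry API above.
module STMSketch
  LOCK = Mutex.new
  CHANGED = ConditionVariable.new
  class RetrySignal < Exception; end

  class Ref
    attr_reader :value

    def initialize( value )
      @value = value
    end

    def value=( new_value )
      log = Thread.current[:stm_log]
      raise 'write outside atomic' unless log
      log[self] = @value unless log.key?( self )  # remember the original value
      @value = new_value
    end

    def restore( old_value )
      @value = old_value
    end
  end

  def atomic
    return yield if Thread.current[:stm_log]  # nested: join the outer transaction
    LOCK.synchronize do
      loop do
        log = Thread.current[:stm_log] = {}
        begin
          result = yield
          CHANGED.broadcast unless log.empty?  # wake transactions blocked in retry
          return result
        rescue RetrySignal
          log.each { |ref, old| ref.restore( old ) }  # undo partial writes
          CHANGED.wait( LOCK )  # sleep until some transaction commits a write
        ensure
          Thread.current[:stm_log] = nil
        end
      end
    end
  end

  def atomic_retry
    raise RetrySignal
  end
end

include STMSketch

a = Ref.new( :left )
b = Ref.new( nil )

# Take the value out of a ref, blocking (via retry) while it is empty.
take = lambda do |ref|
  atomic do
    value = ref.value
    atomic_retry unless value
    ref.value = nil
    value
  end
end

both = Thread.new { atomic { [ take.call( a ), take.call( b ) ] } }
a_while_blocked = atomic { a.value }  # :left, even if the thread already tried
atomic { b.value = :right }
pair = both.value
```

The outer transaction cannot finish until b holds a value, and whenever it retries, its removal of a's value is undone, so other threads never observe a emptied while b is still empty.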
Actors (after Scala)
require 'concurrent/actors'
include Concurrent::Actors

class ActorQueue < Actor
  Put = Message.new :obj
  Get = Message.new :origin

  def initialize
    super do |recurse|
      actor_receive Get do |get|
        actor_receive Put do |put|
          get.origin.actor_send( put )
          recurse.call
        end
      end
    end
  end

  def put( obj )
    actor_send( Put.new( obj ) )
  end

  def get( &cont )
    actor_send( Get.new( Actor.current ) )
    actor_receive( Put ) { |put| cont.call put.obj }
  end
end
Note that if this actor implementation used “heavyweight actors” (one thread
per actor), or took advantage of Ruby’s callcc, the explicit
continuation passing would be unnecessary; as it is, every actor_receive
unrolls the stack back to the message loop, taking an explicit continuation
as a block, just as in Scala.
It’s not uncommon for real actor code to wait on more than one type of
message at a time, with a different continuation for each pattern. The API
shown here accepts multiple arguments to actor_receive, but that still
means an extra round of pattern matching in the body of the continuation.
Perhaps there’s a better way of doing actors in Ruby.
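For contrast, here is a rough sketch of the "heavyweight" variant mentioned above, with one thread per actor and a blocking, selective actor_receive, so no continuation blocks are needed. The HeavyActor class is an assumption made for this example and is not the API shown above. Put and Get are plain Structs here, and the consumer is itself an actor so that it has a mailbox for the reply.

```ruby
# Hypothetical thread-per-actor ("heavyweight") stand-in; not the API above.
class HeavyActor
  def initialize( &body )
    @mailbox = []
    @lock = Mutex.new
    @arrived = ConditionVariable.new
    @thread = Thread.new { instance_eval( &body ) }
  end

  def actor_send( message )
    @lock.synchronize do
      @mailbox << message
      @arrived.broadcast
    end
    self
  end

  # Selective receive: block until a message of the given type is queued,
  # then remove and return the oldest such message.
  def actor_receive( type )
    @lock.synchronize do
      loop do
        index = @mailbox.index { |message| type === message }
        return @mailbox.delete_at( index ) if index
        @arrived.wait( @lock )
      end
    end
  end
end

Put = Struct.new( :obj )
Get = Struct.new( :origin )

# The queue actor pairs each Get with the next Put, in arrival order.
queue = HeavyActor.new do
  loop do
    get = actor_receive( Get )
    put = actor_receive( Put )
    get.origin.actor_send( put )
  end
end

results = Queue.new
HeavyActor.new do
  queue.actor_send( Get.new( self ) )     # ask the queue for a value
  results << actor_receive( Put ).obj     # block until the reply arrives
end

queue.actor_send( Put.new( :hello ) )
reply = results.pop
```

The message loop reads as straight-line code, at the cost of a full thread per actor.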
Joins (after Cω)
require 'concurrent/joins'
include Concurrent::Joins

class JoinQueue < Join
  async :put
  sync :get, :arity => 0

  chord :get, :put do |obj|
    obj
  end
end
Yes, really.
The idea is that you define method bodies responding to particular
combinations of methods — chords — which may be called from
various threads. sync methods wait for a matching chord to be invoked.
async methods are non-blocking, simply recording the fact that they were
called. Calls to both method types will be queued if they go unmatched,
hence queued calls to :put form the basis of our queue.
Since :get is synchronous, the result of the block becomes its return
value. Both synchronous and asynchronous methods can contribute chord
arguments, but since here :get has no arguments (arity zero), only
:put contributes an argument to the block.
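Written out by hand, that single chord amounts to something like the following sketch. The class and method names here are invented for illustration, and it leans on Ruby's built-in Queue to hold the unmatched :put calls. A general Join implementation would have to match arbitrary combinations of methods, which this does not attempt.

```ruby
# Hand-written equivalent of the :get/:put chord; the Join DSL is hypothetical.
class HandExpandedJoinQueue
  def initialize
    @pending_puts = Queue.new   # unmatched async calls, oldest first
  end

  # async: record the call and its arguments, then return immediately.
  def put( obj )
    @pending_puts.push( [ obj ] )
    nil
  end

  # sync: wait for a pending :put to pair with, then run the chord body.
  def get
    args = @pending_puts.pop    # blocks until a :put call is pending
    chord_body( *args )
  end

  private

  # Corresponds to the body of: chord :get, :put do |obj| ... end
  def chord_body( obj )
    obj
  end
end

join_queue = HandExpandedJoinQueue.new
join_queue.put( 1 )
join_queue.put( 2 )
taken = [ join_queue.get, join_queue.get ]
```

The return value of chord_body becomes the return value of get, mirroring how the block's result becomes the return value of the synchronous method.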
Conclusion
Notice that a common feature of all of these models is not only the ability to do something atomically, but also the ability to perform an (atomic) blocking wait for some condition to be satisfied. Both ingredients are necessary for a successful concurrency model.
Hopefully this gives you a basic idea of what it would be like to program
with these concurrency models in Ruby. The 'concurrent/*' libraries don’t
really exist yet, but maybe someday…
Addendum
As noted in the introduction, comparing the advantages and disadvantages of the various models solely on the basis of the examples above can be a bit misleading. Here’s a follow-on example which illustrates what I mean:
Problem:
Given the following routine:
def get_two( a, b )
  [ a.get, b.get ]
end
How can you modify the code to ensure that get_two will never consume a
value from one queue without consuming a value from the other?
Solution for STM:
Rewrite get_two as follows:
def get_two( a, b )
  atomic { [ a.get, b.get ] }
end
Solution for the rest:
Exercise for the reader…