Spin Buffers Done "Right"
Just so I can get this out of my system once and for all, I figured I may as well show how one could safely implement a spin buffer as described in the DDJ article (minus the lack of synchronization).
First of all, here’s the original implementation of SpinBuffer from
the article:
[I’m going to have to apologize in advance for the lack of code indentation in the RSS and Atom feeds; it’s the result of a bizarre Hobix (or REXML?) bug I’ve yet to track down. Until it’s fixed, you’ll need to read the original post to see the correct formatting.]
public class SpinBuffer {
    private static final int MAX_SIZE = 1000000;
    private Object[][] m_bfr = new Object[3][MAX_SIZE];
    private boolean[] m_busy = new boolean[3];
    private int[] m_count = new int[3];
    private int[] m_ptr = new int[3];
    private int m_pBuf = 0;
    private int m_cBuf = 1;

    /** Creates a new instance of SpinBuffer */
    public SpinBuffer() {
        m_busy[0] = m_busy[1] = true;
        m_busy[2] = false;
        for ( int i=0; i<3; i++ ) {
            m_ptr[i] = m_count[i] = 0;
        }
    }

    public boolean put(Object o) {
        int next = (m_pBuf+1)%3;
        if ( m_ptr[m_pBuf] < MAX_SIZE ) {
            // add to the buffer
            m_bfr[m_pBuf][m_ptr[m_pBuf]] = o;
            m_ptr[m_pBuf]++;
        }
        else
            return false;
        // check if next buffer is free
        if ( !m_busy[next] ) {
            m_count[m_pBuf] = m_ptr[m_pBuf];
            m_ptr[m_pBuf] = 0;
            m_busy[next] = true; // acquire
            m_busy[m_pBuf] = false; // release
            m_pBuf = next;
        }
        return true;
    }

    public Object get() {
        Object o = null;
        if ( m_ptr[m_cBuf] < m_count[m_cBuf]) {
            o = m_bfr[m_cBuf][m_ptr[m_cBuf]];
            // remove the reference
            m_bfr[m_cBuf][m_ptr[m_cBuf]] = null;
            m_ptr[m_cBuf]++;
        }
        else {
            // check if next buffer is free
            int next = (m_cBuf+1)%3;
            if ( !m_busy[next] ) {
                m_busy[next] = true; // acquire
                m_ptr[m_cBuf] = 0;
                m_count[m_cBuf] = 0;
                m_busy[m_cBuf] = false; // release
                m_cBuf = next;
            }
            //else, waiting for consumer
        }
        return o;
    }
}
The idea is that we have three buffers: at any given time, one buffer is being written to, one buffer is being read from, and the remaining buffer is either full and waiting for the reader to consume it, or empty and waiting for the writer to populate it. The reader and writer work around the ring of three buffers in one direction, as the role of “free” buffer migrates around in the opposite direction, providing “give” so that the reader and writer don’t have to work exactly in lock-step.
In this scheme little synchronization is required since the reader and writer are never using the same buffer at the same time. Unfortunately, little synchronization is not the same as none, and the lack of care for synchronization in the code above means that its behavior simply isn’t guaranteed by the Java Memory Model.
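To make "little synchronization" concrete, here is a minimal sketch of the smallest kind of handoff the Java 5+ memory model does guarantee. It is not taken from the article: the class VolatileHandoff and its method names are invented for illustration. A plain field is written first, then a volatile flag; a reader that sees the flag set is guaranteed to see the earlier plain write too. The m_busy flags in the listing above are not volatile, so they provide no such guarantee.

```java
// Hypothetical illustration; none of these names come from the article.
class VolatileHandoff {
    private int payload;              // plain field, no synchronization of its own
    private volatile boolean ready;   // volatile flag that publishes payload

    // Called by the writer thread.
    void publish(int value) {
        payload = value;   // ordinary write...
        ready = true;      // ...ordered before this volatile write
    }

    // Called by the reader thread; polls until the writer has published.
    int awaitValue() {
        while (!ready) {       // volatile read
            Thread.yield();    // nothing yet, try again
        }
        // Having seen ready == true, the reader must also see the payload
        // written before it (Java 5 and later).
        return payload;
    }

    // Runs one writer thread against the calling thread as the reader.
    static int demo() {
        VolatileHandoff handoff = new VolatileHandoff();
        Thread writer = new Thread(() -> handoff.publish(42));
        writer.start();
        int seen = handoff.awaitValue();
        try {
            writer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());   // prints 42
    }
}
```

If ready were a plain boolean, the reader could legally spin forever or observe a stale payload, which is the kind of behavior the unsynchronized listing leaves open.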
The spin buffer concept is possible to implement “properly” with sufficient
attention to synchronization, however. It’s a weird data structure with
annoying properties, so I’d still recommend using something like
ConcurrentLinkedQueue instead, but…
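For comparison, the ConcurrentLinkedQueue route might look roughly like the sketch below. The class name QueueHandoffDemo, the END marker, and the transfer method are all made up for this example; only ConcurrentLinkedQueue and its offer and poll methods are real library API. The consumer polls, just as it would have to with a spin buffer, and end-of-data is signalled in-band with a special object.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class; the names are illustrative only.
class QueueHandoffDemo {
    // In-band end-of-data marker, compared by identity.
    private static final Object END = new Object();

    // Sends the integers 1..n from a producer thread to the calling
    // (consumer) thread and returns their sum.
    static long transfer(int n) {
        ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<>();

        Thread producer = new Thread(() -> {
            for (int i = 1; i <= n; i++) {
                queue.offer(i);   // never blocks; the queue is unbounded
            }
            queue.offer(END);     // tell the consumer there is no more data
        });
        producer.start();

        long sum = 0;
        while (true) {
            Object o = queue.poll();   // null means nothing is available yet
            if (o == null) {
                Thread.yield();        // poll again later
                continue;
            }
            if (o == END) {
                break;
            }
            sum += (Integer) o;
        }

        try {
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transfer(1000));   // prints 500500
    }
}
```

Unlike a spin buffer, each object is visible to the consumer as soon as it is offered, so there is nothing to flush.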
NOTE: I ran out of time to work on this tonight, so what follows is a
draft which I haven’t compiled or tested yet. I’ll post any needed
corrections later, but I won’t mind if you leave me a comment when you
spot something wrong.
/**
 * A class implementing a (fast?) non-blocking queue for communication
 * between one producer and one consumer thread. Internally, it
 * employs the exchange of fixed-capacity buffers; when the producer
 * fills one buffer to capacity, it is exchanged with the consumer
 * thread for a fresh buffer. Similarly, when the consumer empties a
 * buffer, it is returned to the producer thread in exchange for a
 * newly populated buffer.
 *
 * Since the queue is non-blocking, it is necessary for both the
 * producer and consumer to poll.
 */
public class SpinBuffer {
    private Gate gate;
    private Buffer consumer_buffer;
    private Buffer producer_buffer;

    /**
     * Initializes a "spin buffer" with internal buffers of the
     * given capacity; the capacity determines how many objects can be
     * added to the queue by the producer before they are available
     * to the consumer.
     */
    public SpinBuffer(int capacity) {
        if ( capacity < 1 ) {
            throw new IllegalArgumentException("capacity must be at least 1");
        }
        gate = new Gate(new Buffer(capacity));
        consumer_buffer = new Buffer(capacity);
        producer_buffer = new Buffer(capacity);
    }

    /**
     * Adds an object to the queue, exchanging buffers with the consumer
     * if the current buffer is filled. Returns true if the object was
     * successfully added, or false if the current buffer was filled and
     * the consumer wasn't ready to receive it yet.
     *
     * The object cannot be null.
     *
     * Should be called only by the single producer thread.
     */
    public boolean put(Object o) {
        if ( o == null ) {
            throw new NullPointerException();
        }
        boolean success = producer_buffer.put(o);
        if (!success) {
            Buffer next = gate.producerSwap(producer_buffer);
            if ( next != null ) {
                producer_buffer = next;
                success = producer_buffer.put(o);
                assert success;
            }
        }
        return success;
    }

    /**
     * Tries to force an immediate exchange with the consumer, so long
     * as the current buffer is not empty. Returns true if the exchange
     * succeeded or the buffer was empty, false otherwise.
     *
     * This is useful when, for example, the producer has no more objects
     * to provide. As there's no other way for the consumer to
     * distinguish between the producer having an unfilled buffer and
     * there simply being no more data, end-of-data should be indicated
     * in-band by a special object. Signalling end-of-data out-of-band
     * requires additional synchronization.
     *
     * I think this is a better solution than the original article's idea
     * of requiring the client to write MAX_SIZE nulls.
     *
     * Should be called only by the single producer thread.
     */
    public boolean flush() {
        boolean flushed = false;
        if (!producer_buffer.isEmpty()) {
            Buffer next = gate.producerSwap(producer_buffer);
            if ( next != null ) {
                producer_buffer = next;
                flushed = true;
            }
        } else {
            flushed = true;
        }
        return flushed;
    }

    /**
     * Retrieves an object from the queue, or null if one is not yet
     * available (e.g. because the producer is still in the process of
     * populating a new buffer).
     *
     * Should be called only by the single consumer thread.
     */
    public Object get() {
        Object o;
        o = consumer_buffer.get();
        if ( o == null ) {
            Buffer next = gate.consumerSwap(consumer_buffer);
            if ( next != null ) {
                consumer_buffer = next;
                o = consumer_buffer.get();
                assert o != null;
            }
        }
        return o;
    }

    /**
     * This class implements a fixed-capacity buffer which accepts up to
     * a certain number of objects (its capacity) via the put method,
     * after which it should be emptied via the get method to reset it
     * before more objects can be added.
     *
     * I know this is weird. Maybe I should have just used a ring
     * buffer...
     */
    private static class Buffer {
        private Object[] contents;
        private int head = 0;
        private int tail = 0;

        Buffer(int capacity) {
            contents = new Object[capacity];
        }

        /**
         * Adds an object at the head of the buffer and advances the head,
         * returning true if successful or false if the object was not
         * added because there was no more room to advance the head.
         */
        public boolean put(Object o) {
            assert o != null;
            assert tail == 0;
            if ( head < contents.length ) {
                contents[head++] = o;
                return true;
            } else {
                return false;
            }
        }

        /**
         * Removes and returns the object at the tail of the buffer,
         * advancing the tail. Returns null if there are no more objects
         * in the buffer, at which time it will be ready to accept
         * more.
         */
        public Object get() {
            if ( tail < head ) {
                Object o = contents[tail];
                contents[tail] = null; // don't litter
                tail++;
                return o;
            } else {
                head = tail = 0;
                return null;
            }
        }

        public boolean isEmpty() {
            return head == tail;
        }

        public boolean isReady() {
            return head == 0;
        }
    }

    /**
     * A Gate object provides a safe way for the producer and consumer
     * threads to exchange buffers; each thread passes in the buffer
     * it's finished with and gets a fresh buffer in return; the
     * exchange is made only if a fresh buffer has been made available
     * by the other thread.
     *
     * This is the only part of the code where the two threads
     * communicate. Isolating that code here makes correctness easier
     * to reason about; it can be modeled by a simple state machine
     * over the pair (consumer_next, producer_next), whose initial
     * state is (null, consumed):
     *
     * (null, consumed) -> producerSwap -> (populated, null)
     * (null, consumed) -> consumerSwap -> (null, consumed)
     * (populated, null) -> consumerSwap -> (null, consumed)
     * (populated, null) -> producerSwap -> (populated, null)
     *
     * Since consumer_next is volatile, the consumer thread's read of
     * consumer_next ensures that the producer's writes up to the
     * point where the producer wrote to consumer_next happen before
     * the consumer's read.
     *
     * Similarly, the producer thread's read of producer_next ensures
     * that the consumer's writes up to the point where the consumer
     * wrote to producer_next happen before the producer's read.
     *
     * Note that this doesn't work under the original version of the
     * Java Memory Model (prior to Java 5), since it underspecifies
     * volatile.
     */
    private static class Gate {
        // note that these are volatile!
        private volatile Buffer consumer_next;
        private volatile Buffer producer_next;

        Gate(Buffer initial) {
            assert initial != null;
            // prime with a buffer for the producer to take
            producer_next = initial;
        }

        /**
         * Called by the producer thread to exchange buffers; receives the
         * buffer the producer has populated and returns a new, empty
         * buffer to populate if one is available. Otherwise, does
         * nothing and returns null.
         */
        public Buffer producerSwap(Buffer populated) {
            assert !populated.isEmpty();
            Buffer next = producer_next;
            if ( next != null ) {
                // note the order of these assignments!
                producer_next = null;
                consumer_next = populated;
            }
            return next;
        }

        /**
         * Called by the consumer thread to exchange buffers; receives
         * the buffer the consumer has consumed and returns a buffer
         * with more objects to consume if one is available. Otherwise,
         * does nothing and returns null.
         */
        public Buffer consumerSwap(Buffer consumed) {
            assert consumed.isReady();
            Buffer next = consumer_next;
            if ( next != null ) {
                // note the order of these assignments!
                consumer_next = null;
                producer_next = consumed;
            }
            return next;
        }
    }
}
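
The four transitions listed in the Gate comment can be walked through single-threaded. The sketch below is a simplified, hypothetical stand-in for Gate: GateSketch, its state() helper, and the token names are invented here, and it swaps String tokens instead of Buffer objects so that each state is easy to print. It only exercises the state machine; it does not demonstrate anything about cross-thread visibility.

```java
// Hypothetical stand-in for the Gate class above, using String tokens
// in place of Buffer objects.
class GateSketch {
    private volatile String consumerNext;   // set by producer, cleared by consumer
    private volatile String producerNext;   // set by consumer, cleared by producer

    GateSketch(String initial) {
        producerNext = initial;   // prime with a token for the producer to take
    }

    String producerSwap(String populated) {
        String next = producerNext;
        if (next != null) {
            producerNext = null;
            consumerNext = populated;
        }
        return next;
    }

    String consumerSwap(String consumed) {
        String next = consumerNext;
        if (next != null) {
            consumerNext = null;
            producerNext = consumed;
        }
        return next;
    }

    // Renders the state as (consumerNext, producerNext).
    String state() {
        return "(" + consumerNext + ", " + producerNext + ")";
    }

    public static void main(String[] args) {
        GateSketch gate = new GateSketch("empty-1");
        System.out.println(gate.state());                  // (null, empty-1)
        System.out.println(gate.consumerSwap("empty-2"));  // null: nothing to consume yet
        System.out.println(gate.producerSwap("full-1"));   // empty-1
        System.out.println(gate.state());                  // (full-1, null)
        System.out.println(gate.producerSwap("full-2"));   // null: consumer hasn't caught up
        System.out.println(gate.consumerSwap("empty-2"));  // full-1
        System.out.println(gate.state());                  // (null, empty-2)
    }
}
```

When a swap returns null, the caller keeps the buffer it passed in and tries again later, which is what put, flush, and get do in the listing above.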