The queue is one of the fundamental data structures of multithreaded programming. It is often used for message passing between systems and/or threads, commonly for event handling, task pools, or other types of work queues. In multithreaded environments, it is extremely important that these queues remain performant under high thread contention, which is why lock-free designs are so attractive.
Most lock-free queue designs internally use a linked-list of dynamically-allocated nodes to store their data. However, dynamic memory allocations are slow, as is pointer-hopping through memory from one node to the next. This series of blog posts will discuss how to write an efficient lock-free queue, utilizing a simple array as a circular buffer for fast data access. It will support multiple producer and consumer threads, as well as threads waiting for data to pop or room to push. Part I of this blog series will focus on the simplest case, a single-producer single-consumer (SPSC) lock-free queue.
Queue Class Policies and Members
We'll control the behavior of our Queue class with two separate template parameters: what type of threading to use (ThreadsPolicy) and what types of waiting to support (WaitPolicy). Their definitions and the Queue class declaration are below, where NoWaits is chosen as the default WaitPolicy:
//POLICIES
//E.g. MPSC: Multiple Producers, Single Consumer
enum class ThreadsPolicy
{
    SPSC = 0, SPMC, MPSC, MPMC
};

enum class WaitPolicy
{
    NoWaits = 0, PushAwait, PopAwait, BothAwait
};

//CLASS DECLARATION
template <typename DataType, ThreadsPolicy Threading,
          WaitPolicy Waiting = WaitPolicy::NoWaits>
class Queue;
The SPSC queue case is so much simpler than the others that even its class member variables are different. Thus we'll fully specialize the templated Queue class definition for the SPSC case. The class member variables needed are shown below:
template <typename DataType, WaitPolicy Waiting>
class Queue<DataType, ThreadsPolicy::SPSC, Waiting>
{
    //...
private:
    //STATIC MEMBERS
    static constexpr auto sAlign = std::hardware_destructive_interference_size;

    //OVER-ALIGNED MEMBERS
    alignas(sAlign) std::atomic<int> mPushIndex = 0;
    alignas(sAlign) std::atomic<int> mPopIndex = 0;
    alignas(sAlign) std::atomic<int> mSize = 0;
    alignas(sAlign) std::byte* mStorage = nullptr; //Object Memory

    //DEFAULT-ALIGNED MEMBERS
    //Not over-aligned as neither these nor the mStorage pointer change
    int mCapacity = 0;
    int mIndexEnd = 0; //At this value we wrap indices back around to zero
};
As we push/pop DataType objects into/from the queue, we will create/destroy them in the mStorage byte array. The mPushIndex and mPopIndex variables indicate where in the mStorage array the next push or pop should occur. Both of these, as well as mSize, are atomic because they need to be accessible by multiple threads.
Note that this container is not growable: if mStorage fills up, there is no room to emplace more entries into the queue. This is not necessarily a bad thing, as it forces us to be mindful of the amount of memory we are using, which is especially important on memory-constrained devices such as game consoles.
Also, since we're using a fixed-size array, growing the container would require locking around access to mStorage. This can be a significant performance penalty when growth occurs, especially if DataType is expensive to move, as it blocks all other threads trying to access the queue.
Instead, it is best to allocate mStorage to be large enough to handle any possible load we will throw at it. If that would be too large or potentially unbounded, then it's probably best to use WaitPolicy::PushAwait rather than growing the container and blocking all threads.
Since this class is multithreaded, the member variables that are frequently modified are aligned to separate cache lines via std::hardware_destructive_interference_size. This prevents false sharing between them: a write to one variable won't trigger unnecessary reloading of neighboring variables from memory.
Handling the push and pop indices, though, is trickier than you might think. If mPushIndex and mPopIndex were simply direct indices into mStorage, then what would it mean if both indices held the same value? It could indicate that the container is empty (the pop index has caught up to the push index), or that the container is full (vice versa)! We can't simply check mSize to determine which it is, as we may read an out-of-date value.
Instead, we'll let mPushIndex and mPopIndex increase beyond mCapacity, and the corresponding index in mStorage will be their value modulo mCapacity. That way, if they are equal then the container is empty, and if their difference is mCapacity then it is full. Integers can't increase forever though, and mIndexEnd is where we'll wrap their values back around to zero.
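To make the scheme concrete, here is a small worked example with hypothetical values:
//Worked example (hypothetical values), with mCapacity == 4:
//  mPushIndex == 6, mPopIndex == 6 -> difference is 0             -> empty
//  mPushIndex == 6, mPopIndex == 2 -> difference is 4 == mCapacity -> full
//  Either way, the next push or pop targets slot 6 % 4 == 2 or 2 % 4 == 2 in mStorage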
Initialization and Cleanup
Initialization of our Queue container is pretty simple; it mainly involves allocating an array of memory to use for our object storage. Note that this memory is aligned to (at least) the cache line size to prevent false sharing with whatever neighbors it in memory:
//Class template parameters removed for brevity
template <typename AllocatorType>
void Queue::Allocate(AllocatorType& aAllocator, int aCapacity)
{
    Assert(!Is_Allocated(), "Can't allocate while still owning memory!\n");
    Assert(aCapacity > 0, "Invalid capacity {}!\n", aCapacity);

    //Allocate memory for object storage
    auto cNumBytes = aCapacity * sizeof(DataType);
    static constexpr auto sAlignment = std::max(sAlign, alignof(DataType));
    mStorage = aAllocator.Allocate(cNumBytes, sAlignment);
    Assert(mStorage != nullptr, "Memory allocation failed!\n");
    mCapacity = aCapacity;

    //Calculate where index values will wrap around to zero
    static constexpr auto sMaxValue = std::numeric_limits<int>::max();
    auto cMaxNumWrapArounds = sMaxValue / mCapacity;
    Assert(cMaxNumWrapArounds >= 2, "Not enough wrap-arounds!\n");
    mIndexEnd = mCapacity * cMaxNumWrapArounds;
}

bool Queue::Is_Allocated() const
{
    return (mStorage != nullptr);
}
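For reference, here is a minimal sketch of an allocator satisfying the interface Allocate() and Free() expect (a pointer-returning Allocate(bytes, alignment) and a matching Free()). The AlignedAllocator name and its use of aligned operator new are placeholders of mine, not the engine's actual allocator:
//Minimal sketch of an allocator matching the assumed interface
//(requires <new> for aligned operator new/delete and <cstddef> for std::byte)
struct AlignedAllocator
{
    std::byte* Allocate(std::size_t aNumBytes, std::size_t aAlignment)
    {
        mAlignment = std::align_val_t{aAlignment}; //Remember alignment for Free()
        return static_cast<std::byte*>(::operator new(aNumBytes, mAlignment));
    }
    void Free(std::byte* aPointer)
    {
        ::operator delete(aPointer, mAlignment);
    }
    std::align_val_t mAlignment{alignof(std::max_align_t)};
};

//Possible usage (hypothetical):
//  AlignedAllocator cAllocator;
//  Queue<int, ThreadsPolicy::SPSC> cQueue;
//  cQueue.Allocate(cAllocator, 1024);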
The Free() method is also straightforward: it mainly just cleans up the Queue by freeing the memory we allocated earlier:
//Class template parameters removed for brevity
template <typename AllocatorType>
void Queue::Free(AllocatorType& aAllocator)
{
    Assert(Is_Allocated(), "No memory to free!\n");
    Assert(empty(), "Can't free until empty!\n");

    aAllocator.Free(mStorage);
    mStorage = nullptr;
    mCapacity = 0;
}
Emplacing and Popping - One Object
Emplacing and popping objects with an SPSC queue is relatively straightforward. When emplacing, the only multithreaded concern is making sure that the queue isn't full. So all we have to do is load the current push/pop indices, and if we're not full, emplace the object, bump mPushIndex, and update the size:
//Class template parameters removed for brevity
template <typename... ArgumentTypes>
bool Queue::Emplace(ArgumentTypes&&... aArguments)
{
    //Load indices
    //Push load relaxed: Only this thread can modify it
    auto cUnwrappedPushIndex = mPushIndex.load(std::memory_order::relaxed);
    //Pop load acquire: Object creation cannot be reordered above this
    auto cUnwrappedPopIndex = mPopIndex.load(std::memory_order::acquire);

    //Guard against the container being full
    auto cIndexDelta = cUnwrappedPushIndex - cUnwrappedPopIndex;
    if((cIndexDelta == mCapacity) || (cIndexDelta == (mCapacity - mIndexEnd)))
        return false; //Full. The second check handles wrap-around

    //Emplace the object
    auto cPushIndex = cUnwrappedPushIndex % mCapacity;
    auto cAddress = mStorage + cPushIndex * sizeof(DataType);
    new (cAddress) DataType(std::forward<ArgumentTypes>(aArguments)...);

    //Advance push index
    auto cNewPushIndex = Bump_Index(cUnwrappedPushIndex);
    //Push store release: Object creation cannot be reordered below this
    mPushIndex.store(cNewPushIndex, std::memory_order::release);

    //Update the size
    Increase_Size(1);
    return true;
}

int Queue::Bump_Index(int aIndex) const
{
    int cIncremented = aIndex + 1;
    return (cIncremented < mIndexEnd) ? cIncremented : 0;
}
When checking if the container was full, note that we had to guard against mPushIndex having reached mIndexEnd and wrapped back around to zero. Also, note that the atomic memory orderings are such that the creation of the object in mStorage cannot be reordered past these operations, for safety. Finally, the details of Increase_Size() are discussed later in this post.
Popping an object is very similar: we load the current push/pop indices, and if we're not empty, we move out and destroy the stored object, bump mPopIndex, and update the size:
//Class template parameters removed for brevity
bool Queue::Pop(DataType& aPopped)
{
    //Load indices
    //Push load acquire: The pop cannot be reordered above this
    auto cUnwrappedPushIndex = mPushIndex.load(std::memory_order::acquire);
    //Pop load relaxed: Only this thread can modify it
    auto cUnwrappedPopIndex = mPopIndex.load(std::memory_order::relaxed);

    //Guard against the container being empty
    if(cUnwrappedPopIndex == cUnwrappedPushIndex)
        return false; //The queue is empty

    //Pop data
    auto cPopIndex = cUnwrappedPopIndex % mCapacity;
    auto cAddress = mStorage + cPopIndex * sizeof(DataType);
    auto cData = std::launder(reinterpret_cast<DataType*>(cAddress));
    aPopped = std::move(*cData);
    cData->~DataType();

    //Advance pop index
    auto cNewPopIndex = Bump_Index(cUnwrappedPopIndex);
    //Pop store release: The pop cannot be reordered below this
    mPopIndex.store(cNewPopIndex, std::memory_order::release);

    //Update the size
    Decrease_Size(1);
    return true;
}
To access an object, we have to first reinterpret its address from std::byte* to DataType*, then std::launder the pointer. This laundering doesn't execute any code; it instead informs the compiler that it can't make any assumptions about where the object came from when doing optimizations. This is because we will likely create objects at this location many times throughout the lifetime of the Queue.
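As a standalone illustration of this pattern (with a hypothetical Widget type, not part of the queue), reusing the same raw storage for successive objects looks roughly like this:
//Minimal sketch of placement-new plus std::launder
//(requires <cstddef> for std::byte and <new> for placement new and std::launder)
struct Widget { int mValue; }; //Hypothetical payload type

void LaunderExample()
{
    alignas(Widget) std::byte cBuffer[sizeof(Widget)];

    new (cBuffer) Widget{1};                                         //Create an object in raw storage
    auto cFirst = std::launder(reinterpret_cast<Widget*>(cBuffer));  //Launder before accessing it
    cFirst->~Widget();                                               //...later, destroy it

    new (cBuffer) Widget{2};                                         //Reuse the same storage
    auto cSecond = std::launder(reinterpret_cast<Widget*>(cBuffer)); //Launder again for the new object
    cSecond->~Widget();
}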
Emplacing and Popping - Multiple Objects
If we want to emplace or pop multiple objects, calling the above methods multiple times is slow, resulting in a lot of unnecessary atomic operations. Instead, we can push/pop all of them at once using the same atomic operations needed for just one.
However, we now have to deal with wrapping around to the beginning of our circular array buffer. Otherwise, everything else is just about the same. To emplace:
//Class template parameters removed for brevity
template <typename InputType>
Span<InputType> Queue::Emplace_Multiple(const Span<InputType>& aSpan)
{
    //Load indices
    //Push load relaxed: Only this thread can modify it
    auto cUnwrappedPushIndex = mPushIndex.load(std::memory_order::relaxed);
    //Pop load acquire: Object creation cannot be reordered above this
    auto cUnwrappedPopIndex = mPopIndex.load(std::memory_order::acquire);

    //Can only push up to the pop index
    auto cMaxPushIndex = cUnwrappedPopIndex + mCapacity;
    auto cMaxSlotsAvailable = cMaxPushIndex - cUnwrappedPushIndex;
    cMaxSlotsAvailable -= (cMaxSlotsAvailable >= mIndexEnd) ? mIndexEnd : 0;
    const auto cSpanSize = aSpan.size();
    auto cNumToPush = std::min(cSpanSize, cMaxSlotsAvailable);
    if(cNumToPush == 0)
        return aSpan; //The queue is full

    //Setup push
    auto cPushIndex = cUnwrappedPushIndex % mCapacity;
    auto cPushAddress = mStorage + (cPushIndex * sizeof(DataType));
    auto cPushToData = std::launder(reinterpret_cast<DataType*>(cPushAddress));
    auto cDistanceBeyondEnd = (cPushIndex + cNumToPush) - mCapacity;
    const auto cSpanData = aSpan.data();

    //Push data (if const input just copies, else moves)
    if(cDistanceBeyondEnd <= 0)
        std::uninitialized_move_n(cSpanData, cNumToPush, cPushToData);
    else
    {
        auto cInitialLength = cNumToPush - cDistanceBeyondEnd;
        std::uninitialized_move_n(cSpanData, cInitialLength, cPushToData);
        cPushToData = std::launder(reinterpret_cast<DataType*>(mStorage));
        auto cToPush = cSpanData + cInitialLength;
        std::uninitialized_move_n(cToPush, cDistanceBeyondEnd, cPushToData);
    }

    //Advance push index
    auto cNewPushIndex = Increase_Index(cUnwrappedPushIndex, cNumToPush);
    //Push store release: Object creation cannot be reordered below this
    mPushIndex.store(cNewPushIndex, std::memory_order::release);

    //Update the size
    Increase_Size(cNumToPush);

    //Return unfinished entries
    auto cRemainingBegin = cSpanData + cNumToPush;
    return Span<InputType>(cRemainingBegin, cSpanSize - cNumToPush);
}

int Queue::Increase_Index(int aIndex, int aIncrease) const
{
    int cNewIndex = aIndex + aIncrease;
    cNewIndex -= (cNewIndex >= mIndexEnd) ? mIndexEnd : 0;
    return cNewIndex;
}
Here Span is just a non-throwing replacement for std::span. If the emplace wraps around to the beginning of mStorage, then we break the object emplacement up into two steps. The atomic memory orderings above once again ensure that the object creation cannot be reordered past these operations, for safety. Finally, Increase_Index() ensures that once the indices reach mIndexEnd they wrap back around to zero.
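The Span class itself isn't shown in this post; a minimal sketch of the interface these methods rely on (a pointer-plus-length constructor along with data(), size(), and empty()) might look something like this:
//Minimal sketch of the assumed Span interface (the real class is engine-specific)
template <typename DataType>
class Span
{
public:
    constexpr Span() = default;
    constexpr Span(DataType* aData, int aSize) : mData(aData), mSize(aSize) {}

    constexpr DataType* data() const { return mData; }
    constexpr int size() const { return mSize; }
    constexpr bool empty() const { return (mSize == 0); }

private:
    DataType* mData = nullptr;
    int mSize = 0;
};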
Pop_Multiple() is very similar, and is shown below mainly for completeness:
//Class template parameters removed for brevity
template <typename ContainerType>
void Queue::Pop_Multiple(ContainerType& aPopped)
{
    //Load indices
    //Push load acquire: The pop cannot be reordered above this
    auto cUnwrappedPushIndex = mPushIndex.load(std::memory_order::acquire);
    //Pop load relaxed: Only this thread can modify it
    auto cUnwrappedPopIndex = mPopIndex.load(std::memory_order::relaxed);

    //Can only pop up to the push index
    auto cMaxSlotsAvailable = cUnwrappedPushIndex - cUnwrappedPopIndex;
    cMaxSlotsAvailable += (cMaxSlotsAvailable < 0) ? mIndexEnd : 0;
    auto cOutputSpaceAvailable = aPopped.capacity() - aPopped.size();
    auto cNumToPop = std::min(cOutputSpaceAvailable, cMaxSlotsAvailable);
    if(cNumToPop == 0)
        return; //The queue is empty

    //Helper function for pop/destroy
    auto cPopAndDestroy = [&](DataType* aPopData, int aNumToPop)
    {
        //Pop data
        aPopped.insert(std::end(aPopped), std::move_iterator(aPopData),
                       std::move_iterator(aPopData + aNumToPop));
        //Destroy old data
        std::destroy_n(aPopData, aNumToPop);
    };

    //Setup pop/destroy
    auto cPopIndex = cUnwrappedPopIndex % mCapacity;
    auto cPopAddress = mStorage + (cPopIndex * sizeof(DataType));
    auto cPopFromData = std::launder(reinterpret_cast<DataType*>(cPopAddress));
    auto cDistanceBeyondEnd = (cPopIndex + cNumToPop) - mCapacity;

    //Pop data, wrapping around the end of the buffer if needed
    if(cDistanceBeyondEnd <= 0)
        cPopAndDestroy(cPopFromData, cNumToPop);
    else
    {
        auto cInitialLength = cNumToPop - cDistanceBeyondEnd;
        cPopAndDestroy(cPopFromData, cInitialLength);
        cPopFromData = std::launder(reinterpret_cast<DataType*>(mStorage));
        cPopAndDestroy(cPopFromData, cDistanceBeyondEnd);
    }

    //Advance pop index
    auto cNewPopIndex = Increase_Index(cUnwrappedPopIndex, cNumToPop);
    //Pop store release: The pop cannot be reordered below this
    mPopIndex.store(cNewPopIndex, std::memory_order::release);

    //Update the size
    Decrease_Size(cNumToPop);
}
Emplace Awaiting
We may want to wait to pop if the queue is empty (e.g. a task thread waiting for work to do), and/or to wait to emplace if the queue is full (so that we don't lose any messages). The best way to do that is for these threads to wait for changes to the mSize variable. This way we don't need to include any additional class members.
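For readers who haven't used it yet, here is a tiny standalone sketch of the C++20 std::atomic wait/notify API that the awaiting code relies on; Waiter and Changer are hypothetical functions intended to run on different threads:
//Minimal sketch of std::atomic wait/notify (requires <atomic>)
std::atomic<int> gValue = 0;

void Waiter()
{
    //Blocks while gValue still compares equal to 0, until notified with a new value
    gValue.wait(0);
}

void Changer()
{
    gValue.store(1);
    gValue.notify_all(); //Wake every thread currently waiting on gValue
}
Note that wait() may also unblock spuriously or observe a stale value, which is why the awaiting code below always re-checks the real condition in a loop.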
We will however include some additional static member variables (as well as some utility functions) that will help us implement our waiting:
//UTILITIES
constexpr bool Await_Pushes(WaitPolicy aWaiting)
{
    return (aWaiting == WaitPolicy::PushAwait) ||
           (aWaiting == WaitPolicy::BothAwait);
}

constexpr bool Await_Pops(WaitPolicy aWaiting)
{
    return (aWaiting == WaitPolicy::PopAwait) ||
           (aWaiting == WaitPolicy::BothAwait);
}

template <typename DataType, WaitPolicy Waiting>
class Queue<DataType, ThreadsPolicy::SPSC, Waiting>
{
    //...
private:
    static constexpr bool sPushAwait = Await_Pushes(Waiting);
    static constexpr bool sPopAwait = Await_Pops(Waiting);
    static constexpr int sSizeMask = 0x80000000; //ASSUMES 32 BIT int!
};
We'll use the sPushAwait and sPopAwait variables primarily to control the interface of the queue (using concepts). The sSizeMask variable is needed to assist with ending waits during shutdown, as detailed further below.
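Since sSizeMask bakes in the assumption of a 32-bit int (as its comment warns), it may be worth guarding that assumption at compile time; a minimal sketch:
//One possible compile-time guard for the 32-bit int assumption behind sSizeMask
static_assert(sizeof(int) == 4, "sSizeMask assumes a 32-bit int");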
The logic for waiting when emplacing is rather simple: each time we fail to emplace, we wait on mSize until it is no longer equal to mCapacity (i.e. until the container is no longer full). If mSize isn't equal to mCapacity right away (e.g. because we loaded a stale value), the wait returns immediately and we simply try the emplace again until it succeeds. We use std::memory_order::acquire on mSize for the wait to ensure that when we do unblock, we will see the updated value of mPopIndex, so that our emplace is more likely to succeed.
//Class template parameters removed for brevity
template <typename... ArgumentTypes>
void Queue::Emplace_Await(ArgumentTypes&&... aArguments) requires (sPushAwait)
{
    //Acquire: Need sync to see the latest queue indices
    while(!Emplace(std::forward<ArgumentTypes>(aArguments)...))
        mSize.wait(mCapacity, std::memory_order::acquire);
}

template <typename InputType>
void Queue::Emplace_Multiple_Await(Span<InputType> aSpan) requires (sPushAwait)
{
    while(true)
    {
        aSpan = Emplace_Multiple(aSpan);
        if(aSpan.empty())
            return;

        //Acquire: Need sync to see the latest queue indices
        mSize.wait(mCapacity, std::memory_order::acquire);
    }
}
Pop Awaiting
Waiting to pop would be similar, except that when we're shutting down the engine we need some way to tell these threads to stop waiting. The popping threads will also wait on mSize, so during shutdown we need to change mSize to wake them up. However, we also want to retain our knowledge of how many items are in the queue, so we can't just change mSize arbitrarily.
So when we want to end pop-awaiting, we'll set the highest bit of mSize, which allows the popping threads to unblock while still letting us extract the correct value of the size. Below is the code for ending and resetting pop-waiting, as well as for extracting the true size of the queue:
//Class template parameters removed for brevity
void Queue::End_PopWaiting() requires(sPopAwait)
{
    //Tell all popping threads we're shutting down
    //Release order: Syncs indices, and prevents code reordering after this
    auto cPriorSize = mSize.fetch_or(sSizeMask, std::memory_order::release);

    //Notify any waiting threads, but only if the queue was empty
    if(cPriorSize == 0)
        mSize.notify_all();
}

void Queue::Reset_PopWaiting() requires(sPopAwait)
{
    //Relaxed: Sync of other data is not needed: Queue state unchanged
    mSize.fetch_and(~sSizeMask, std::memory_order::relaxed);
}

int Queue::size() const
{
    //Relaxed: Nothing to synchronize when reading this
    auto cSize = mSize.load(std::memory_order::relaxed);
    return cSize & (~sSizeMask); //Clear the high bit!
}
In End_PopWaiting(), if the queue was empty we notify_all() threads waiting on mSize to wake them up. We use std::memory_order::release on the mSize update so that we have an acquire-release pair with the acquire ordering used in the pop-await functions below. This pairing means that any changes made (in the End_PopWaiting() thread) prior to setting mSize will be synchronized-with the observation of this change to mSize in the pop-await functions. That way, any waking threads that see we've ended the waiting will also see any other changes we've made.
The logic for pop-awaiting is identical to that for emplace-awaiting, except that if mSize equals sSizeMask after waiting, we know not only that we don't want to wait anymore, but also that the queue is currently empty and we should just return:
//Class template parameters removed for brevity
bool Queue::Pop_Await(DataType& aPopped) requires (sPopAwait)
{
    while(true)
    {
        if(Pop(aPopped))
            return true;

        //The queue was empty, wait until someone pushes or we're ending
        //Acquire: Need sync to see the latest queue indices
        mSize.wait(0, std::memory_order::acquire);

        //If mSize is sSizeMask then nothing will push, and none are left to pop
        //Relaxed: Nothing to sync, and if we miss sSizeMask we'll see it soon
        if(mSize.load(std::memory_order::relaxed) == sSizeMask)
            return false;
    }
}

template <typename ContainerType>
void Queue::Pop_Multiple_Await(ContainerType& aPopped) requires (sPopAwait)
{
    while(true)
    {
        Pop_Multiple(aPopped);
        if(!aPopped.empty())
            return;

        //Comments are identical to Pop_Await()
        mSize.wait(0, std::memory_order::acquire);
        if(mSize.load(std::memory_order::relaxed) == sSizeMask)
            return;
    }
}
Finally, the methods for increasing and decreasing mSize:
//Class template parameters removed for brevity
void Queue::Increase_Size(int aNumPushed)
{
    //Release if pop-awaiting (Syncs indices), else relaxed (no sync needed)
    static constexpr auto sOrder = sPopAwait ? std::memory_order::release :
                                               std::memory_order::relaxed;
    [[maybe_unused]] auto cPriorSize = mSize.fetch_add(aNumPushed, sOrder);
    if constexpr (sPopAwait)
    {
        //If the queue was empty, notify all waiting threads
        //No need to clear the high bit: if it's set, pop-waits already ended
        if (cPriorSize == 0)
            mSize.notify_all();
    }
}

void Queue::Decrease_Size(int aNumPopped)
{
    //Release if push-awaiting (Syncs indices), else relaxed (no sync needed)
    static constexpr auto sOrder = sPushAwait ? std::memory_order::release :
                                                std::memory_order::relaxed;
    [[maybe_unused]] auto cPriorSize = mSize.fetch_sub(aNumPopped, sOrder);
    if constexpr (sPushAwait)
    {
        //If the queue was full (clear the high bit!), notify all waiting threads
        if ((cPriorSize & (~sSizeMask)) == mCapacity)
            mSize.notify_all();
    }
}
Note that if we don't allow waiting, the memory orders are relaxed, as there's no data we need to worry about synchronizing between threads. Otherwise, we use std::memory_order::release to create acquire-release pairs with the acquires in the emplace-await and pop-await functions. That way, when those threads unblock they will see the updates to the queue and succeed in their emplacing/popping.
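To tie it all together, here is a hedged end-to-end sketch of how a BothAwait SPSC queue might be driven by one producer and one consumer thread. The Message type is a placeholder, and AlignedAllocator refers to the hypothetical allocator sketched earlier, not an engine class:
//Hypothetical end-to-end usage sketch (requires <thread>)
struct Message { int mId; }; //Placeholder payload type

void Example()
{
    AlignedAllocator cAllocator;
    Queue<Message, ThreadsPolicy::SPSC, WaitPolicy::BothAwait> cQueue;
    cQueue.Allocate(cAllocator, 1024);

    std::jthread cConsumer([&]
    {
        Message cMessage;
        //Returns false once End_PopWaiting() is called and the queue is drained
        while(cQueue.Pop_Await(cMessage))
        { /* handle cMessage */ }
    });

    std::jthread cProducer([&]
    {
        for(int i = 0; i < 10000; ++i)
            cQueue.Emplace_Await(Message{i}); //Blocks whenever the queue is full
    });

    cProducer.join();
    cQueue.End_PopWaiting(); //Tell the consumer to stop waiting once empty
    cConsumer.join();
    cQueue.Free(cAllocator);
}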
Conclusion
Single-producer, single-consumer queues are very useful in multithreaded programs, and are commonly used for message passing between dedicated threads: for example, submitting file save/load requests from the main game thread to an IO thread, network packet requests to a networking thread, audio requests to an audio thread, and so on.
Depending on your engine architecture though, you might need queues that support multiple producer and consumer threads, such as for an async task queue. These types of queues will be covered in Part II of this blog series.
I should note that while I have written this code for the C++ abstract machine, I have only tested it on x86. I have not yet been able to test it on an ARM processor, which has a more relaxed memory model, so there may be subtle problems with the chosen atomic memory orders that I haven't found yet. If you're targeting a platform that uses ARM (e.g. Android, iOS), review the memory ordering carefully and please let me know if you encounter any problems!