C++ Lock-Free Queue - Part I

C++ Lock-Free Queue - Part I

The queue is one of the fundamental data structures of multithreaded programming. It is often used for message-passing between systems and/or threads, commonly for event handling, task pools, or other types of work queues. In multithreaded environments, it is extremely important that these queues are performant under high thread contention, and are thus lock-free.

Most lock-free queue designs internally use a linked-list of dynamically-allocated nodes to store their data. However, dynamic memory allocations are slow, as is pointer-hopping through memory from one node to the next. This series of blog posts will discuss how to write an efficient lock-free queue, utilizing a simple array as a circular buffer for fast data access. It will support multiple producer and consumer threads, as well as threads waiting for data to pop or room to push. Part I of this blog series will focus on the simplest case, a single-producer single-consumer (SPSC) lock-free queue.

Queue Class Policies and Members

We'll control the behavior of our Queue class with two separate template variables: what type of threading to use (ThreadsPolicy) and what types of waiting to support (WaitPolicy). Their definitions and the Queue class declaration are below, where NoWaits is chosen as the default WaitPolicy:

//POLICIES
//E.g. MPSC: Multiple Producers, Single Consumer
enum class ThreadsPolicy
{
    SPSC = 0, SPMC, MPSC, MPMC
};

enum class WaitPolicy
{
    NoWaits = 0, PushAwait, PopAwait, BothAwait
};

//CLASS DECLARATION
template <typename DataType, ThreadsPolicy Threading, 
    WaitPolicy Waiting = WaitPolicy::NoWaits>
class Queue;

The SPSC queue case is so much simpler than the others that even its class member variables are different. Thus we'll fully-specialize the templated Queue class definition for the SPSC case. The class member variables needed are shown below:

template <typename DataType, WaitPolicy Waiting>
class Queue<DataType, ThreadsPolicy::SPSC, Waiting>
{
    //...
private:

    //STATIC MEMBERS
    static constexpr auto sAlign = std::hardware_destructive_interference_size;

    //OVER-ALIGNED MEMBERS
    alignas(sAlign) std::atomic<int> mPushIndex;
    alignas(sAlign) std::atomic<int> mPopIndex;
    alignas(sAlign) std::atomic<int> mSize = 0;
    alignas(sAlign) std::byte* mStorage = nullptr; //Object Memory

    //DEFAULT-ALIGNED MEMBERS
    //Not over-aligned as neither these nor the mStorage pointer change
    int mCapacity = 0;
    int mIndexEnd; //at this, we need to wrap indices around to zero
};

As we push/pop DataType objects into/from the queue, we will create/destroy them in the mStorage byte array. The mPushIndex and mPopIndex variables indicate where in the mStorage array the next push or pop should occur. Both these and the mSize variables are atomic because they need to be accessible by multiple threads.

Note that this container is not growable: if mStorage fills up there is no room to emplace more entries into the queue. This is not necessarily a bad thing, as it forces us to be mindful of the amount of memory we are using, which is especially important on memory-constrained devices such as game consoles.

Also, since we're using a fixed-size array, growing the container would require locking around access to mStorage. This can be a significant performance penalty when growth occurs, especially if DataType is expensive to move, as it will block all other threads trying to access the queue.

Instead, it is best to allocate mStorage to be large enough to handle any possible load we will throw at it. If this is too large or potentially unbounded, then it's probably best to use WaitPolicy::PushAwait rather than blocking all threads.

Since this class is multithreaded, the member variables that are frequently modified are aligned to separate cache lines via std::hardware_destructive_interference_size. This prevents false sharing between them: this way a write to one variable won't trigger unnecessary reloading of neighboring variables from memory.

Handling the push and pop indices though is trickier than you might think. If mPushIndex and mPopIndex were simply direct indices into mStorage, then what would it mean if both of the indices were the same value? It could indicate that the container is empty (the pop index has caught up to the push index), or that the container is full (vice versa)! We can't simply check mSize to determine which it is, as we may read an out-of-date value.

Instead, we'll let mPushIndex and mPopIndex increase beyond mCapacity, and the corresponding index in mStorage will be their value modulo mCapacity. That way if they are the same then the container is empty, and if their difference is mCapacity then it is full. Integers can't increase forever though, and mIndexEnd is where we'll wrap their values around to zero.

Initialization and Cleanup

Initialization of our Queue container is pretty simple; it mainly involves allocating an array of memory to use for our object storage. Note that this memory is allocated to (at least) the cache line size to prevent false sharing with whatever neighbors it in memory:

//Class template parameters removed for brevity
template <typename AllocatorType>
void Queue::Allocate(AllocatorType& aAllocator, int aCapacity)
{
    Assert(!Is_Allocated(), "Can't allocate while still owning memory!\n");
    Assert(aCapacity > 0, "Invalid capacity {}!\n", aCapacity);

    //Allocate memory for object storage
    auto cNumBytes = aCapacity * sizeof(DataType);
    static constexpr auto sAlignment = std::max(sAlign, alignof(DataType));
    mStorage = aAllocator.Allocate(cNumBytes, sAlignment);
    Base::Assert(mStorage != nullptr, "Memory allocation failed!\n");
    mCapacity = aCapacity;

    //Calculate where index values will wrap-around to zero
    static constexpr auto sMaxValue = std::numeric_limits<int>::max();
    auto cMaxNumWrapArounds = sMaxValue / mCapacity;
    Assert(cMaxNumWrapArounds >= 2, "Not enough wrap-arounds!\n");
    mIndexEnd = mCapacity * cMaxNumWrapArounds;
}

bool Queue::Is_Allocated() const
{
    return (mStorage != nullptr);
}

The Free() method is also straightforward: it mainly just cleans up the Queue by freeing the memory we allocated earlier:

//Class template parameters removed for brevity
template <typename AllocatorType>
void Queue::Free(AllocatorType& aAllocator)
{
    Assert(Is_Allocated(), "No memory to free!\n");
    Assert(empty(), "Can't free until empty!\n");

    aAllocator.Free(mStorage);
    mStorage = nullptr;
    mCapacity = 0;
}

Emplacing and Popping - One Object

Emplacing and popping objects into an SPSC queue is relatively straightforward. When emplacing, the only multithreaded thing we have to be careful about is making sure that the queue isn't full. So all we have to do is load the current push/pop indices, and if we're not full, we emplace the object, bump mPushIndex, and update the size:

//Class template parameters removed for brevity
template <typename... ArgumentTypes>
bool Queue::Emplace(ArgumentTypes&&... aArguments)
{
    //Load indices
    //Push load relaxed: Only this thread can modify it
    auto cUnwrappedPushIndex = mPushIndex.load(std::memory_order::relaxed);
    //Pop load acquire: Object creation cannot be reordered above this
    auto cUnwrappedPopIndex = mPopIndex.load(std::memory_order::acquire);

    //Guard against the container being full
    auto cIndexDelta = cUnwrappedPushIndex - cUnwrappedPopIndex;
    if((cIndexDelta == mCapacity) || (cIndexDelta == (mCapacity - mIndexEnd)))
        return false; //Full. The second check handled wrap-around

    //Emplace the object
    auto cPushIndex = cUnwrappedPushIndex % mCapacity;
    auto cAddress = mStorage + cPushIndex * sizeof(DataType);
    new (cAddress) DataType(std::forward<ArgumentTypes>(aArguments)...);

    //Advance push index
    auto cNewPushIndex = Bump_Index(cUnwrappedPushIndex);
    //Push store release: Object creation cannot be reordered below this
    mPushIndex.store(cNewPushIndex, std::memory_order::release);

    //Update the size
    Increase_Size(1);
    return true;
}

int Queue::Bump_Index(int aIndex) const
{
    int cIncremented = aIndex + 1;
    return (cIncremented < mIndexEnd) ? cIncremented : 0;
}

When checking if the container was full, note that we had to guard against mPushIndex having reached mIndexEnd and wrapping back around to zero. Also, note that the atomic memory orderings are such that the creation of the object in mStorage cannot be reordered beyond these operations for safety. Finally, the details of Increase_Size() are discussed later in this post.

Popping an object is very similar: we load the current push/pop indices, and if we're not empty, we move and destroy the stored object, bump mPopIndex, and update the size:

//Class template parameters removed for brevity
bool Queue::Pop(DataType& aPopped)
{
    //Load indices
    //Push load acquire: The pop cannot be reordered above this
    auto cUnwrappedPushIndex = mPushIndex.load(std::memory_order::acquire);
    //Pop load relaxed: Only this thread can modify it
    auto cUnwrappedPopIndex = mPopIndex.load(std::memory_order::relaxed);

    //Guard against the container being empty
    if(cUnwrappedPopIndex == cUnwrappedPushIndex)
        return false; //The queue is empty

    //Pop data
    auto cPopIndex = cUnwrappedPopIndex % mCapacity;
    auto cAddress = mStorage + cPopIndex * sizeof(DataType);
    auto cData = std::launder(reinterpret_cast<DataType*>(cAddress));
    aPopped = std::move(*cData);
    cData->~DataType();

    //Advance pop index
    auto cNewPopIndex = Bump_Index(cUnwrappedPopIndex);
    //Pop store release: The pop cannot be reordered below this
    mPopIndex.store(cNewPopIndex, std::memory_order::release);

    //Update the size
    Decrease_Size(1);
    return true;
}

To access an object, we have to first reinterpret its address from std::byte* to DataType*, then std::launder the pointer. This laundering doesn't execute any code; it instead informs the compiler that it can't make any assumptions about where the object came from when doing optimizations. This is because we will likely create objects at this location many times throughout the lifetime of the Queue.

Emplacing and Popping - Multiple Objects

If we want to emplace or pop multiple objects, calling the above methods multiple times is slow, resulting in a lot of unnecessary atomic operations. Instead, we can push/pop all of them at once using the same atomic operations needed for just one.

However, we now have to deal with wrapping around to the beginning of our circular array buffer. Otherwise, everything else is just about the same. To emplace:

//Class template parameters removed for brevity
template <typename InputType>
Span<InputType> Queue::Emplace_Multiple(const Span<InputType>& aSpan)
{
    //Load indices
    //Push load relaxed: Only this thread can modify it
    auto cUnwrappedPushIndex = mPushIndex.load(std::memory_order::relaxed);
    //Pop load acquire: Object creation cannot be reordered above this
    auto cUnwrappedPopIndex = mPopIndex.load(std::memory_order::acquire);

    //Can only push up to the pop index
    auto cMaxPushIndex = cUnwrappedPopIndex + mCapacity;
    auto cMaxSlotsAvailable = cMaxPushIndex - cUnwrappedPushIndex;
    cMaxSlotsAvailable -= (cMaxSlotsAvailable >= mIndexEnd) ? mIndexEnd : 0;
    const auto cSpanSize = aSpan.size();
    auto cNumToPush = std::min(cSpanSize, cMaxSlotsAvailable);
    if(cNumToPush == 0)
        return aSpan; //The queue is full.

    //Setup push
    auto cPushIndex = cUnwrappedPushIndex % mCapacity;
    auto cPushAddress = mStorage + (cPushIndex * sizeof(DataType));
    auto cPushToData = std::launder(reinterpret_cast<DataType*>(cPushAddress));
    auto cDistanceBeyondEnd = (cPushIndex + cNumToPush) - mCapacity;
    const auto cSpanData = aSpan.data();

    //Push data (if const input just copies, else moves)
    if(cDistanceBeyondEnd <= 0)
        std::uninitialized_move_n(cSpanData, cNumToPush, cPushToData);
    else
    {
        auto cInitialLength = cNumToPush - cDistanceBeyondEnd;
        std::uninitialized_move_n(cSpanData, cInitialLength, cPushData);

        cPushToData = std::launder(reinterpret_cast<DataType*>(mStorage));
        auto cToPush = cSpanData + cInitialLength;
        std::uninitialized_move_n(cToPush, cDistanceBeyondEnd, cPushToData);
    }

    //Advance push index
    auto cNewPushIndex = Increase_Index(cUnwrappedPushIndex, cNumToPush);
    //Push store release: Object creation cannot be reordered below this
    mPushIndex.store(cNewPushIndex, std::memory_order::release);

    //Update the size
    Increase_Size(cNumToPush);

    //Return unfinished entries
    auto cRemainingBegin = cSpanData + cNumToPush;
    return Span<InputType>(cRemainingBegin, cSpanSize - cNumToPush);
}

int Queue::Increase_Index(int aIndex, int aIncrease) const
{
    int cNewIndex = aIndex + aIncrease;
    cNewIndex -= (cNewIndex >= mIndexEnd) ? mIndexEnd : 0;
    return cNewIndex;
}

Here Span is just a non-throwing replacement of std::span. If the emplace wraps around to the beginning of mStorage, then we break up the object emplacement into two steps. The atomic memory ordering above once again ensures that the object creation cannot be reordered beyond their operations for safety. Finally, Increase_Index() ensures that once the indices reach mIndexEnd that they wrap back around to zero.

Pop_Multiple() is very similar, and is mainly shown below for completeness:

//Class template parameters removed for brevity
template <typename ContainerType>
void Queue::Pop_Multiple(ContainerType& aPopped)
{
    //Load indices
    //Push load acquire: The pop cannot be reordered above this
    auto cUnwrappedPushIndex = mPushIndex.load(std::memory_order::acquire);
    //Pop load relaxed: Only this thread can modify it
    auto cUnwrappedPopIndex = mPopIndex.load(std::memory_order::relaxed);

    //Can only pop up to the push index
    auto cMaxSlotsAvailable = cUnwrappedPushIndex - cUnwrappedPopIndex;
    cMaxSlotsAvailable += (cMaxSlotsAvailable < 0) ? mIndexEnd : 0;
    auto cOutputSpaceAvailable = aPopped.capacity() - aPopped.size();
    auto cNumToPop = std::min(cOutputSpaceAvailable, cMaxSlotsAvailable);
    if(cNumToPop == 0)
        return; //The queue is empty.

    //Helper function for Pop/destroy
    auto cPopAndDestroy = [&](DataType* aPopData, int aNumToPop)
    {
        //Pop data
        aPopped.insert(std::end(aPopped), std::move_iterator(aPopData), 
            std::move_iterator(aPopData + aNumToPop));

        //Destroy old data
        std::destroy_n(aPopData, aNumToPop);
    };

    //Setup pop/destroy
    auto cPopIndex = cUnwrappedPopIndex % mCapacity;
    auto cPopAddress = mStorage + (cPopIndex * sizeof(DataType));
    auto cPopFromData = std::launder(reinterpret_cast<DataType*>(cPopAddress));
    auto cDistanceBeyondEnd = (cPopIndex + cNumToPop) - mCapacity;

    //Push data (if const input just copies, else moves)
    if(cDistanceBeyondEnd <= 0)
        cPopAndDestroy(cPopFromData, cNumToPop);
    else
    {
        auto cInitialLength = cNumToPop - cDistanceBeyondEnd;
        cPopAndDestroy(cPopFromData, cInitialLength);

        cPopFromData = std::launder(reinterpret_cast<DataType*>(mStorage));
        cPopAndDestroy(cPopFromData, cDistanceBeyondEnd);
    }

    //Advance pop index
    auto cNewPopIndex = Increase_Index(cUnwrappedPopIndex, cNumToPop);
    //Pop store release: The pop cannot be reordered below this
    mPopIndex.store(cNewPopIndex, std::memory_order::release);

    //Update the size
    Decrease_Size(cNumToPop);
}

Emplace Awaiting

We may want to wait to pop if the queue is empty (e.g. a task thread waiting for work to do), and/or to wait to emplace if the queue is full (so that we don't lose any messages). The best way to do that is for these threads to wait for changes to the mSize variable. This way we don't need to include any additional class members.

We will however include some additional static member variables (as well as some utility functions) that will help us implement our waiting:

//UTILITIES
constexpr bool Await_Pushes(WaitPolicy aWaiting)
{
    return (aWaiting == WaitPolicy::PushAwait) || 
        (aWaiting == WaitPolicy::BothAwait);
}

constexpr bool Await_Pops(WaitPolicy aWaiting)
{
    return (aWaiting == WaitPolicy::PopAwait) || 
        (aWaiting == WaitPolicy::BothAwait);
}

template <typename DataType, WaitPolicy Waiting>
class Queue<DataType, ThreadsPolicy::SPSC, Waiting>
{
    //...
private:
    static constexpr bool sPushAwait = Await_Pushes(Waiting);
    static constexpr bool sPopAwait = Await_Pops(Waiting);
    static constexpr int sSizeMask = 0x80000000; //ASSUMES 32 BIT int!
}

We'll use the sPushAwait and sPopAwait variables primarily to control the interface of the queue (using concepts). The sSizeMask variable is needed to assist with ending waits during shutdown, as detailed further below.

The logic for waiting when emplacing is rather simple: each time we fail to emplace, we wait if and until mSize is not mCapacity (i.e. until the container is no longer full). If mSize is not mCapacity right away (e.g. we load an old value), then we'll just try again until we do. We use std::memory_order::acquire on mSize for the wait to ensure that when we do unblock, we will see the updated value of mPopIndex so that our emplace is more likely to succeed.

//Class template parameters removed for brevity
template <typename... ArgumentTypes>
void Queue::Emplace_Await(ArgumentTypes&&... aArguments) requires (sPushAwait)
{
    //Acquire: Need sync to see the latest queue indices
    while(!Emplace(std::forward<ArgumentTypes>(aArguments)...))
        mSize.wait(mCapacity, std::memory_order::acquire);
}

template <typename InputType>
void Queue::Emplace_Multiple_Await(Span<InputType> aSpan) requires (sPushAwait)
{
    while(true)
    {
        aSpan = Emplace_Multiple(aSpan);
        if(aSpan.empty())
            return;

        //Acquire: Need sync to see the latest queue indices
        mSize.wait(mCapacity, std::memory_order::acquire);
    }
}

Pop Awaiting

Waiting to pop would be similar, except when we're shutting down the engine we need some way to tell these threads to stop waiting. The popping threads will also wait on mSize, so during shutdown we need to change mSize to wake them up. However, we also want to retain our knowledge of how many items are in the queue, so we can't just change mSize arbitrarily.

So when we want to end pop-awaiting, we'll set the highest bit of mSize, which will allow the popping threads to unblock, while still being able to extract the correct value of the size. Below is the code for ending and resetting pop-waiting, as well as extracting the true size of the queue:

//Class template parameters removed for brevity
void Queue::End_PopWaiting() requires(sPopAwait)
{
    //Tell all popping threads we're shutting down
    //Release order: Syncs indices, and prevents code reordering after this. 
    auto cPriorSize = mSize.fetch_or(sSizeMask, std::memory_order::release);

    //Notify any waiting threads, but only if it was empty
    if(cPriorSize == 0)
        mSize.notify_all();
}

void Queue::Reset_PopWaiting() requires(sPopAwait)
{
    //Relaxed: Sync of other data is not needed: Queue state unchanged
    mSize.fetch_and(~sSizeMask, std::memory_order::relaxed);
}

int Queue:size() const
{
    //Relaxed: Nothing to synchronize when reading this
    auto cSize = mSize.load(std::memory_order::relaxed);
    return cSize & (~sSizeMask); //Clear the high bit!
}

In End_PopWaiting() , if the queue was empty we notify_all() threads waiting on mSize to wake them up. We use std::memory_order::release on the mSize update so that we'll have an acquire-release pair with the acquire ordering that we'll have in pop-await functions below. This pairing means that any changes made (in the End_PopWaiting() thread) prior to setting mSize will be synchronized-with the observation of this change of mSize in the pop-await functions. That way any waking threads that see that we've ended the waiting will also see any other changes that we've made.

The logic for pop-awaiting is identical to that for emplace-awaiting, except if after waiting mSize is sSizeMask, we know that not only do we not want to wait anymore, but also that the queue is currently empty and we should just return:

//Class template parameters removed for brevity
bool Queue::Pop_Await(DataType& aPopped) requires (sPopAwait)
{
    while(true)
    {
        if(Pop(aPopped))
            return true;

        //The queue was empty, wait until someone pushes or we're ending
        //Acquire: Need sync to see the latest queue indices
        mSize.wait(0, std::memory_order::acquire);

        //If mSize is sSizeMask then nothing will push, and none left to pop. 
        //Relaxed: Nothing to sync, and if we miss sSizeMask we'll see it soon
        if(mSize.load(std::memory_order::relaxed) == sSizeMask)
            return false;
    }
}

template <typename ContainerType>
void Queue::Pop_Multiple_Await(ContainerType& aPopped) requires (sPopAwait)
{
    while(true)
    {
        Pop_Multiple(aPopped);
        if(!aPopped.empty())
            return;

        //Comments are identical to Pop_Await()
        mSize.wait(0, std::memory_order::relaxed);

        //If mSize is sSizeMask then nothing will push, and none left to pop. 
        //Relaxed: Nothing to sync, and if we miss sSizeMask we'll see it soon 
        if(mSize.load(std::memory_order::relaxed) == sSizeMask)
            return;
    }
}

Finally, the methods for increasing and decreasing mSize:

//Class template parameters removed for brevity
void Queue::Increase_Size(int aNumPushed)
{
    //Release if pop-awaiting (Syncs indices), else relaxed (no sync needed)
    static constexpr auto sOrder = sPopAwait ? std::memory_order::release : 
        std::memory_order::relaxed;
    [[maybe_unused]] auto cPriorSize = mSize.fetch_add(aNumPushed, sOrder);

    if constexpr (sPopAwait)
    {
        //If was empty, notify all threads
        //No need to clear high bit: if set, pop-waits already ended
        if (cPriorSize == 0)
            mSize.notify_all();
    }
}

void Queue::Decrease_Size(int aNumPopped)
{
    //Release if push-awaiting (Syncs indices), else relaxed (no sync needed)
    static constexpr auto sOrder = sPushAwait ? std::memory_order::release : 
        std::memory_order::relaxed;
    [[maybe_unused]] auto cPriorSize = mSize.fetch_sub(aNumPopped, sOrder);

    if constexpr (sPushAwait)
    {
        //If was full (clear the high bit!), notify all threads
        if ((cSize & (~sSizeMask)) == mCapacity)
            mSize.notify_all();
    }
}

Note that if we don't allow waiting then the memory orders are relaxed, as there's no data we need to worry about synchronizing between threads. Otherwise, we use std::memory_order::release to create acquire-release pairs with the acquire's in the emplace-await and pop-await functions. That way when the threads unblock, they will see the updates to the queue and will succeed in their emplacing/popping.

Conclusion

Single-producer, single-consumer queues are very useful in multithreaded programs, and are commonly used for message passing between dedicated threads. For example, submitting file-save/load requests from the main game thread to an IO-thread, submitting network packet requests to a networking thread, audio requests to an audio thread, etc.

Depending on your engine architecture though, you might need queues that support multiple producer and consumer threads, such as for an async task queue. These types of queues will be covered in Part II of this blog series.

I should note that while I have written this code for an abstract machine, I have only tested this code on x86. I have not been able to test this yet on an ARM processor, which has a more relaxed memory model. Thus there may be subtle problems with the chosen atomic memory orders that I haven't found yet. If you're targeting a platform using ARM (e.g. Android, iOS), review the memory ordering carefully and please let me know if you encounter any problems!

License & Disclaimers

Support This Blog!

Did you find this article valuable?

Support C++ Professional Game Engine Programming by becoming a sponsor. Any amount is appreciated!