Thursday, May 23, 2013

Lock-free Multi-producer Multi-consumer Queue on Ring Buffer

My article "Lock-Free Multi-Producer Multi-Consumer Queue on Ring Buffer" was published by Linux Journal more than 30 days ago, so now I can post it here.

The work queue has always been one of the hottest points in server software.
  Here is how to scale it effectively in a multi-core environment.


I. INTRODUCTION

Nowadays high-performance server software (e.g. an HTTP accelerator) in most
cases runs on multi-core machines. Modern hardware can provide 32, 64 or more
CPU cores. In such a highly concurrent environment lock contention sometimes
hurts overall system performance more than data copying, context switches etc.
Thus, moving the hottest data structures from a locked to a lock-free design
can significantly improve the performance of software running in a multi-core
environment.

One of the hottest data structures in traditional server software is the work
queue, which can see hundreds of thousands of push and pop operations per
second from tens of producers and/or consumers.

A work queue is a FIFO data structure which has only two operations: push() and
pop(). It usually limits its size, so that pop() waits if there
are no elements in the queue and push() waits if the queue contains the maximum
allowed number of elements. It is important that many threads can execute pop()
and push() operations simultaneously on different CPU cores.

One possible work queue implementation is a ring buffer storing pointers
to the queued elements. It performs well, especially in comparison
with a common non-intrusive linked list (which stores copies of the values
passed by the user, e.g. std::list).
A significant property of the ring buffer implementation is that it natively
limits its size - you only need to move the current position in round-robin
fashion. Linked lists, on the other hand, require maintaining an additional
field for the total queue length. With a linked list, push and pop operations
have to modify the queue length in addition to updating the element links, so
a lock-free implementation has to take more care of consistency in the queue.

Different CPU families provide different guarantees for memory
operation ordering, and this is critical for lock-free algorithms.
In this article we'll concentrate on x86 as the most widespread architecture
rather than write generic (but slower) code.


II. NAIVE SYNCHRONIZED QUEUE

First of all let's define the interface for our queue (I'll use C++11 in the
article):

        template<class T, long Q_SIZE>
        class NaiveQueue {
        public:
            NaiveQueue();
            void push(T *x);
            T *pop();
        };

The queue will store T* pointers and has a maximum size of Q_SIZE.

Let's see how the queue would look in a naive locked implementation. To develop
the queue we need an array in which we place our ring buffer. We can define
this as

        T *ptr_array_[Q_SIZE];

Two members of the class, head_ and tail_, will point to the head (the next
position to push an element to) and the tail (the next item to pop) of the
queue and should be initialized to zero in the class constructor. We can
simplify our operations on the ring buffer by defining the counters as unsigned
long. An unsigned long (which is 64 bits long on x86-64 Linux) is large enough
to handle even millions of operations per second for thousands of years. So
tail_ and head_ will be defined as:

        unsigned long head_;
        unsigned long tail_;

This way we can access the elements (the same for head_ and tail_) just by

        ptr_array_[tail_++ & Q_MASK]

Where Q_MASK is defined as

        static const unsigned long Q_MASK = Q_SIZE - 1;

To get the current position in the array we could calculate the remainder of
the integer division of tail_ by Q_SIZE, but instead we define Q_SIZE as a
power of 2 (32768 in our case) so we can use a bitwise AND between Q_MASK and
tail_, which is a bit faster.
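
A minimal sketch of the indexing trick, assuming a power-of-two Q_SIZE. The
helper names slot_index() and mask_matches_modulo() are illustrative and are
not part of the article's source code:

```cpp
#include <cassert>

// Illustrative helper, not from the original source: maps an ever-growing
// position counter onto a slot of a ring buffer with Q_SIZE slots.
template<unsigned long Q_SIZE>
inline unsigned long slot_index(unsigned long pos)
{
    static_assert(Q_SIZE != 0 && (Q_SIZE & (Q_SIZE - 1)) == 0,
                  "Q_SIZE must be a power of 2");
    const unsigned long Q_MASK = Q_SIZE - 1;
    return pos & Q_MASK;    // equal to pos % Q_SIZE for a power-of-2 Q_SIZE
}

// Checks that masking and the remainder agree for the first n positions.
inline bool mask_matches_modulo(unsigned long n)
{
    for (unsigned long pos = 0; pos < n; ++pos)
        if (slot_index<32768>(pos) != pos % 32768)
            return false;
    return true;
}
```

Because 2^64 is itself a multiple of any power-of-two Q_SIZE, the slot sequence
stays continuous even if the counter ever wraps around.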

Since the operations on the queue must wait if there are no elements or the
queue is full, we need two condition variables:

        std::condition_variable cond_empty_;
        std::condition_variable cond_overflow_;

to wait for new elements in the queue or for free space respectively.
Of course, we also need a mutex to serialize access to our queue:

        std::mutex mtx_;

This way we can write push() and pop() in the following way:

        void push(T *x)
        {
            std::unique_lock<std::mutex> lock(mtx_);

            cond_overflow_.wait(lock, [this]() {
                            return tail_ + Q_SIZE > head_;
                    });

            ptr_array_[head_++ & Q_MASK] = x;

            cond_empty_.notify_one();
        }

        T *pop()
        {
            std::unique_lock<std::mutex> lock(mtx_);

            cond_empty_.wait(lock, [this]() {
                            return tail_ < head_;
                    });

            T *x = ptr_array_[tail_++ & Q_MASK];

            cond_overflow_.notify_one();

            return x;
        }

We perform both operations while holding an exclusive lock on mtx_. Once
the lock is acquired we can check the current queue state: whether it is empty
(and we cannot pop an element) or full (and we cannot push a new element).
std::condition_variable::wait() puts the current thread to sleep until
the specified predicate is true. Next we push or pop an element and notify
another thread (by a notify_one() call) that we have changed the queue state.
Since we add or remove only one element at a time, only one thread waiting
for available elements or free slots in the queue can make progress, so we
notify and wake up only one thread.
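
For reference, the fragments above can be assembled into one compilable class
roughly as follows. This is a sketch rather than the exact code from the
repository: the lambdas capture this (head_ and tail_ are data members), and
the naive_queue_demo() helper is a hypothetical smoke test added only for
illustration.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

template<class T, long Q_SIZE>
class NaiveQueue {
    static_assert(Q_SIZE > 0 && (Q_SIZE & (Q_SIZE - 1)) == 0,
                  "Q_SIZE must be a power of 2");
    static const unsigned long Q_MASK = Q_SIZE - 1;

public:
    NaiveQueue() : head_(0), tail_(0) {}

    void push(T *x)
    {
        std::unique_lock<std::mutex> lock(mtx_);
        // Sleep until there is at least one free slot.
        cond_overflow_.wait(lock, [this]() { return tail_ + Q_SIZE > head_; });
        ptr_array_[head_++ & Q_MASK] = x;
        cond_empty_.notify_one();
    }

    T *pop()
    {
        std::unique_lock<std::mutex> lock(mtx_);
        // Sleep until there is at least one queued element.
        cond_empty_.wait(lock, [this]() { return tail_ < head_; });
        T *x = ptr_array_[tail_++ & Q_MASK];
        cond_overflow_.notify_one();
        return x;
    }

private:
    unsigned long head_, tail_;
    std::condition_variable cond_empty_, cond_overflow_;
    std::mutex mtx_;
    T *ptr_array_[Q_SIZE];
};

// Hypothetical smoke test: one producer thread pushes the numbers 1..n,
// the calling thread pops them and returns their sum.
inline long naive_queue_demo(int n)
{
    std::vector<int> items(n);
    for (int i = 0; i < n; ++i)
        items[i] = i + 1;

    NaiveQueue<int, 8> q;   // small queue, so push() has to wait sometimes
    std::thread producer([&q, &items, n]() {
        for (int i = 0; i < n; ++i)
            q.push(&items[i]);
    });

    long sum = 0;
    for (int i = 0; i < n; ++i)
        sum += *q.pop();
    producer.join();
    return sum;
}
```

With GCC the sketch needs -std=c++11 (or later) and -pthread.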

The problem with this implementation is that only one thread at a time
can modify the queue. Moreover, mutexes and condition variables
are expensive - in Linux they are implemented on top of the futex(2) system
call. So each time a thread needs to wait on a mutex or condition variable,
it calls futex(2), which reschedules the thread and moves it to a wait
queue.

Now let's run a plain test which just pushes and pops addresses to and from the
queue with 16 producers and 16 consumers (please refer to the end of the
article for a link to the full source code). On a box with 16 Xeon cores the
test took about 7 minutes:

        # time ./a.out

        real    6m59.219s
        user    6m21.515s
        sys     72m34.177s

And strace with the -c and -f options shows that the program spends 99.98% of
its time in the futex system call.


III. LOCK-FREE MULTI-PRODUCER MULTI-CONSUMER QUEUE

Fortunately, you do not have to ask the kernel for help with user-space thread
synchronization. CPUs (at least the most common architectures) provide atomic
memory operations and barriers. With these operations you can atomically

 * read a memory operand, modify it and write it back
 * read a memory operand, compare it with a value and swap it with another value

Memory barriers, also known as fences, are special assembly instructions.
Fences guarantee instruction execution order on the local CPU and visibility
order on other CPUs. Let's consider two data-independent instructions, A
and B, separated by a fence (let it be mfence, which guarantees the
ordering of read and write operations):

 A
 mfence
 B

The fence guarantees that:
1. compiler optimizations won't move A after the fence or B before the fence;
2. the CPU will execute instructions A and B in order (even though it normally
   executes instructions out of order);
3. other CPU cores and processor packages, which work on the same bus, will
   see the result of instruction A before the result of instruction B.
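
The A / fence / B pattern can be tried out with the portable C++11 fence
instead of a raw mfence. This is only a sketch under that assumption (the
article itself relies on GCC intrinsics and x86 ordering), and the names
payload, ready, publish(), consume() and fence_demo() are made up for the
example:

```cpp
#include <atomic>
#include <cassert>
#include <thread>

static int payload = 0;                  // plain data, written by A
static std::atomic<bool> ready(false);   // flag, written by B

void publish()
{
    payload = 42;                                          // A
    std::atomic_thread_fence(std::memory_order_seq_cst);   // full fence
    ready.store(true, std::memory_order_relaxed);          // B
}

int consume()
{
    // Wait until the result of B becomes visible...
    while (!ready.load(std::memory_order_relaxed))
        std::this_thread::yield();
    std::atomic_thread_fence(std::memory_order_seq_cst);
    // ...then the result of A is guaranteed to be visible as well.
    return payload;
}

// Runs consume() in a second thread while this thread publishes.
int fence_demo()
{
    int seen = 0;
    std::thread reader([&seen]() { seen = consume(); });
    publish();
    reader.join();
    return seen;
}
```

Without the two fences (and with only relaxed accesses to the flag) the C++
memory model would allow the reader to see the flag set but the payload still
unwritten.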

For our queue we need to synchronize access to the head_ and tail_ fields from
multiple threads. When you run head_++ (this is an example of an RMW,
Read-Modify-Write, operation, since the processor must read the current head_
value, increment it locally and write it back to memory) on two cores, both
cores could simultaneously read the current head_ value, increment it and
simultaneously write the new value back, so one increment is lost. For atomic
operations C++11 provides the std::atomic template, which should replace the
current GCC __sync_* intrinsics in the future. Unfortunately, with my compiler
(GCC 4.6.3 for x86-64) std::atomic<> methods still generate extra fences
regardless of the specified memory order. So I'll use the old GCC intrinsics
for atomic operations.

We can atomically read and increment a variable (e.g. our head_) by

        __sync_fetch_and_add(&head_, 1);

This makes the CPU lock the shared memory location on which it's going to
perform an operation (an increment in our case). In a multiprocessor
environment processors communicate with each other to ensure that they all see
up-to-date data. This is known as the cache coherency protocol. Using this
protocol a processor can take exclusive ownership of a memory location.
However, this communication is not free, so we should use such atomic
operations carefully and only when we really need them. Otherwise we can hurt
performance significantly.
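
A small experiment shows that no increments are lost with the builtin. The
helper count_atomically() is hypothetical and exists only for this
illustration:

```cpp
#include <cassert>
#include <thread>
#include <vector>

// Increments a shared counter n_iters times from each of n_threads threads
// using the GCC builtin discussed above and returns the final value.
unsigned long count_atomically(int n_threads, int n_iters)
{
    unsigned long counter = 0;
    std::vector<std::thread> threads;

    for (int t = 0; t < n_threads; ++t)
        threads.emplace_back([&counter, n_iters]() {
            for (int i = 0; i < n_iters; ++i)
                __sync_fetch_and_add(&counter, 1);  // atomic RMW
        });
    for (auto &t : threads)
        t.join();

    return counter;     // safe to read: all the writers have been joined
}
```

If the builtin is replaced with a plain counter++ the program has a data race
and the result is typically smaller than n_threads * n_iters.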

Meanwhile, plain read and write operations on naturally aligned memory
locations execute atomically and do not require any additional actions (like
specifying the 'lock' prefix to make the instruction run atomically on the x86
architecture).

In our lock-free implementation we're going to abandon the mutex mtx_ and
consequently both condition variables. However, we still need to wait if the
queue is full on push and if the queue is empty on pop. For push we
would do this with a simple loop, checking the same condition as in the locked
queue:

        while (tail_ + Q_SIZE <= head_)
            sched_yield();

sched_yield() just lets another thread run on the current processor. This is
the native and fastest way to reschedule the current thread. However, if there
is no other thread waiting in the scheduler run queue for an available CPU,
then the current thread will be immediately scheduled back. Thus we'll always
see 100% CPU usage, even if we have no data to process. To cope with this we
can use usleep(3) with some small value.
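
One way to combine the two, sketched under the assumption that a few cheap
yields are tried first and sleeping starts only afterwards. The helper
wait_until() and both threshold constants are hypothetical, untuned values:

```cpp
#include <cassert>
#include <sched.h>
#include <unistd.h>

// Both thresholds are arbitrary illustrative values, not tuned numbers.
static const unsigned long YIELD_ATTEMPTS = 16;
static const useconds_t SLEEP_USEC = 50;

// Polls ready() until it returns true. The first attempts only yield the
// CPU; later ones sleep, so an idle waiter stops burning a whole core.
// Returns the number of failed checks.
template<class Pred>
unsigned long wait_until(Pred ready)
{
    unsigned long attempts = 0;
    while (!ready()) {
        if (++attempts <= YIELD_ATTEMPTS)
            sched_yield();
        else
            usleep(SLEEP_USEC);
    }
    return attempts;
}
```

The price of sleeping is latency: a waiter may notice a state change up to
SLEEP_USEC microseconds (plus scheduling delay) late.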

Let's look more carefully at what's going on in the loop. First we read the
tail_ value, next we read the value of head_, and after that we decide whether
to wait or to push an element and move head_ forward. The current thread can
be rescheduled at any point of the check and even after the check. Let's
consider a scenario with 2 threads:

        Thread 1                  Thread 2

        read tail_                read tail_
        read head_                read head_
        (scheduled)               push an element
        push an element

If we had only one free slot in the ring buffer, then we overwrite the pointer
to the oldest queued element. We can solve the problem by incrementing the
shared head_ before the loop and using a temporary local variable (i.e. we
reserve the slot into which we're going to insert an element and wait until it
is free):

        unsigned long tmp_head =
            __sync_fetch_and_add(&head_, 1);
        while (tail_ + Q_SIZE <= tmp_head)
            sched_yield();
        ptr_array_[tmp_head & Q_MASK] = x;

We can write similar code for pop() (just swap head and tail). However, the
problem still exists. Two producers can increment head_, check that they have
enough space and be rescheduled at the same time just before inserting x. A
consumer can wake up instantly (it sees that head_ has moved forward by two
positions) and read a value from the queue which has not been inserted yet.

Before solving the issue let's see what the picture looks like with 2 producers
(P1 and P2) and 2 consumers (C1 and C2):

                     LT                          LH
        | _ | _ | _ | x | x | x | x | x | x | x | _ | _ | _ |
                      ^   ^                       ^   ^
                      |   |                       |   |
                      C1  C2                      P1  P2

In the picture '_' denotes free slots and 'x' denotes inserted elements.
C1 and C2 are going to read values, and P1 and P2 are going to
write elements to currently free slots. Let LT be the latest (lowest) tail
value among all the consumers, which is stored in tmp_tail of the latest
consumer, C1 in the picture. Consumer C1 can currently be working on the queue
at position LT (i.e. it is in the middle of fetching the element).
Correspondingly, let LH be the lowest value of tmp_head among all the
producers. At any given time you must not push an element to the slot at LT or
beyond it (the ring wraps around, so in terms of the counters this means a
position equal to or greater than LT + Q_SIZE) and should not try to pop an
element at a position equal to or greater than LH. It means that all the
producers should care about the current LT value and all the consumers about
the current LH value. So let's introduce two helper class members for LH and
LT:

        volatile unsigned long last_head_;
        volatile unsigned long last_tail_;

Thus we should check the last_tail_ value instead of tail_ in the loop above.
We need to update the values from multiple threads, but we're going to do this
with plain write operations, without RMW. So the members do not have to be of
an atomic type. I just declared the variables volatile to prevent their values
from being cached in local processor registers.

Now the question is who should update the last_head_ and last_tail_ values,
and when. We expect that in most cases we are able to perform a push and/or pop
operation on the queue without waiting. Thus we can update the two helper
variables only when we really need them, i.e. inside the waiting loop.
So when a producer realizes that it cannot insert a new element because the
last_tail_ value is too small, it falls into the wait loop and tries to update
last_tail_. To update the value the thread must inspect the current tmp_tail of
each consumer. So we need to make the temporary value visible to other threads.
One possible solution is to maintain an array of tmp_tail and tmp_head
values with a size equal to the number of running threads. We can do this with
the following code:

        struct ThrPos {
            volatile unsigned long head, tail;
        };

        ThrPos thr_p_[std::max(n_consumers_, n_producers_)];

where n_consumers_ is the number of consumers and n_producers_ is the number of
producers. We could allocate the array dynamically, but let's leave it
statically sized for simplicity for now. Many threads read the elements of the
array, but each element is updated by only one thread, with a plain move
instruction (no RMW operation), so you can also use regular reads on the
variables.

Since the thr_p_ values are used only to limit the movement of the current
queue pointers, we initialize them to the maximum allowed value, i.e. they do
not limit head_ and tail_ movement until somebody pushes to or pops from the
queue.

We can find the lowest tail value among all the consumers with the following
loop:

        auto min = tail_;
        for (size_t i = 0; i < n_consumers_; ++i) {
            auto tmp_t = thr_p_[i].tail;

            asm volatile("" ::: "memory"); // compiler barrier

            if (tmp_t < min)
                min = tmp_t;
        }

The temporary variable tmp_t is required here since you cannot atomically
compare whether thr_p_[i].tail is less than min and update min if it is. After
you have saved the current consumer's tail, and while you compare it with min,
the consumer can move the tail. It can move it only forward, so the check in
the while condition is still correct and you won't overwrite any live queue
elements. But if you didn't use tmp_t and wrote the code like

        if (thr_p_[i].tail < min)
            min = thr_p_[i].tail;

then the consumer could have a lower tail value while you're comparing it with
min, but move it far forward after the comparison is done and just before the
assignment. So you would probably find an incorrect minimal value.

I added a compiler barrier, asm volatile("" ::: "memory") (this is a
GCC-specific compiler barrier), to be sure that the compiler won't move the
thr_p_[i].tail access and will access the memory location only once - to load
its value into tmp_t.
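
The scan can also be packaged as a free function, which makes it easy to
check in isolation. This is a sketch: the function name lowest_tail() and the
start parameter (standing in for the global tail_) are illustrative, and the
inline asm is GCC/Clang specific, as in the article:

```cpp
#include <cassert>
#include <climits>
#include <cstddef>

struct ThrPos {
    volatile unsigned long head, tail;
};

// Returns the lowest tail among the first n_consumers entries of thr_p,
// starting from `start` (the current value of the global tail counter).
inline unsigned long lowest_tail(const ThrPos *thr_p, size_t n_consumers,
                                 unsigned long start)
{
    unsigned long min = start;
    for (size_t i = 0; i < n_consumers; ++i) {
        unsigned long tmp_t = thr_p[i].tail;    // the only read of the field

        asm volatile("" ::: "memory");          // compiler barrier

        if (tmp_t < min)
            min = tmp_t;
    }
    return min;
}
```

Entries of idle consumers hold ULONG_MAX, so they never lower the result.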

One important thing about the array is that it must be indexed by the current
thread identifier. Since POSIX threads (and consequently the C++ threads which
use them) do not use small monotonically increasing values to identify threads,
we need to use our own thread wrapping. I'll use the inline thr_pos() method of
the queue to access the array elements:

        ThrPos& thr_pos() const
        {
            return thr_p_[ThrId()];
        }

(you can find an example of a ThrId() implementation in the source referenced
at the end of the article).
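
Purely as an illustration of the idea, and not the implementation from the
repository, small dense thread identifiers could be handed out on first use
like this. The names thr_id() and thr_id_demo() are hypothetical, and
thread_local needs a compiler newer than the GCC 4.6 mentioned above:

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>

// Hypothetical stand-in for ThrId(): every thread gets the next small
// integer the first time it asks and keeps it for its whole lifetime.
inline size_t thr_id()
{
    static std::atomic<size_t> next_id(0);
    thread_local size_t id = next_id.fetch_add(1);
    return id;
}

// Checks that the id is stable within a thread and differs across threads.
inline bool thr_id_demo()
{
    const size_t mine = thr_id();
    size_t other = mine;
    std::thread t([&other]() { other = thr_id(); });
    t.join();
    return thr_id() == mine && other != mine;
}
```

Note that this scheme numbers all threads together, in the order of their
first call. The thr_p_ array above has only max(n_consumers_, n_producers_)
entries, which assumes identifiers that are dense within each role, so with a
global counter like this one the array and the scanning loops would have to
cover all threads instead.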

Before writing the final implementation of push() and pop() let's get back to
the initial application of our queue, the work queue. Usually, producers and
consumers do a lot of work between operations on the queue. For instance, it
could be a very slow IO operation. So what happens if one consumer fetches an
element from the queue and goes to sleep in a long IO operation? Its tail value
will stay the same for a long time and all the producers will wait on it even
if all the other consumers have fully cleared the queue. This is not the
desired behavior.

Let's fix this in two steps. First, let's assign the maximum allowed value to
the per-thread tail pointer just after fetching the element. So we should write
the following at the end of the pop() method:

        T *ret = ptr_array_[thr_pos().tail & Q_MASK];
        thr_pos().tail = ULONG_MAX;
        return ret;

Since a producer in push() starts searching for the minimal allowed value for
last_tail_ from the current value of the global tail_, it can assign the
current tail_ value to last_tail_ only if there are no active consumers. This
is what we want.

Generally speaking, other processors could see the thr_pos().tail update before
the local processor reads from ptr_array_, so they could move forward and
overwrite the position in the array before the local processor reads it. This
is possible on processors with relaxed memory operation ordering. However, x86
provides relatively strict memory ordering rules; in particular, it guarantees
that
1. stores are not reordered with earlier loads
2. and stores are seen in a consistent order by other processors.
Thus, loading from ptr_array_ and storing to thr_pos().tail in the code above
will be done on x86 and seen by all processors in exactly this order.
So we don't need any explicit memory barriers here.

The second step is to correctly set thr_pos().tail at the
beginning of pop(). We assign the current thr_pos().tail by

        thr_pos().tail = __sync_fetch_and_add(&tail_, 1);

The problem is that the operation is atomic only for the tail_ shift, but not
for the thr_pos().tail assignment. So there is a time window in which
thr_pos().tail = ULONG_MAX while tail_ could be shifted significantly by other
consumers, so push() will set last_tail_ to the current, just incremented,
tail_. So when we're going to pop an element we have to reserve a tail position
less than or equal to the tail_ value from which we'll pop the element:

        thr_pos().tail = tail_;
        thr_pos().tail = __sync_fetch_and_add(&tail_, 1);

In this code we actually perform the following three operations:

        write tail_ to thr_pos().tail
        increment tail_
        write previous value of tail_ to thr_pos().tail

Again, in the general case we have no guarantee that other processors will
"see" the results of the write operations in the same order. Potentially some
other processor could first read the incremented tail_ value, try to find the
new last_tail_ and only after that read the new tail value of the current
thread. However, __sync_fetch_and_add() executes a locked instruction which
implies an implicit full memory barrier on most architectures (including x86),
so neither the first nor the third operation can be moved over the second one.
Therefore we can also skip explicit memory barriers here.

Thus if the queue is almost full, all producers will stop at or before
the position of the element which we're popping.

Now we're ready to write our final implementation of the push() and pop()
methods. Here they are:

        void push(T *ptr)
        {
            thr_pos().head = head_;
            thr_pos().head = __sync_fetch_and_add(&head_, 1);

            while (__builtin_expect(thr_pos().head >=
                                    last_tail_ + Q_SIZE, 0))
            {
                ::sched_yield();

                auto min = tail_;
                for (size_t i = 0; i < n_consumers_; ++i) {
                    auto tmp_t = thr_p_[i].tail;

                    asm volatile("" ::: "memory"); // compiler barrier

                    if (tmp_t < min)
                        min = tmp_t;
                }
                last_tail_ = min;
            }

            ptr_array_[thr_pos().head & Q_MASK] = ptr;
            thr_pos().head = ULONG_MAX;
        }

        T *pop()
        {
            thr_pos().tail = tail_;
            thr_pos().tail = __sync_fetch_and_add(&tail_, 1);

            while (__builtin_expect(thr_pos().tail >=
                                    last_head_, 0))
            {
                ::sched_yield();

                auto min = head_;
                for (size_t i = 0; i < n_producers_; ++i) {
                    auto tmp_h = thr_p_[i].head;

                    asm volatile("" ::: "memory"); // compiler barrier

                    if (tmp_h < min)
                        min = tmp_h;
                }
                last_head_ = min;
            }

            T *ret = ptr_array_[thr_pos().tail & Q_MASK];
            thr_pos().tail = ULONG_MAX;
            return ret;
        }
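
For readers who want to experiment, here is a compact self-contained sketch of
the same algorithm. It is not the code from the repository and it makes a few
substitutions that should be read as assumptions: std::atomic with the default
sequentially consistent ordering replaces the volatile fields and the __sync
builtins (portable, but its stores are costlier on x86 than the plain stores
used above), the caller passes its per-role thread index explicitly instead of
using ThrId(), and the names LockFreeQueueSketch, lowest() and
lockfree_queue_demo() are made up for the example.

```cpp
#include <atomic>
#include <cassert>
#include <climits>
#include <cstddef>
#include <thread>
#include <vector>

template<class T, unsigned long Q_SIZE>
class LockFreeQueueSketch {
    static_assert(Q_SIZE && !(Q_SIZE & (Q_SIZE - 1)), "Q_SIZE must be 2^n");
    static const unsigned long Q_MASK = Q_SIZE - 1;
    typedef std::atomic<unsigned long> Pos;
    struct ThrPos {
        Pos head, tail;
        ThrPos() : head(ULONG_MAX), tail(ULONG_MAX) {}  // "not active"
    };

public:
    LockFreeQueueSketch(size_t n_producers, size_t n_consumers)
        : n_producers_(n_producers), n_consumers_(n_consumers),
          head_(0), tail_(0), last_head_(0), last_tail_(0),
          thr_p_(n_producers > n_consumers ? n_producers : n_consumers) {}

    void push(size_t id, T *ptr)    // id: 0 .. n_producers - 1
    {
        Pos &my_head = thr_p_[id].head;
        my_head = head_.load();                 // conservative reservation
        const unsigned long pos = head_.fetch_add(1);
        my_head = pos;
        while (pos >= last_tail_.load() + Q_SIZE) {
            std::this_thread::yield();
            last_tail_ = lowest(tail_.load(), n_consumers_, &ThrPos::tail);
        }
        ptr_array_[pos & Q_MASK] = ptr;
        my_head = ULONG_MAX;        // the slot is now visible to consumers
    }

    T *pop(size_t id)               // id: 0 .. n_consumers - 1
    {
        Pos &my_tail = thr_p_[id].tail;
        my_tail = tail_.load();                 // conservative reservation
        const unsigned long pos = tail_.fetch_add(1);
        my_tail = pos;
        while (pos >= last_head_.load()) {
            std::this_thread::yield();
            last_head_ = lowest(head_.load(), n_producers_, &ThrPos::head);
        }
        T *ret = ptr_array_[pos & Q_MASK];
        my_tail = ULONG_MAX;        // the slot may now be reused by producers
        return ret;
    }

private:
    // Lowest position among the first n threads, starting from `min`.
    unsigned long lowest(unsigned long min, size_t n, Pos ThrPos::*field)
    {
        for (size_t i = 0; i < n; ++i) {
            const unsigned long tmp = (thr_p_[i].*field).load();  // read once
            if (tmp < min)
                min = tmp;
        }
        return min;
    }

    const size_t n_producers_, n_consumers_;
    Pos head_, tail_, last_head_, last_tail_;
    std::vector<ThrPos> thr_p_;
    T *ptr_array_[Q_SIZE];
};

// Hypothetical smoke test: 2 producers push the numbers 1..n each and
// 2 consumers pop n items each; returns the sum of everything popped.
inline long lockfree_queue_demo(int n)
{
    std::vector<int> items(n);
    for (int i = 0; i < n; ++i)
        items[i] = i + 1;

    LockFreeQueueSketch<int, 16> q(2, 2);
    std::atomic<long> sum(0);
    std::vector<std::thread> thr;
    for (size_t id = 0; id < 2; ++id) {
        thr.emplace_back([&q, &items, n, id]() {
            for (int i = 0; i < n; ++i)
                q.push(id, &items[i]);
        });
        thr.emplace_back([&q, &sum, n, id]() {
            for (int i = 0; i < n; ++i)
                sum += *q.pop(id);
        });
    }
    for (auto &t : thr)
        t.join();
    return sum.load();
}
```

As in the article, pop() blocks on an empty queue, so the demo makes the
consumers pop exactly as many items as the producers push.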

A careful reader will notice that multiple threads can scan the current head or
tail values of all the producing or consuming threads. So a number of threads
can find different min values and try to write them to last_head_ or last_tail_
simultaneously, so you might want to use a CAS operation here. However, an
atomic CAS is expensive, and the worst that can happen here is that you assign
too small a value to last_head_ or last_tail_, or even overwrite a new, higher
value with a smaller old value, so you'll fall into sched_yield() again. Maybe
we fall into sched_yield() more frequently than if we used a synchronized CAS
operation, but in practice the cost of the extra atomic operation reduces
performance.
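
For comparison, this is roughly what the rejected CAS variant could look like:
a loop that only ever raises the shared value. The helper store_max() is
hypothetical and is not used by the queue:

```cpp
#include <cassert>

// Hypothetical helper: raises *target to `value` unless it already holds
// a higher value, so a stale scan can never lower the shared variable.
inline void store_max(volatile unsigned long *target, unsigned long value)
{
    unsigned long cur = *target;
    while (cur < value) {
        // Returns the value *target held before the operation.
        unsigned long prev = __sync_val_compare_and_swap(target, cur, value);
        if (prev == cur)
            break;      // our value has been installed
        cur = prev;     // somebody else changed it, re-check against theirs
    }
}
```

Each call costs at least one extra read of the shared variable and, when the
value really has to be raised, a locked instruction, which is the overhead the
paragraph above refers to.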

I also used __builtin_expect with a zero expect argument to say that we do not
expect the condition in the while statement to be true too frequently, so the
compiler should place the inner loop code after the code executed when the
condition is false. This way you can improve instruction cache usage.
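
The builtin is commonly wrapped into a pair of macros. A minimal sketch, where
the macro names Q_LIKELY/Q_UNLIKELY and the function sum_nonnegative() are
made up for the example:

```cpp
#include <cassert>

// GCC/Clang specific. The double negation turns any scalar into 0 or 1.
#define Q_LIKELY(x)   __builtin_expect(!!(x), 1)
#define Q_UNLIKELY(x) __builtin_expect(!!(x), 0)

// Sums the non-negative elements; negative ones are assumed to be rare,
// so the compiler is told to keep the common path straight.
inline long sum_nonnegative(const int *v, int n)
{
    long sum = 0;
    for (int i = 0; i < n; ++i) {
        if (Q_UNLIKELY(v[i] < 0))
            continue;   // rare path
        sum += v[i];
    }
    return sum;
}
```

The hint changes only code layout and branch prediction defaults, never the
result of the computation.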

Finally, let's run the same test as for the naive queue:

        # time ./a.out 

        real    1m53.566s
        user    27m55.784s
        sys     2m4.461s

This is 3.7 times faster than our naive queue implementation!
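
The factor comes from the two wall-clock ("real") times reported by time. The
arithmetic, with helper names invented for the example:

```cpp
#include <cassert>

// Converts a "XmY.Zs" reading from time(1) into seconds.
constexpr double to_seconds(int minutes, double seconds)
{
    return minutes * 60 + seconds;
}

// Ratio of the naive run time to the lock-free run time.
constexpr double speedup(double naive_sec, double lockfree_sec)
{
    return naive_sec / lockfree_sec;
}

// 6m59.219s for the naive queue against 1m53.566s for the lock-free one.
constexpr double measured_speedup =
    speedup(to_seconds(6, 59.219), to_seconds(1, 53.566));
```

That is 419.219 s / 113.566 s, or about 3.69, which rounds to the 3.7 quoted
above.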


IV. CONCLUSION

Nowadays, high-performance computing is typically achieved in two ways:
horizontal scaling (scale-out) by adding new computational nodes and
vertical scaling (scale-up) by adding extra computational resources (like
CPUs or memory) to a single node. Unfortunately, linear scaling is possible
only in theory. In practice if you double your computational resources, then
it is likely that you get only a 30-60% performance gain. Lock contention is
one of the problems which prevent efficient scale-up by adding extra CPUs.
Lock-free algorithms make scale-up more productive and allow you to get more
performance in multi-core environments.

The code for the naive and lock-free queue implementations, with tests for
correctness, is available at:

  https://github.com/krizhanovsky/NatSys-Lab/blob/master/lockfree_rb_q.cc

Alexander Krizhanovsky is the software architect and founder of NatSys-Lab.
Before NatSys-Lab he was working as Senior Software Developer at IBM, Yandex
and Parallels. He specializes in high performance solutions for UNIX
environment.

Special thanks to Johann George from SanDisk for final review of the paper.

3 comments:

  1. Hi, thank you for nice article.
    This algorithm is not lock-free but obstruction-free.

    In case, all producer is preempted by scheduler just after
    ptr_array_[thr_pos().head & Q_MASK] = ptr;
    line, all consumer will get infinity loop.
    sched_yield() can give progress to consumer from consumer.
    By continuing such situation, consumers will get live-lock.

    Practically, such situation may not happen, so this algorithm is safe.

  2. Hi,

    this is good point about the live lock. This terrible situation becomes very probable if we have number of producers and consumers threads greater than number of CPU cores.

    However, it should not be live lock actually, since the system scheduler will preempt the consumers the same way, so producers get time to finish push() and move the pointers.

    Also this is always good to run one thread per one core to minimize context switches and get better performance of the algorithm. And in this case we won't get the nasty preemption issue.

  3. Hello, thanx for the wonderful article. I have some very fundamental questions regarding the unit test. What exactly is being pushed and pop? Secondly, what is the purpose of X_EMPTY and X_MISSED?

    Thanx in advance!
