High Performance Linux

Friday, November 29, 2013

Calling Closed Kernel Functions in Linux Kernel Modules

The Linux kernel exports some of its functions with EXPORT_SYMBOL and friends. Such functions can be used in loadable kernel modules. However, other functions, e.g. ip_rcv() or tcp_v4_rcv(), are closed. If you need some of these functions, then you can write a trivial kernel patch which just exports them. We do this in our Synchronous Sockets.

However, there is a simpler method. The Linux kernel has a nice kallsyms interface, which provides the addresses of kernel symbols. So first, you can just grep for the required symbol:

    $ grep '\<ip_rcv\>' /proc/kallsyms
    ffffffff8143590a T ip_rcv

You can do this from a shell script and pass the address somehow to your module which needs to call the function.
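One way to do the "pass it somehow" step is a small user-space helper that extracts the address from a /proc/kallsyms line. The sketch below is only an illustration: parse_kallsyms_line() is a hypothetical helper name, and it assumes the usual "address type name" line layout shown above.

```c
#include <stdio.h>
#include <string.h>

/*
 * Parse one /proc/kallsyms line ("<hex address> <type> <name> ...").
 * Returns 1 and stores the address if the line describes symbol `sym`,
 * 0 otherwise. parse_kallsyms_line() is a hypothetical helper.
 */
int
parse_kallsyms_line(const char *line, const char *sym,
                    unsigned long long *addr)
{
    unsigned long long a;
    char type;
    char name[256];

    /* Expect exactly three fields: address, type letter, symbol name. */
    if (sscanf(line, "%llx %c %255s", &a, &type, name) != 3)
        return 0;
    /* Compare the whole name, so "ip_rcv_finish" doesn't match "ip_rcv". */
    if (strcmp(name, sym))
        return 0;
    *addr = a;
    return 1;
}
```

The resulting address could then be handed to the module, for example as a module parameter. Note that unprivileged users may see zeroed addresses in /proc/kallsyms, depending on the kptr_restrict setting.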

Fortunately, the Linux kernel also exports an in-kernel interface to kallsyms, so GPL-licensed modules can use it to find the desired symbols.

Recently, we wrote a simple Linux kernel module which makes the Nginx HTTP server work in Deep Packet Inspection (DPI) mode: you can attach a machine with Nginx to a SPAN port of your router, and Nginx thinks that it gets traffic from real clients and works with them in the usual way. To do this we had to generate custom TCP ACK, FIN and RST segments and pass them directly to the Linux TCP code. We did this with a tcp_v4_rcv() call. So let's see how to call the closed function from a loadable kernel module:

    static int (*tcp_v4_rcv_ptr)(struct sk_buff *);

    /* kallsyms callback: store the address of tcp_v4_rcv() and stop. */
    static int
    get_tcp_v4_rcv(void *data, const char *namebuf,
                   struct module *owner, unsigned long addr)
    {
        if (strcmp(namebuf, "tcp_v4_rcv"))
            return 0;
        *(unsigned long *)data = addr;
        return 1;
    }

    static void *
    get_tcp_v4_rcv_ptr(void)
    {
        unsigned long tcp_v4_rcv_addr = 0;

        kallsyms_on_each_symbol(get_tcp_v4_rcv, &tcp_v4_rcv_addr);

        return (void *)tcp_v4_rcv_addr;
    }


    tcp_v4_rcv_ptr = get_tcp_v4_rcv_ptr();

    /* Call tcp_v4_rcv() and pass the packet directly to TCP code. */
    if (tcp_v4_rcv_ptr)
        tcp_v4_rcv_ptr(aw->skb);

Monday, November 11, 2013

Studying Intel TSX Performance

Lock-free algorithms built on atomic operations work perfectly for updates of small data (typically 8 or 16 bytes on modern x86-64 hardware). If you need to update more data, then you have to spin in a checking loop to verify whether a particular update is consistent with other concurrent updates.

Suppose you have N source bank accounts and N destination bank accounts, and you need to transfer money from the source accounts to the destination accounts at once. This is the classic example of a database transaction (database books usually use N = 1). For simplicity we can describe each account by one integer number, so if N = 1, then we can handle the transaction using a double-width CAS (Compare And Swap, the cmpxchg16b instruction on x86-64). However, if N is much larger, then it's time to think about Transactional Memory. A year ago I wrote about software transactional memory in GCC, but it's quite slow. So now it's time to look at Intel TSX.
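For the N = 1 case the idea can be sketched in portable C11 as follows. This is only an illustration under a simplifying assumption: instead of cmpxchg16b on two 64-bit values, both balances are packed as 32-bit halves of a single 64-bit word, so one ordinary CAS updates the pair. The names pack_accounts() and transfer_one() and the layout are made up for the example.

```c
#include <stdatomic.h>
#include <stdint.h>

/* Source balance in the high 32 bits, destination in the low 32 bits. */
uint64_t
pack_accounts(uint32_t src, uint32_t dst)
{
    return ((uint64_t)src << 32) | dst;
}

/*
 * Atomically move `amount` from the source half to the destination half.
 * Returns 0 if the source balance is too small, 1 on success.
 */
int
transfer_one(_Atomic uint64_t *accounts, uint32_t amount)
{
    uint64_t old = atomic_load(accounts);

    for (;;) {
        uint32_t src = (uint32_t)(old >> 32);
        uint32_t dst = (uint32_t)old;

        if (src < amount)
            return 0;
        /* On failure `old` is reloaded with the current value. */
        if (atomic_compare_exchange_weak(accounts, &old,
                                         pack_accounts(src - amount,
                                                       dst + amount)))
            return 1;
    }
}
```

The retry loop here is exactly the "checking loop" mentioned above: it works while the whole state fits into one atomic word and stops scaling once N grows.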

The Test Case

Our goal is to atomically execute the following function:

    void
    trx_func(unsigned long trx_sz)
    {
        for (unsigned i = 0; i < trx_sz; ++i) {
            debit[i] += 1;
            credit[i] += -1;
        }
    }

(We move only one dollar in our example.) Intel TSX operates on CPU cache lines (64 bytes on Haswell), so we need to ensure that each transaction reads and modifies only its own cache lines and doesn't touch the cache lines of other transactions. So debit and credit could be defined as:

    struct CacheLine {
        long c[L1DSZ / sizeof(long)];
        CacheLine() : c{0} {}

        void
        operator+=(int x)
        {
            c[0] += x;
        }
    } __attribute__((aligned(L1DSZ)));

    CacheLine debit[TRX_BUF_SZ_MAX]
        __attribute__((aligned(L1DSZ)));
    CacheLine credit[TRX_BUF_SZ_MAX]
        __attribute__((aligned(L1DSZ)));

L1DSZ is the size of a cache line (getconf LEVEL1_DCACHE_LINESIZE). TRX_BUF_SZ_MAX is just some relatively big value (8192 in my case); we won't refer to it any more.
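The one-account-per-cache-line layout can also be checked at compile time. The following is a plain C11 sketch, not the code from the test program; it assumes a 64-byte cache line and uses made-up names (struct cache_line, debit_demo).

```c
#include <stdalign.h>
#include <stddef.h>

#define L1DSZ 64 /* assumed cache line size; check getconf on your system */

/* One account per cache line: only c[0] is used, the rest is padding. */
struct cache_line {
    alignas(L1DSZ) long c[L1DSZ / sizeof(long)];
};

/* Each array element must occupy exactly one aligned cache line. */
_Static_assert(sizeof(struct cache_line) == L1DSZ,
               "cache_line must be exactly one cache line");
_Static_assert(alignof(struct cache_line) == L1DSZ,
               "cache_line must be cache-line aligned");

/* Small demo array; neighbouring elements never share a cache line. */
struct cache_line debit_demo[4];
```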

To understand TSX performance we need some code to compare TSX transactions with. So let's write simple spin lock synchronization:

    void
    execute_spinlock_trx(unsigned long trx_sz)
    {
        pthread_spin_lock(&spin_l);

        trx_func(trx_sz);

        pthread_spin_unlock(&spin_l);
    }


Certainly, the code must be run by many threads on a multi-core system. I won't show the threading code; you can find it on GitHub (compilation notes are in the header comment of the source file).

Now let's have a look at how we can use Intel TSX to execute trx_func() atomically:

    void
    execute_short_trx(unsigned long trx_sz)
    {
        while (1) {
            unsigned status = _xbegin();

            if (status == _XBEGIN_STARTED) {
                // we're in transactional context

                // Hacky check whether spinlock is locked.
                // See glibc/nptl/sysdeps/x86_64/pthread_spin_unlock.S

                if ((int)spin_l != 1)
                    _xabort(_ABORT_LOCK_BUSY);

                trx_func(trx_sz);

                _xend();

                return;
            }
            

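            // Retry on retryable and conflict aborts and on our own
            // explicit "lock is busy" abort; otherwise give up.
            if (!(status & _XABORT_RETRY)
                && !(status & _XABORT_CONFLICT)
                && !((status & _XABORT_EXPLICIT)
                     && _XABORT_CODE(status) == _ABORT_LOCK_BUSY))
                break;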

            _mm_pause();
        }

        // fallback to spinlock.
        execute_spinlock_trx(trx_sz);
    }


The _xbegin(), _xend() and _xabort() functions, as well as the _ABORT_LOCK_BUSY and _XABORT_* defines, are stolen from the glibc-2.18 code (nptl/sysdeps/unix/sysv/linux/x86/elision-lock.c, see also Lock Elision in the GNU C Library).

The function itself was also mostly written using __lll_lock_elision() from glibc-2.18 as an example. It does the following. First, it starts a TSX RTM (Restricted Transactional Memory) transaction using _xbegin(). If the transaction starts normally, then status has the value _XBEGIN_STARTED and we go into the corresponding if branch. The code in the branch ends with a return statement, so we exit the function if the transaction is committed normally (by the _xend() call). If the transaction aborts for any reason, then all the changes made in the branch are rolled back. Moreover, on rollback status gets a different value and execution jumps to just after _xbegin(), where status is tested again. Thus, the code after the if corresponds to an aborted transaction.

The function has a fallback path to the spin lock. This is a common practice in TSX programming; Andi Kleen wrote a nice article about this. First, we check that the spin lock is unlocked. This is done in transactional context, so TSX adds spin_l to its read set; if some other CPU tries to acquire the lock, then it updates spin_l and the current transaction aborts. If the lock is already acquired, then somebody is modifying the protected data under the spin lock, so we need to abort the transaction. Next, there are two possibilities: try to execute the transaction again or, like the other CPU, fall back to the spin lock.

Just falling back to the spin lock if it's already acquired by another CPU gave very poor performance. Imagine that there are 2 CPUs. The first one tries to run a transaction, but it aborts for some reason (aborts are very common for TSX, as we'll see a bit later), falls back to the spin lock, acquires it and starts to update data. The second CPU also tries to execute a transaction and sees that the lock is held by the first CPU, so it also falls back to the spin lock. The spin lock is busy, so the second CPU goes into a busy loop on it. When the first CPU finishes its updates, it releases the lock, and the lock is immediately acquired by the waiting CPU. Now the first CPU tries to run a transaction again and finds that the lock is acquired by the other CPU, so it also falls back to the spin lock... This scenario shows that naive fallback can lead to a situation where only the spin lock is used to synchronize data and transactional memory doesn't work at all.

Glibc's __lll_lock_elision() uses an adaptive locking algorithm which tries to balance between transaction restarts and fallbacks. We're interested in TSX properties, so our algorithm tries hard to execute the transaction.

On a transaction abort the processor sets flags which indicate the reason for the abort. If _XABORT_RETRY is set, then the processor suggests that it makes sense to restart the transaction. If we abort the transaction explicitly, then _XABORT_EXPLICIT is set. And _XABORT_CONFLICT indicates a data conflict with another transaction. In these three cases we restart the current transaction. However, a transaction can also be aborted due to limited system resources (_XABORT_CAPACITY) or by an explicit abort with a code other than the busy-lock one. So we check the abort code and fall back to the spin lock in all other cases.
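The retry policy described above can be written as a small pure function, which makes it easy to check without TSX hardware. This is a sketch under assumptions: the XA_* constants are local copies that mirror the RTM abort status bits (explicit = bit 0, retry = bit 1, conflict = bit 2, capacity = bit 3, abort code in bits 31:24), XA_LOCK_BUSY = 0xff is assumed to match glibc's _ABORT_LOCK_BUSY, and should_retry() is a hypothetical name.

```c
/* Local copies of the RTM abort status bits (assumed values, see above). */
#define XA_EXPLICIT   (1u << 0)
#define XA_RETRY      (1u << 1)
#define XA_CONFLICT   (1u << 2)
#define XA_CAPACITY   (1u << 3)
#define XA_CODE(s)    (((s) >> 24) & 0xffu)
#define XA_LOCK_BUSY  0xffu

/*
 * Returns 1 if the transaction should be restarted and 0 if the caller
 * should fall back to the spin lock.
 */
int
should_retry(unsigned status)
{
    /* The CPU hints that a retry may succeed, or we hit a data conflict. */
    if (status & (XA_RETRY | XA_CONFLICT))
        return 1;
    /* Our own explicit abort: the fallback lock was busy. */
    if ((status & XA_EXPLICIT) && XA_CODE(status) == XA_LOCK_BUSY)
        return 1;
    /* Capacity aborts, foreign explicit aborts and everything else. */
    return 0;
}
```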

Test Results

For performance measurements I used an Intel Core i7-4650U (dual core 1.70GHz with Hyper-Threading). The processor has a 32KB 8-way L1 data cache. The system was running Linux 3.12.0-rc6 with patches by Andi Kleen (git://git.kernel.org/pub/scm/linux/kernel/git/ak/linux-misc.git hsw/pmuX). The X server and networking were down during the tests, and no other activity was performed on the machine.

It seems (see the abort tests below and the Intel documentation: "Intel 64 and IA-32 Architectures Software Developer’s Manual Volume 1: Basic Architecture" and "Intel 64 and IA-32 Architectures Optimization Reference Manual") that TSX transactions abort if the data doesn't fit into the L1 data cache, so all the tests use a very small data set which fits into the L1 cache. Since there are no main memory operations or other CPU waiting points, this is a case where it's better to switch Hyper-Threading off. My computer doesn't have such a BIOS option, so I just use 2 threads bound to physical cores (CPUs 0 and 1):

    $ grep 'processor\|core id' /proc/cpuinfo
    processor       : 0
    core id         : 0
    processor       : 1
    core id         : 1
    processor       : 2
    core id         : 0
    processor       : 3
    core id         : 1


All the tests below were run for 10M iterations (i.e. the iter variable is equal to 10000000).

Aborts on Single-threaded workload

The single-threaded workload shows how TSX transactions work without contention on shared data between CPUs. This test workload is produced by the following lines in main():

    for (int trx_sz = 32; trx_sz <= 1024; trx_sz += 4)
        run_test(1, trx_sz, 1, 0, iter, Sync::TSX);

Figure 1: Dependency of aborts on transaction size (1 CPU)
The dependency of the number of aborts on transaction size (in cache lines) is depicted in Figure 1 (both axes are logarithmic). The number of aborts (more precisely, of transaction aborts with a clear _XABORT_RETRY bit in status) reaches 100% (10M) at around 256 cache lines. I count the aborts with a local integer counter inside the transaction abort handling code (please see execute_short_trx() in the source code for details). TSX provides an abort code for an aborted transaction, so we can easily gather statistics on which types of aborts dominate in this workload. Just compile the program with -DABORT_COUNT and run the test case for trx_sz = 256:

    explicit abrt: 0
    retry abrt: 18
    conflict abrt: 18
    capacity abrt: 9969559


Let's check the results with Intel PCM tool (output is reduced for brevity):

    # ./pcm-tsx.x a.out -e RTM_RETIRED.ABORTED_MISC1 -e RTM_RETIRED.ABORTED_MISC2 -e RTM_RETIRED.ABORTED_MISC3 -e RTM_RETIRED.ABORTED_MISC4

    Time elapsed: 10453 ms 

    Event0: RTM_RETIRED.ABORTED_MISC1 Number of times an RTM execution aborted due to various memory events (raw 0x8c9)
    Event1: RTM_RETIRED.ABORTED_MISC2 Number of times an RTM execution aborted due to uncommon conditions (raw 0x10c9) 
    Event2: RTM_RETIRED.ABORTED_MISC3 Number of times an RTM execution aborted due to HLE-unfriendly instructions (raw 0x20c9) 
    Event3: RTM_RETIRED.ABORTED_MISC4 Number of times an RTM execution aborted due to incompatible memory type (raw 0x40c9)

    Core | Event0  | Event1  | Event2  | Event3
       0   9966 K       0         0         0
       1      0         0         0         0
       2      0         0         0         0
       3      0         0         0         0
    --------------------------------------------------
       *   9966 K       0         0         0


Figure 2: Dependency of retries on transaction size
So most of the aborts are caused by the capacity problem. 256 * 64 = 16384, and this is half of the L1 data cache. The cache is 8-way associative; however, it's still unlikely that the transaction working set produces so many address collisions that we can't utilize the cache fully. It is also unlikely that other program data occupies the remaining half of the cache. So it seems that the transaction size limit is even lower than the L1 data cache size.

Let's also plot graphs for the number of retries and the whole test execution time depending on transaction buffer size. The results are shown in Figure 2 and Figure 3 respectively.

Figure 3: Dependency of execution time on transaction size
The time plot also shows significant fluctuation around a transaction size of 256 cache lines. At transaction size 244 it jumps from 10180ms to 12292ms, after which the execution time smoothly decreases to 9094ms for transaction size 264 and then grows again.

UPD 1: as slotty noticed in the comments below, each transaction in trx_func() actually modifies 2 cache lines per iteration, one for the debit update and one for the credit update. The figure was drawn for the transaction size rather than the actual number of cache lines modified by each transaction. So TSX transactions are actually limited by the full L1d cache size.
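The arithmetic behind this correction, as a tiny sketch (the function name is made up; 64-byte cache lines are assumed as above): each loop iteration of trx_func() writes one debit line and one credit line.

```c
#include <stddef.h>

/* Bytes written by one transaction of trx_sz iterations. */
size_t
trx_footprint_bytes(size_t trx_sz, size_t line_sz)
{
    /* One debit line plus one credit line per iteration. */
    return 2 * trx_sz * line_sz;
}
```

For trx_sz = 256 and 64-byte lines this gives 2 * 256 * 64 = 32768 bytes, i.e. the whole 32KB L1 data cache of the test machine.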

TSX vs Spin Lock: Transaction Time

To run this test case we need to modify our trx_func() in the following way:

    void
    trx_func(int thr_id, unsigned long trx_sz, int trx_count)
    {
        for (int c = 0; c < trx_count; c++)
            for (unsigned i = 0; i < trx_sz; ++i) {
                unsigned shift = thr_id * trx_sz + i;
                debit[shift] += 1;
                credit[shift] += -1;
            }
    }

Thus, we just execute the same data updates multiple times, so the transaction working set stays the same while the transaction time increases. In addition to the surrounding loop I also added the thread ID (0 or 1) to the calculation of the offset of the updated item. This change makes the two CPUs always work on different cache lines, so there is no data contention. The following lines of source code are responsible for the test:

    for (int trx_count = 1; trx_count <= 201; trx_count += 10)
        run_test(2, 2, trx_count, 0, iter, Sync::TSX);
    for (int trx_count = 1; trx_count <= 201; trx_count += 10)
        run_test(2, 2, trx_count, 0, iter, Sync::SpinLock);

The results of the tests are depicted in Figure 4. For short transactions (trx_count < 50) TSX shows better execution time, but at trx_count = 51 the spin lock overtakes it.

Figure 4: TSX vs Spin Lock: Transaction Time
These results show that TSX performs 3 times better (401ms vs 1329ms for trx_count = 1) on small transactions. This is interesting, but how can we use these results in practice? I.e. when should we use TSX and when a spin lock? In this thread Andi Kleen suggests "For contended conventional locks we usually recommend at least 200ns". This is also "just a number", and real benchmarks must be done for the particular workload which is being considered for TSX.

However, in our case we don't have data contention, i.e. both CPUs can work in parallel. Obviously, a spin lock which must be acquired to change any data makes the code single-threaded (only one CPU can update the data at any given time). I expected that TSX would show much better results in the test due to more parallelism, but it isn't so...

To understand the issue let's compare abort statistics for trx_count = 1 and trx_count = 60. For trx_count = 1 our statistics show:

    explicit abrt: 28
    retry abrt: 567
    conflict abrt: 589
    capacity abrt: 8

for CPU 0 and

    explicit abrt: 67
    retry abrt: 441
    conflict abrt: 506
    capacity abrt: 3


for CPU 1. Meantime, pcm-tsx reports:

    Core | Event0  | Event1  | Event2  | Event3
       0    596         0        28         0     
       1    508         0        67         0     


Thus we can see that Event 2, with the cryptic description "Number of times an RTM execution aborted due to HLE-unfriendly instructions", exactly matches our explicit aborts. Intel TSX has a set of instructions which lead to transaction aborts. It seems that these aborts are reported as explicit (this is why we need to check the abort code in execute_short_trx()). However, it's unclear why we didn't see such aborts in the single-threaded workload, and the Intel documentation with the list of the instructions doesn't answer the question. Values for Event 0, "Number of times an RTM execution aborted due to various memory events", are very close to the conflict aborts... The corresponding values for trx_count = 60 are:

    explicit abrt: 8524329
    retry abrt: 8538461
    conflict abrt: 8538484
    capacity abrt: 61


for CPU 0 and

    explicit abrt: 8524788
    retry abrt: 8554159
    conflict abrt: 8554179
    capacity abrt: 187


for CPU 1. pcm-tsx says:

    Core | Event0  | Event1  | Event2  | Event3
       0   8538 K       0      8524 K       0     
       1   8554 K       0      8524 K       0     


So the reason for the low performance with many iterations inside the transaction is a too high abort rate. Why do we see so many conflict aborts on uncontended data updates? Actually we do have contended data: our spin lock for the fallback. If we comment out the fallback code (the spin lock check in the transaction and acquiring the lock at the end of the function), then we see a much better picture for trx_count = 60:

    explicit abrt: 0
    retry abrt: 425
    conflict abrt: 425
    capacity abrt: 204


    explicit abrt: 0
    retry abrt: 1886
    conflict abrt: 1886
    capacity abrt: 139


    Core | Event0  | Event1  | Event2  | Event3
       0    629         0         0         0     
       1   2025         0         0         0     


So it seems that the spin lock fallback produces two types of aborts at the same time. If we comment out only _xabort(_ABORT_LOCK_BUSY), then we see a very similar picture: zero Event 2. So Event 2 is exactly our explicit aborts. The Intel documentation notes that transactions can abort for various reasons; it looks like we see these various reasons as Event 0 and the conflict and retry aborts.

TSX vs Spin Lock: Transaction Size

Now let's do the same as in the previous test, but vary the transaction working set instead of the running time. The source code lines for the test are:

    for (int trx_sz = 1; trx_sz <= 256; trx_sz <<= 1)
        run_test(2, trx_sz, 1, 0, iter, Sync::TSX);
    for (int trx_sz = 1; trx_sz <= 256; trx_sz <<= 1)
        run_test(2, trx_sz, 1, 0, iter, Sync::SpinLock);


The test results are depicted in Figure 5 (note that both axes are logarithmic). As in the previous test we see a very similar picture: TSX outperforms the spin lock only for small data sets and already loses at 32 cache lines.
Figure 5: TSX vs Spin Lock: Transaction Size

64 cache lines is the point at which TSX gets too many aborts (6.4M in comparison with only 7K for 32 cache lines). In a discussion on the Intel forum Andi Kleen suggested two things to optimize TSX performance:
  • "wait for the lock to become free again before retrying";
  • and "additional randomized backoff can also improve it in some cases".
So I made a couple of adjustments to the execute_short_trx() function. First, the initial version retries the transaction right away when it finds the lock acquired, and it checks the lock only in transactional context. So I added the following loop to the abort handling part of the function:

    if ((status & _XABORT_EXPLICIT)
        && _XABORT_CODE(status) == _ABORT_LOCK_BUSY)
    {
        // Wait outside of the transaction until the lock is released.
        while ((int)spin_l != 1)
            _mm_pause();
        continue;
    }


Figure 6: TSX aborts on dual core workload
(The full adjusted code is available on GitHub.) So we spin in a busy loop waiting for the spin lock to be released before we restart the transaction. The results are shown in Figure 5 by the blue curve: it shows much better time at the point of 64 cache lines (2314ms vs. 3412ms). Some of the other points are somewhat better and some are somewhat worse.

To implement random fallbacks I used a local abort counter abrt for the function (how many aborts happened during this run) and a small array abrt_fallback of 64 constant items for the counter values. In the test each thread does 20M iterations and I've seen maximum abort counts also very close to 20M, so transactions have 1 abort on average. Thus I used very small values in the abrt_fallback array, from 0 to 0x39. To get randomness I shuffled the values. The following code does the "random" fallbacks:

    if (++abrt == abrt_fallback[af]) {
        af = (af + 1) % (sizeof(abrt_fallback)
                         / sizeof(*abrt_fallback));
        break;
    }


where af is a global thread-local index into the array.
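The fallback decision can be isolated into a small helper to see how it behaves. This is only a sketch: should_fallback() is a made-up name, and the threshold values below are placeholders chosen for the example, not the 64 values used in the test program.

```c
#include <stddef.h>

/* Hypothetical thresholds; the real array has 64 shuffled small values. */
static const unsigned abrt_fallback[] = { 3, 1, 7, 2, 5, 1, 4, 6 };

/* Per-thread position in abrt_fallback. */
static _Thread_local size_t af;

/*
 * Count one more abort of the current call. Returns 1 when the abort
 * counter hits the current threshold, i.e. the caller should stop
 * retrying the transaction and take the spin lock instead.
 */
int
should_fallback(unsigned *abrt)
{
    if (++*abrt == abrt_fallback[af]) {
        /* Move to the next threshold for the following fallback. */
        af = (af + 1) % (sizeof(abrt_fallback) / sizeof(*abrt_fallback));
        return 1;
    }
    return 0;
}
```

With these placeholder values the first call sequence falls back on its third abort and the next one on its first abort, so the number of retries before a fallback varies from call to call without the cost of a random number generator.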

Figure 6 shows how the number of TSX aborts (for the basic and the optimized versions) rises in a dual-CPU environment (the figure is logarithmic on both axes). Random fallbacks provide the lowest abort rate in most cases; however, as Figure 5 shows, they don't give the best execution time. So sometimes it's better to accept a higher abort rate in order to avoid spin lock fallbacks (note that acquiring the spin lock means that the transaction on the other CPU aborts and is likely to try to acquire the lock too).

TSX vs Spin Lock: Data Overlapping

If we had only the workload that we have observed so far, then we could simply use fine-grained spin locks to protect the data of each thread. Moreover, we could even update the data concurrently on different CPUs without any locks at all, because thanks to the thread identifier thr_id we update different memory locations on different CPUs.

So now it's time to look at a more realistic example with arbitrary data overlapping. This is where transactional memory can't be easily replaced by fine-grained locks.

Again, we need to modify our trx_func() so that it accepts an additional parameter overlap and computes shift in the following way:

    shift = thr_id * trx_sz + i - overlap * thr_id;

So now the overlap parameter specifies how many data cells overlap between the CPUs. And the testing code is

    for (int overlap = 0; overlap <= 32; overlap++)
        run_test(2, 32, 1, overlap, iter, Sync::TSX);
    for (int overlap = 0; overlap <= 32; overlap++)
        run_test(2, 32, 1, overlap, iter, Sync::SpinLock);
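To see what the shift formula does, here is a small stand-alone sketch (item_index() and shared_cells() are made-up helpers, not part of the test program; overlap <= trx_sz is assumed). For thread 0 the index is just i; for thread 1 the window starts at trx_sz - overlap, so the last overlap cells of thread 0 coincide with the first overlap cells of thread 1.

```c
/* Index of the i-th cell updated by thread thr_id (0 or 1). */
unsigned
item_index(unsigned thr_id, unsigned trx_sz, unsigned i, unsigned overlap)
{
    return thr_id * trx_sz + i - overlap * thr_id;
}

/* Number of cells touched by both thread 0 and thread 1. */
unsigned
shared_cells(unsigned trx_sz, unsigned overlap)
{
    /* First cell of thread 1; thread 0 owns cells [0, trx_sz). */
    unsigned first1 = item_index(1, trx_sz, 0, overlap);

    return first1 < trx_sz ? trx_sz - first1 : 0;
}
```

For trx_sz = 32, overlap = 0 gives two disjoint windows (cells 0..31 and 32..63), while overlap = 32 makes both threads update exactly the same 32 cells.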


Figure 7: TSX vs Spin Lock:data overlapping
The test was performed for a transaction size of 32 cache lines with overlap from 0 to all 32 cache lines.

The results are depicted in Figure 7. The average execution time is 2811ms for TSX and 2631ms for the spin lock.

It's to be expected for the spin lock that the running time doesn't vary significantly with data overlap: we have only one lock, so it makes no difference whether both CPUs modify the same data cells or completely different sets of cells. However, I expected transactional memory to be sensitive to data overlap, but it isn't. We've already seen above that even non-overlapping transactions still produce a lot of conflict aborts. The same holds for this test: the abort rate for zero and for fully overlapping cells is the same, 14%.

UPD 2: Since we use a spin lock as a fallback for TSX, the spin lock can be the conflicting cache line which doesn't allow TSX to scale on the non-overlapping tests. So I've run the same test for overlapping TSX transactions with the spin lock fallback code commented out. Unfortunately, it didn't change the TSX curve in Figure 7.

Monday, August 12, 2013

Lock-free Condition Wait for Lock-free Multi-producer Multi-consumer Queue

The lock-free multi-producer multi-consumer queue on a ring buffer described in my previous post has the following properties:
  1. in pop() the queue calls sched_yield() which leads to 100% CPU consumption;
  2. consumers wait for a particular position in the queue, i.e. if you put an item into the queue while all consumers are sleeping, then one and only one consumer can eat it;
  3. the queue has a fixed number of consumers and producers;
  4. say we have 4 consumers and there are no available elements in the queue, then all consumers will wait on 4 consecutive positions (x, x + 1, x + 2, x + 3).
The first one is undesirable due to too high power consumption and misleading system statistics. To cope with this we can use usleep(3), but it looks like a dirty and relatively expensive solution. It would be better to make the pop()'er wait until a push()'er adds an item to the queue. In the naive queue implementation I used a condition variable to do this. Also, the spinning behaviour of the queue leads to unnecessary cache bouncing and degrades performance.

In this post I'm going to show an efficient way to do condition wait. The original article about the lock-free queue used C++11 for the implementation; however, in this article I'll mostly be talking about the Linux kernel, because the algorithm was developed for a kernel implementation of the queue. I'll explain all the kernel-specific things, so no special skills are required from the reader.

If you need to make a consuming thread go to sleep when there are no items in the queue, then you would probably write code like the following (this is C-like pseudo-code):

    // Consumer
    while (thr_pos().tail >= last_head_) {
        wait_event_interruptible(wq,
                                 thr_pos().tail < last_head_);

        // Update the last_head_.
        // .......
    }

    // Producer
    // Push element and wake up a consumer.
    // ......

    thr_pos().head = ULONG_MAX;
    wake_up_interruptible_all(wq);

I left the pieces of code corresponding to the queue logic as they are in the original queue implementation, but surely we should rewrite the queue in plain C if we need to run it in kernel space.

wait_event_interruptible() and wake_up_interruptible_all() are the Linux kernel analogs of pthread_cond_wait(3p) and pthread_cond_broadcast(3p). Both take the wait queue on which consumers are sleeping. wait_event_interruptible(), which is actually a C macro, also takes the condition on which the consumers want to sleep (i.e. it waits until the condition is true). wake_up_interruptible_all() wakes up all consuming threads, the same way as pthread_cond_broadcast() does. We can't use the more efficient wake_up_interruptible(), which wakes up only one consumer, due to the second property of our queue: we must be sure that exactly the consumer waiting on the position into which we just inserted an item is woken up, but the standard interface doesn't allow us to specify which thread must be woken up. So we don't know which thread to wake up and we have to wake up all the sleeping threads.

The body of the while loop in the consumer code is the slow path, but we want things to be fast in our lock-free implementation. The situation in the queue can change quickly, so a consumer which has just checked that there are no items in the queue can find an item at the next check, and we should balance between how quickly a consumer can observe the queue state and how many unnecessary cache bounces it produces. Therefore I add some spinning before going to sleep:

    // Consumer
    unsigned int loop_cnt = 0;
    while (thr_pos().tail >= last_head_) {
        if (++loop_cnt < 1000) {
            schedule();
        } else {
            wait_event_interruptible(wq,
                                     thr_pos().tail
                                      < last_head_);
            loop_cnt = 0;
        }

        // Update the last_head_.
        // .......
    }

In practice the constant for loop spinning (1000 in the code above) should be chosen based on the results of performance tests. Thus, we can minimize the cost of condition wait for consumers. Unfortunately, we can't do the same for producers: we have no reliable way to know whether there are sleeping consumers or not (if you just put a check and call wake_up() after it, then a consumer can go to sleep just after the check said "there are no sleepers"). So we must always call the waking up function.

Now let's have a brief look at the wake_up_interruptible_all() and wait_event_interruptible() implementations (linux-3.11-rc3; I've thrown out some logic for brevity):

    #define wake_up_interruptible_all(x) \
        __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL)

    void __wake_up(wait_queue_head_t *q, unsigned int mode,
                   int nr_exclusive, void *key)
    {
        unsigned long flags;

        spin_lock_irqsave(&q->lock, flags);
        __wake_up_common(q, mode, nr_exclusive, 0, key);
        spin_unlock_irqrestore(&q->lock, flags);
    }

    static void __wake_up_common(wait_queue_head_t *q,
                                 unsigned int mode,
                                 int nr_exclusive,
                                 int wake_flags, void *key)
    {
        wait_queue_t *curr, *next;

        list_for_each_entry_safe(curr, next, &q->task_list,
                                 task_list)
        {
            unsigned flags = curr->flags;

            if (curr->func(curr, mode, wake_flags, key)
                && (flags & WQ_FLAG_EXCLUSIVE)
                && !--nr_exclusive)
                        break;
        }
    }



    #define wait_event_interruptible(wq, condition)          \
    ({                                                       \
        int __ret = 0;                                       \
        if (!(condition))                                    \
            __wait_event_interruptible(wq, condition, __ret); \
       __ret;                                                \
    })

    #define __wait_event_interruptible(wq, condition, ret)   \
    do {                                                     \
        DEFINE_WAIT(__wait);                                 \
        for (;;) {                                           \
            prepare_to_wait(&wq, &__wait, TASK_INTERRUPTIBLE); \
            /* .... */                                       \
        }                                                    \
        finish_wait(&wq, &__wait);                           \
    } while (0)


    void
    prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait,
                    int state)
    {
        // .....
        spin_lock_irqsave(&q->lock, flags);
        // .....
        spin_unlock_irqrestore(&q->lock, flags);
    }

    

Here we see the following two nasty things:
  • wait_event_interruptible() and wake_up_interruptible_all() acquire the same spin lock;
  • wake_up_interruptible_all() walks over a list of tasks, and the items of the list are likely to be in sparse memory regions.
Thus, going to sleep and waking up a task are not fast operations. Moreover, as we saw, we have to wake up all the consuming tasks instead of just one, and this is also bad for performance, especially on many cores. So we need to implement condition wait for the queue with the following properties:
  1. concurrent going to sleep and waking up (i.e. lock-free);
  2. wake up only the consumer which waits for the item which we just inserted into the queue.
This is the place where the 3rd and 4th properties of our queue come into play. First, we allocate and initialize an array of task descriptors (struct task_struct in the Linux kernel, or it could be pthread_t in user space) with a size equal to the number of consumers:

   
    struct task_struct *w_tasks[CONSUMERS] ____cacheline_aligned;

    memset(w_tasks, 0, sizeof(w_tasks));

We'll use the array to make consumers go to sleep concurrently. The question is how to safely get an index in the array for a particular consuming task. We need to know exactly which task to wake up when we insert an item into the queue, so the answer is simple: just take the remainder of the division of the current position in the queue by the number of consumers (CONSUMERS). Due to property 4 of our queue, it may seem that with such array indexing all consumers safely get their positions in the array without conflicts, but we'll see a bit later that this is not true and we need additional steps to resolve the conflicts. However, at this point we can easily write the waking up code (please read it as pseudo-code too: this is a mix of the previous C++ lock-free queue implementation and the Linux kernel C implementation of the same queue):

    void
    wake_up(unsigned long position)
    {
        unsigned long pos = position % CONSUMERS;
        wait_queue_t wait = { .private = w_tasks[pos] };

        if (!wait.private)
                return;

        /*
         * Asynchronously wake up the task.
         * See linux/kernel/sched_features.h.
         */
        default_wake_function(&wait, TASK_INTERRUPTIBLE,
                              0, NULL);
    }

Here default_wake_function() wakes up the task passed to it in a field of the wait_queue_t structure - this is a standard Linux kernel API. One important thing: there is nothing bad in trying to wake up an already running task, so we can leave this without locking.

Things get harder when a task goes to sleep. The following problems are possible if many consumers go to sleep and many producers wake them up concurrently:
  1. a consumer misses its wake-up signal due to
    1. a race with a producer on insertion into the awaited position (the consumer inserts its task descriptor into the array after the producer has tried to wake up the corresponding consumer);
    2. a race with another consumer which rewrites the pointer in the array;
  2. waking up the wrong consumer;
  3. a false wake-up.
The 3rd issue, a false wake-up, is easy to overcome - just recheck the condition in a loop around the conditional wait. The other two are harder to fix. Imagine that we have 4 consumers (C0, C1, C2, C3 and CONSUMERS = 4) waiting on four positions (N0, N1, N2 and N3 respectively) and two producers (P0 and P1). The producers are concurrently inserting two items, at N0 and N1 respectively, and go to wake up consumers C0 and C1. If producer P0 is slower than P1 (e.g. it was preempted), then P1 can wake up C1, which eats the item and is ready to consume the next one. The position of the next item is N4 = N0 + 4, which maps to exactly the same slot in w_tasks as N0, so C1 overwrites the entry of C0, and P0 will wake up C1 instead of C0. Even nastier, C0 might never be woken up at all. Therefore, we can acquire a w_tasks item only if it is surely free, and CMPXCHG helps us with this.

There is also another race scenario which we need to prevent. A producer and a consumer enter push() and pop() simultaneously:
  1. the consumer checks that there are no items in the queue and goes to wait;
  2. the producer pushes an item and tries to wake up the waiting task, but finds NULL at the corresponding position in w_tasks and does nothing;
  3. the consumer sleeps waiting for the item, probably forever.
This is a classical scenario which is handled with a double check by any conditional wait code (either in the POSIX threads library or in the Linux kernel).

So let's write our fast lock-free conditional wait code:

    #define cond_wait(position, condition)                   \
    do {                                                     \
        unsigned long p = position % CONSUMERS;              \
        struct task_struct *curr_waiter;                     \
        curr_waiter = cmpxchg(&w_tasks[p], NULL, current);   \
        if (unlikely(curr_waiter)) {                         \
            wait_queue_t wait = { .private = curr_waiter };  \
            default_wake_function(&wait, TASK_INTERRUPTIBLE, \
                                  0, NULL);                  \
            schedule();                                      \
            if (condition)                                   \
                break;                                       \
            continue;                                        \
        }                                                    \
        set_current_state(TASK_INTERRUPTIBLE);               \
        if (!(signal_pending(current) || condition))         \
                schedule();                                  \
        w_tasks[p] = NULL;                                   \
        set_current_state(TASK_RUNNING);                     \
        break;                                               \
    } while (1)


Here current is a pointer to the current task in the Linux kernel (a macro available everywhere in kernel code). The current task goes to sleep by setting its state to TASK_INTERRUPTIBLE and rescheduling (by the schedule() call). When the task is woken up, it continues its work flow from the schedule() call and sets its state to running, so it will get a time slice again on the next rescheduling.

Our conditional wait spins in a loop while the position in w_tasks is non-NULL (i.e. it is acquired by some other waiting thread), so there is no conflict between consumers. Hopefully, the case when two tasks compete for the same position in the wait array is rare, so I use the unlikely() annotation (which is equivalent to GCC's __builtin_expect(X, 0) extension in user space).
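
The slot acquisition step can be sketched in user space as follows. This is only an illustration built on std::atomic, not the kernel code: Waiter, try_acquire_slot() and release_slot() are hypothetical names, and the actual sleeping and waking is left out.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for struct task_struct in this user-space sketch.
struct Waiter { int id; };

constexpr std::size_t CONSUMERS = 4;

// One slot per consumer; nullptr means the slot is free.
std::atomic<Waiter *> w_tasks[CONSUMERS] = {};

// Try to claim the slot for queue position `position`.
// Returns nullptr on success, or the waiter that currently owns the slot.
Waiter *try_acquire_slot(unsigned long position, Waiter *self)
{
    Waiter *expected = nullptr;
    // Analogue of cmpxchg(&w_tasks[p], NULL, current).
    if (w_tasks[position % CONSUMERS].compare_exchange_strong(expected, self))
        return nullptr;
    return expected; // on failure this holds the current owner
}

// Give the slot back after waiting is over.
void release_slot(unsigned long position)
{
    w_tasks[position % CONSUMERS].store(nullptr);
}
```

A caller that gets a non-null result back is in the slow path described above: it can poke the returned owner, yield, recheck its condition and try again.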

If a task waiting on position P faces w_tasks[P % CONSUMERS] != NULL, then it is likely that the slot is acquired by a task waiting on position Q, such that Q + CONSUMERS <= P. Since we have only CONSUMERS consumers, it means that position P in the queue already has an item (due to property 4). We're in the slow path anyway, so there is no problem in trying to wake up the waiting task to make its wake-up happen earlier. There is also a chance that Q > P, but it is less likely, and a false wake-up is still not a problem. Somebody can push an item into the queue while we're spinning and waiting for the position in w_tasks to be freed, so we must check the condition at each iteration.

Finally, we perform classical double check of the condition to avoid infinite waiting and set w_tasks position to NULL at the end of waiting.

This is a fast conditional wait, and moreover, due to reduced cache bouncing, it makes the lock-free queue even faster than its spinning version. The kernel module which uses the lock-free queue with this conditional wait algorithm has shown about 10% performance improvement in comparison with the queue without conditional wait. Unfortunately, I don't have independent test code in which I can measure the performance gain for the queue itself without additional application logic.

Saturday, July 27, 2013

C++ Variadic Templates For Multiple Inheritance

C++ variadic templates take a variable number of arguments. C++ also allows you to create a template class which inherits from a template base class. Together, these two features allow us to inherit from a variable number of base classes. When is this needed? Let's have a look at a simple example which I faced recently.

Suppose you need a class Messenger which receives raw messages from a socket, assembles messages of particular types and passes them into the appropriate queue. Frequently the message queues are implemented as a template like

    template<class T>
    struct Queue {
        // some class body
    };

So you have the following queues, one for each type of message:

    Queue<MsgA> q_a;
    Queue<MsgB> q_b;
    Queue<MsgC> q_c;

The queues have to be members of the class Messenger. Copy-pasting 3 members, as in the example, is probably not a big deal. However, the ugliness arises from the necessity to have registering interfaces for each queue (classes which use the Messenger need to register on a particular queue to receive messages from it), serialized push() interfaces, probably queue accessors and some other methods specific to the queue (accessing the queues directly as public members isn't a good idea). C++ meta-programming can help to generate the queues automatically with all the required interfaces. Let's see how we can use it for this task.

Messenger provides interfaces to the queues, so it "is-a" QueueHandler. QueueHandler holds a queue of a particular type as a member and provides interfaces to it. So you should generate a set of QueueHandler classes, one for each queue type, and inherit Messenger from all of them (note that the registering method cannot be named register, since that is a reserved word in C++):

    template<class T>
    struct QueueHandler {
        void register_queue(Queue<T> *q) { /* some method body */ }
        void push(T *msg) { /* some other method body */ }
    private:
        Queue<T> q_;
    };

It's also bad to explicitly specify all the base QueueHandler classes for the Messenger class. So you can introduce a helper class GenericMessenger, an instantiation of which is Messenger, and use a C++ variadic template to write the class independently of the particular number of serviced queues:

    template<class... Args>
    struct GenericMessenger : QueueHandler<Args>... {
        // some struct body
    };

    typedef GenericMessenger<MsgA, MsgB, MsgC> Messenger;

Therefore, if you need to support one more type of message (and its queue, of course), then you just need to add the type to the Messenger definition, and there is no copy-paste code!

The only ugly thing is that you need to explicitly specify the base class when accessing a particular queue (this is because GenericMessenger has many base classes with the same method names, so we need to explicitly call the method of a particular base):

    Messenger *m = new Messenger();

    m->QueueHandler<MsgA>::register_queue(new Queue<MsgA>);
    m->QueueHandler<MsgA>::push(new MsgA);


Thursday, May 23, 2013

Lock-free Multi-producer Multi-consumer Queue on Ring Buffer

My article "Lock-Free Multi-Producer Multi-Consumer Queue on Ring Buffer" was published by Linux Journal more than 30 days ago, so now I can post it here.

The work queue has always been one of the hottest points in server software.
  Here is how to scale it effectively to a multi-core environment.


I. INTRODUCTION

Nowadays high performance server software (e.g. an HTTP accelerator) in most
cases runs on multi-core machines. Modern hardware can provide 32, 64 or more
CPU cores. In such a highly concurrent environment lock contention sometimes
hurts overall system performance more than data copying, context switches etc.
Thus, moving the hottest data structures from a locked to a lock-free design
can significantly improve the performance of software working in a multi-core
environment.

One of the hottest data structures in traditional server software is the work
queue, which can see hundreds of thousands of push and pop operations per
second from tens of producers and/or consumers.

A work queue is a FIFO data structure which has only two operations: push() and
pop(). It usually limits its size such that pop() waits if there are no
elements in the queue and push() waits if the queue contains the maximum
allowed number of elements. It is important that many threads can execute pop()
and push() operations simultaneously on different CPU cores.

One of the possible work queue implementations is a ring buffer storing
pointers to the queued elements. It has good performance, especially in
comparison with a common non-intrusive linked list (which stores copies of
values passed by the user, e.g. std::list).
A significant thing about the ring buffer implementation is that it natively
limits its size - you only need to move the current position in round-robin
fashion. Linked lists, on the other hand, require maintaining an additional
field for the total queue length. With a linked list, push and pop operations
have to modify the queue length in addition to updating element links, so you
need to take more care of consistency in the queue for a lock-free
implementation.

Different CPU families provide different guarantees for memory operation
ordering, and this is critical for lock-free algorithms.
In this article we'll concentrate on x86, as the most widespread architecture,
rather than write generic (but slower) code.


II. NAIVE SYNCHRONIZED QUEUE

First of all let's define the interface for our queue (I'll use C++11 in the
article):

        template<class T, long Q_SIZE>
        class NaiveQueue {
        public:
            NaiveQueue();
            void push(T *x);
            T *pop();
        };

The queue will store T* pointers and has a maximum size of Q_SIZE.

Let's see how the queue would look in a naive locked implementation. To develop
the queue we need an array in which we place our ring buffer. We can define
this as

        T *ptr_array_[Q_SIZE];

Two members of the class, head_ and tail_, will point to the head (next
position to push an element) and the tail (next item to pop) of the queue and
should be initialized to zero in the class constructor. We can simplify our
operations on the ring buffer by defining the counters as unsigned long.
Unsigned long (which is 64 bits long on x86-64 Linux) is large enough to handle
even millions of operations per second for thousands of years. So tail_ and
head_ will be defined as:

        unsigned long head_;
        unsigned long tail_;
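
To put a number on "thousands of years", here is a quick back-of-the-envelope check. The helper name years_until_wrap() is mine, and a year is assumed to be 365 days; at one million operations per second a 64-bit counter lasts roughly 585 thousand years before it wraps.

```cpp
#include <cassert>
#include <cstdint>

// Whole years until a 64-bit counter wraps around when it is incremented
// `ops_per_sec` times per second (assuming 365-day years).
constexpr std::uint64_t years_until_wrap(std::uint64_t ops_per_sec)
{
    return UINT64_MAX / ops_per_sec / (365ULL * 24 * 60 * 60);
}
```

Even at a billion increments per second the counter would take several centuries to overflow.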

This way we can access the elements (the same for head_ and tail_) just by

        ptr_array_[tail_++ & Q_MASK]

Where Q_MASK is defined as

        static const unsigned long Q_MASK = Q_SIZE - 1;

To get the current position in the array we could calculate the remainder of
integer division of tail_ by Q_SIZE, but instead we define Q_SIZE as a power
of 2 (32768 in our case), so we can use a bitwise AND between Q_MASK and tail_,
which is a bit faster.
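
The equivalence of the mask and the remainder can be checked with a tiny sketch. The slot() helper is a name I made up for the example; the constants are the ones from the text.

```cpp
#include <cassert>

static const unsigned long Q_SIZE = 32768; // must be a power of 2
static const unsigned long Q_MASK = Q_SIZE - 1;

static_assert((Q_SIZE & (Q_SIZE - 1)) == 0, "Q_SIZE must be a power of 2");

// Map an ever-growing counter to a slot in the ring buffer.
inline unsigned long slot(unsigned long counter)
{
    return counter & Q_MASK; // same result as counter % Q_SIZE
}
```

A nice side effect of the power-of-2 size is that the mapping stays consistent even if the counter itself ever wraps around, because Q_SIZE divides the range of unsigned long evenly.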

Since the operations on the queue must wait if there are no elements or the
queue is full, we need two condition variables:

        std::condition_variable cond_empty_;
        std::condition_variable cond_overflow_;

to wait for new elements in the queue or for free space respectively.
We also need a mutex to serialize access to the queue:

        std::mutex mtx_;

This way we can write push() and pop() in the following way:

        void push(T *x)
        {
            std::unique_lock<std::mutex> lock(mtx_);

            cond_overflow_.wait(lock, [this]() {
                            return tail_ + Q_SIZE > head_;
                    });

            ptr_array_[head_++ & Q_MASK] = x;

            cond_empty_.notify_one();
        }

        T *pop()
        {
            std::unique_lock<std::mutex> lock(mtx_);

            cond_empty_.wait(lock, [this]() {
                            return tail_ < head_;
                    });

            T *x = ptr_array_[tail_++ & Q_MASK];

            cond_overflow_.notify_one();

            return x;
        }

We perform both operations under an exclusive lock acquired using mtx_. Once
the lock is acquired we can check the current queue state: whether it is empty
(and we cannot pop any new element) or full (and we cannot push a new element).
std::condition_variable::wait() puts the current thread to sleep until
the specified predicate is true. Next we push or pop an element and notify
another thread (by the notify_one() call) that we have changed the queue state.
Since we add or delete only one element at a time, only one thread waiting
for available elements or free slots in the queue can make progress, so we
notify and wake up only one thread.
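
Putting the pieces together, the naive queue could look like the sketch below. It follows the fragments above, with two small assumptions of mine: Q_SIZE is declared as unsigned long, and the lambdas capture this, since class members cannot be captured by name. The naive_queue_demo() function is a made-up helper which exercises the queue with one producer thread.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

template<class T, unsigned long Q_SIZE>
class NaiveQueue {
    static const unsigned long Q_MASK = Q_SIZE - 1;
    static_assert((Q_SIZE & (Q_SIZE - 1)) == 0,
                  "Q_SIZE must be a power of 2");
public:
    NaiveQueue() : head_(0), tail_(0) {}

    void push(T *x)
    {
        std::unique_lock<std::mutex> lock(mtx_);
        // Sleep while the queue is full.
        cond_overflow_.wait(lock, [this]() {
            return tail_ + Q_SIZE > head_;
        });
        ptr_array_[head_++ & Q_MASK] = x;
        cond_empty_.notify_one();
    }

    T *pop()
    {
        std::unique_lock<std::mutex> lock(mtx_);
        // Sleep while the queue is empty.
        cond_empty_.wait(lock, [this]() {
            return tail_ < head_;
        });
        T *x = ptr_array_[tail_++ & Q_MASK];
        cond_overflow_.notify_one();
        return x;
    }

private:
    T *ptr_array_[Q_SIZE];
    unsigned long head_, tail_;
    std::condition_variable cond_empty_, cond_overflow_;
    std::mutex mtx_;
};

// One producer pushes the values 1..n through a tiny 4-slot queue,
// the calling thread pops them and returns their sum.
long naive_queue_demo(int n)
{
    std::vector<long> values(n);
    NaiveQueue<long, 4> q;
    std::thread producer([&]() {
        for (int i = 0; i < n; ++i) {
            values[i] = i + 1;
            q.push(&values[i]);
        }
    });
    long sum = 0;
    for (int i = 0; i < n; ++i)
        sum += *q.pop();
    producer.join();
    return sum;
}
```

The queue in the demo is deliberately much smaller than the number of items, so both wait conditions are exercised.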

The problem with this implementation is that only one thread at a time can
modify the queue. Moreover, mutexes and condition variables are expensive - in
Linux they are implemented on top of the futex(2) system call. So each time a
thread needs to wait on a mutex or condition variable, it calls futex(2), which
reschedules the thread and moves it to a wait queue.

Now let's run a plain test which just pushes and pops addresses to and from the
queue in 16 producers and 16 consumers (please refer to the end of the article
for a link to the full source code). On a box with 16 Xeon cores the test took
about 7 minutes:

        # time ./a.out

        real    6m59.219s
        user    6m21.515s
        sys     72m34.177s

And strace with the -c and -f options shows that the program spends 99.98% of
its time in the futex system call.


III. LOCK-FREE MULTI-PRODUCER MULTI-CONSUMER QUEUE

Fortunately, you do not have to ask the kernel for help with user space thread
synchronization. CPUs (at least the most common architectures) provide atomic
memory operations and barriers. With these operations you can atomically

 * read memory operand, modify it and write back
 * read memory operand, compare it with a value and swap with other value

Memory barriers are special assembly instructions also known as fences.
Fences guarantee instruction execution order on the local CPU and visibility
order on other CPUs. Let's consider two data-independent instructions, A
and B, separated by a fence (let it be mfence, which provides a guarantee for
ordering of read and write operations):

 A
 mfence
 B

The fence guarantees that:
1. compiler optimizations won't move A after the fence or B before the fence;
2. the CPU will execute instructions A and B in order (even if it normally
   executes instructions out of order);
3. other CPU cores and processor packages, which work on the same bus, will
   see the result of instruction A before the result of instruction B.
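
The same A / fence / B pattern can be written portably with C++11 fences. The sketch below is an illustration and not part of the queue code; the names payload, ready and fence_demo() are made up. Note that it uses release and acquire fences rather than a full mfence-style barrier, which is enough to pass plain data from one thread to another.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

int payload = 0;                 // plain, non-atomic data
std::atomic<bool> ready(false);  // flag published after the data

// Publishes `payload` from the calling thread to a consumer thread and
// returns the value which the consumer observed.
int fence_demo()
{
    payload = 0;
    ready.store(false);
    int seen = -1;

    std::thread consumer([&seen]() {
        while (!ready.load(std::memory_order_relaxed))
            std::this_thread::yield();
        // The read of payload below can't be moved before this fence.
        std::atomic_thread_fence(std::memory_order_acquire);
        seen = payload;
    });

    payload = 42;                                        // A
    std::atomic_thread_fence(std::memory_order_release); // fence
    ready.store(true, std::memory_order_relaxed);        // B

    consumer.join();
    return seen;
}
```

Whoever observes B (the flag) and then passes its own acquire fence is guaranteed to observe A (the payload) as well.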

For our queue we need to synchronize access of multiple threads to the head_
and tail_ fields. When you run head_++ (this is an example of an RMW,
Read-Modify-Write, operation, since the processor must read the current head_
value, increment it locally and write it back to memory) on two cores, both
cores could simultaneously read the current head_ value, increment it and
simultaneously write the new value back, so one increment is lost. For atomic
operations C++11 provides the std::atomic template, which should replace the
current GCC __sync_ intrinsics in the future. Unfortunately, for my compiler
(GCC 4.6.3 for x86-64) std::atomic<> methods still generate extra fences
regardless of the specified memory order. So I'll use the old GCC intrinsics
for atomic operations.

We can atomically read and increment a variable (e.g. our head_) by

        __sync_fetch_and_add(&head_, 1);

This makes the CPU lock the shared memory location on which it's going to do
an operation (an increment in our case). In a multiprocessor environment
processors communicate with each other to ensure that they all see up-to-date
data. This is known as the cache coherency protocol. With this protocol a
processor can take exclusive ownership of a memory location. However, this
communication is not free, and we should use such atomic operations carefully
and only when we really need them. Otherwise we can hurt performance
significantly.
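
A short experiment shows that the atomic increment indeed loses no updates. This is a sketch assuming GCC or Clang (for the __sync_fetch_and_add() intrinsic); the function name count_with_fetch_and_add() is made up for the example.

```cpp
#include <cassert>
#include <thread>
#include <vector>

// Each of `n_threads` threads reserves `per_thread` positions with an atomic
// read-modify-write. Returns the final counter value.
unsigned long count_with_fetch_and_add(int n_threads, int per_thread)
{
    unsigned long head = 0;
    std::vector<std::thread> threads;

    for (int t = 0; t < n_threads; ++t)
        threads.emplace_back([&head, per_thread]() {
            for (int i = 0; i < per_thread; ++i)
                __sync_fetch_and_add(&head, 1); // GCC/Clang intrinsic
        });
    for (auto &th : threads)
        th.join();

    return head;
}
```

With a plain head++ in place of the intrinsic the result would usually come out lower than n_threads * per_thread, because concurrent increments overwrite each other.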

Meanwhile, plain read and write operations on naturally aligned memory
locations execute atomically and do not require any additional actions (like
specifying the 'lock' prefix to make an instruction run atomically on the x86
architecture).

In our lock-free implementation we're going to abandon the mutex mtx_ and
consequently both condition variables. However, we still need to wait if the
queue is full on push and if the queue is empty on pop. For push we would do
this with a simple loop, checking the same condition as in the locked queue:

        while (tail_ + Q_SIZE <= head_)
            sched_yield();

sched_yield() just lets another thread run on the current processor. This is
the native and fastest way to reschedule the current thread. However, if there
is no other thread waiting in the scheduler run queue for an available CPU, the
current thread will be scheduled back immediately. Thus we'll always see 100%
CPU usage, even if we have no data to process. To cope with this we can use
usleep(3) with some small value.

Let's look more carefully at what's going on in the loop. First we read the
tail_ value, next we read the value of head_, and after that we decide whether
to wait or to push an element and move head_ forward. The current thread can be
rescheduled at any point of the check and even after the check. Let's consider
a scenario with 2 threads:

        Thread 1                  Thread 2

        read tail_                read tail_
        read head_                read head_
        (scheduled)               push an element
        push an element

If we had only one free place in the ring buffer, then we overwrite the pointer
to the oldest queued element. We can solve the problem by incrementing the
shared head_ before the loop and using a temporary local variable (i.e. we
reserve a place into which we're going to insert an element and wait until it
is free):

        unsigned long tmp_head =
            __sync_fetch_and_add(&head_, 1);
        while (tail_ + Q_SIZE <= tmp_head)
            sched_yield();
        ptr_array_[tmp_head & Q_MASK] = x;

We can write similar code for pop() (just swap head and tail). However, the
problem still exists. Two producers can increment head_, check that they have
enough space and be rescheduled at the same time just before inserting x. A
consumer can wake up instantly (it sees that head_ has moved forward by two
positions) and read a value from the queue which has not been inserted yet.

Before solving the issue let's see what picture we have in the case of 2
producers (P1 and P2) and 2 consumers (C1 and C2):

                     LT                          LH
        | _ | _ | _ | x | x | x | x | x | x | x | _ | _ | _ |
                      ^   ^                       ^   ^
                      |   |                       |   |
                      C1  C2                      P1  P2

In the picture '_' denotes free slots and 'x' denotes inserted elements. C1 and
C2 are going to read values, and P1 and P2 are going to write elements to
currently free slots. Let LT be the latest (lowest) tail value among all the
consumers, which is stored in tmp_tail of the latest consumer, C1 in the
picture. Consumer C1 can currently be working on the queue at the LT position
(i.e. it is in the middle of fetching the element). Correspondingly, let LH be
the lowest value of tmp_head among all the producers. At any given time you
cannot push an element to a position which wraps around onto LT or beyond
(i.e. LT + Q_SIZE or greater), and should not try to pop an element at a
position equal to or greater than LH. It means that all the producers should
care about the current LT value and all the consumers about the current LH
value. So let's introduce two helper class members for LH and LT:

        volatile unsigned long last_head_;
        volatile unsigned long last_tail_;

Thus we should check the last_tail_ value instead of tail_ in the loop above.
We need to update the values from multiple threads, but we're going to do this
with plain write operations, without RMW. So the members do not have to be of
an atomic type. I just declared the variables as volatile to prevent their
values from being cached in local processor registers.

Now the question is who should update the last_head_ and last_tail_ values, and
when. We expect that in most cases we are able to perform a push and/or pop
operation on the queue without waiting. Thus we can update the two helper
variables only when we really need them, i.e. inside the waiting loop.
So when a producer realizes that it cannot insert a new element because the
last_tail_ value is too small, it falls into the wait loop and tries to update
the last_tail_ value. To update the value the thread must inspect the current
tmp_tail of each consumer. So we need to make the temporary value visible to
other threads. One possible solution is to maintain an array of tmp_tail and
tmp_head values with size equal to the number of running threads. We can do
this with the following code:

        struct ThrPos {
            volatile unsigned long head, tail;
        };

        ThrPos thr_p_[std::max(n_consumers_, n_producers_)];

where n_consumers_ is the number of consumers and n_producers_ is the number of
producers. We could allocate the array dynamically, but let's leave it
statically sized for simplicity for now. Many threads read the elements of the
array, but only one thread can update each of them, with a plain move
instruction (no RMW operation), so you can also use regular reads on the
variables.

Since thr_p_ values are used only to limit the movement of the current queue
pointers, we initialize them to the maximum allowed value, i.e. we do not limit
the movement of head_ and tail_ until somebody pushes to or pops from the
queue.

We can find the lowest tail value among all the consumers with the following
loop:

        auto min = tail_;
        for (size_t i = 0; i < n_consumers_; ++i) {
            auto tmp_t = thr_p_[i].tail;

            asm volatile("" ::: "memory"); // compiler barrier

            if (tmp_t < min)
                min = tmp_t;
        }

The temporary variable tmp_t is required here since you cannot atomically
compare whether thr_p_[i].tail is less than min and update min if it is. After
you have saved the current consumer's tail and while you compare it with min,
the consumer can move the tail. It can only move it forward, so the check in
the while condition is still correct and you won't overwrite live queue
elements. But if you didn't use tmp_t and wrote code like

        if (thr_p_[i].tail < min)
            min = thr_p_[i].tail;

then the consumer could have a lower tail value while you're comparing it with
min, but move it far forward after the comparison is done and just before the
assignment. So you would probably find an incorrect minimal value.

I added a compiler barrier, asm volatile("" ::: "memory") (this is a
GCC-specific compiler barrier), to be sure that the compiler won't move the
thr_p_[i].tail access and will access the memory location only once - to load
its value into tmp_t.
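
As a standalone function the scan could look like this. It is a sketch: lowest_tail() and its parameters are names I introduced so that the loop can be tried outside of the queue class, and it assumes GCC-style inline assembly.

```cpp
#include <cassert>
#include <climits>
#include <cstddef>

struct ThrPos {
    volatile unsigned long head, tail;
};

// Lowest tail among `n` consumers, starting from the global tail value.
// Slots holding the maximum value (ULONG_MAX) never win the comparison.
unsigned long lowest_tail(const ThrPos *thr_p, std::size_t n,
                          unsigned long global_tail)
{
    unsigned long min = global_tail;
    for (std::size_t i = 0; i < n; ++i) {
        unsigned long tmp_t = thr_p[i].tail; // read the slot exactly once
        asm volatile("" ::: "memory");       // GCC-style compiler barrier
        if (tmp_t < min)
            min = tmp_t;
    }
    return min;
}
```

If every slot holds the maximum value, the function simply returns the global tail it started from.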

One important thing about the array is that it must be indexed by the current
thread identifier. Since POSIX threads (and consequently the C++ threads which
use them) do not use small monotonically increasing values for identifying
threads, we need to use our own thread wrapper. I'll use the inline thr_pos()
method of the queue to access the array elements:

        ThrPos& thr_pos() const
        {
            return thr_p_[ThrId()];
        }

(you can find an example of a ThrId() implementation in the source referenced
at the end of the article).
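
Just to show the idea, here is one possible way to get small per-thread identifiers. This is my own sketch and not necessarily how the referenced source does it: it relies on C++11 thread_local (which needs a newer compiler than GCC 4.6), and thr_id_of_new_thread() is a helper made up for the example.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <thread>

// Hypothetical ThrId(): hands out 0, 1, 2, ... in order of the first call
// made by each thread and then keeps returning the same value to it.
inline std::size_t ThrId()
{
    static std::atomic<std::size_t> next_id(0);
    thread_local std::size_t id = next_id.fetch_add(1);
    return id;
}

// Helper for the example: start a thread and report the id it got.
inline std::size_t thr_id_of_new_thread()
{
    std::size_t id = 0;
    std::thread t([&id]() { id = ThrId(); });
    t.join();
    return id;
}
```

Identifiers are never reused in this sketch, so the per-thread array would have to be large enough for every thread that ever touches the queue.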

Before writing the final implementation of push() and pop() let's get back to
the initial application of our queue, the work queue. Usually, producers and
consumers do a lot of work between operations on the queue. For instance, it
could be a very slow IO operation. So what happens if one consumer fetches an
element from the queue and goes to sleep in a long IO operation? Its tail value
will stay the same for a long time, and all the producers will wait on it even
if all the other consumers have fully cleared the queue. This is not the
desired behavior.

Let's fix this in two steps. First, let's assign the maximum allowed value to
the per-thread tail pointer just after fetching the element. So we should write
the following at the end of the pop() method:

        T *ret = ptr_array_[thr_pos().tail & Q_MASK];
        thr_pos().tail = ULONG_MAX;
        return ret;

Since a producer in push() starts its search for the minimal allowed value of
last_tail_ from the current value of the global tail_, it can assign the
current tail_ value to last_tail_ only if there are no active consumers. This
is what we want.

Generally speaking, other processors can see thr_pos().tail update before
local processor reads from ptr_array_, so they can move and overwrite the
position in the array before local processor reads it. This is possible on
processors with relaxed memory operation ordering. However x86 provides
relatively strict memory ordering rules, particularly it guarantees that
1. stores are not reordered with earlier loads
2. and stores are seen in consistent order by other processors.
Thus, loading from ptr_array_ and storing to thr_pos().tail in the code above
will be done on x86 and seen by all processors in exactly this order.
So we don't need any explicit memory barriers here.

The second step which we need to do is correctly set thr_pos().tail at the
beginning of pop(). We assign current thr_pos().tail by

        thr_pos().tail = __sync_fetch_and_add(&tail_, 1);

The problem is that the operation is atomic only for the tail_ shift, but not
for the thr_pos().tail assignment. So there is a time window in which
thr_pos().tail is still ULONG_MAX while tail_ could be shifted significantly by
other consumers, so push() would set last_tail_ to the current, just
incremented, tail_. So when we're going to pop an element we have to reserve a
tail position less than or equal to the tail_ value from which we'll pop the
element:

        thr_pos().tail = tail_;
        thr_pos().tail = __sync_fetch_and_add(&tail_, 1);

In this code we actually perform following three operations:

        write tail_ to thr_pos().tail
        increment tail_
        write previous value of tail_ to thr_pos().tail

Again, in the general case we have no guarantee that other processors will
"see" the results of the write operations in the same order. Potentially some
other processor could first read the incremented tail_ value, try to find the
new last_tail_ and only after that read the new tail value of the current
thread. However, __sync_fetch_and_add() executes a locked instruction which
implies an implicit full memory barrier on most architectures (including x86),
so neither the first nor the third operation can be moved over the second one.
Therefore we can also skip explicit memory barriers here.

Thus if the queue is almost full then all producers will stop at or before
the position of the element which we're popping.

Now we're ready to write our final implementation of the push() and pop()
methods. Here they are:

        void push(T *ptr)
        {
            thr_pos().head = head_;
            thr_pos().head = __sync_fetch_and_add(&head_, 1);

            while (__builtin_expect(thr_pos().head >=
                                    last_tail_ + Q_SIZE, 0))
            {
                ::sched_yield();

                auto min = tail_;
                for (size_t i = 0; i < n_consumers_; ++i) {
                    auto tmp_t = thr_p_[i].tail;

                    asm volatile("" ::: "memory"); // compiler barrier

                    if (tmp_t < min)
                        min = tmp_t;
                }
                last_tail_ = min;
            }

            ptr_array_[thr_pos().head & Q_MASK] = ptr;
            thr_pos().head = ULONG_MAX;
        }

        T *pop()
        {
            thr_pos().tail = tail_;
            thr_pos().tail = __sync_fetch_and_add(&tail_, 1);

            while (__builtin_expect(thr_pos().tail >=
                                    last_head_, 0))
            {
                ::sched_yield();

                auto min = head_;
                for (size_t i = 0; i < n_producers_; ++i) {
                    auto tmp_h = thr_p_[i].head;

                    asm volatile("" ::: "memory"); // compiler barrier

                    if (tmp_h < min)
                        min = tmp_h;
                }
                last_head_ = min;
            }

            T *ret = ptr_array_[thr_pos().tail & Q_MASK];
            thr_pos().tail = ULONG_MAX;
            return ret;
        }

A careful reader will notice that multiple threads can scan the current head or
tail values over all the producing or consuming threads. So a number of threads
can find different min values and try to write them to last_head_ or last_tail_
simultaneously, so you might want to use a CAS operation here. However, an
atomic CAS is expensive, and the worst that can happen here is that you assign
too small a value to last_head_ or last_tail_, or even overwrite a new higher
value with a smaller old value, so you'll fall into sched_yield() again. Maybe
we fall into sched_yield() more frequently than if we used a synchronized CAS
operation, but in practice the cost of the extra atomic operation reduces
performance.

Also, I used __builtin_expect with a zero expected value to say that we do not
expect the condition in the while statement to become true too frequently, so
the compiler should move the inner loop code after the code executed when the
condition is false. This way you can improve instruction cache usage.
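
The hint is commonly wrapped in a pair of macros, similar to what the Linux kernel does. Below is a minimal sketch assuming GCC or Clang; sum_valid() is an invented function which only shows where the hint goes.

```cpp
#include <cassert>

#define likely(x)   __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)

// Sum the non-negative values; negative ones are treated as rare errors
// and only counted.
inline long sum_valid(const int *v, int n, int *errors)
{
    long sum = 0;
    for (int i = 0; i < n; ++i) {
        if (unlikely(v[i] < 0)) { // cold path, hinted as rarely taken
            ++*errors;
            continue;
        }
        sum += v[i];
    }
    return sum;
}
```

The hint never changes the result of the code, only the layout of the generated instructions.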

Finally, let's run the same test as for the naive queue:

        # time ./a.out 

        real    1m53.566s
        user    27m55.784s
        sys     2m4.461s

This is 3.7 times faster than our naive queue implementation!


IV. CONCLUSION

Nowadays, high performance computing is typically achieved in two ways:
horizontal scaling (scale-out) by adding new computational nodes and
vertical scaling (scale-up) by adding extra computational resources (like
CPUs or memory) to a single node. Unfortunately, linear scaling is possible
only in theory. In practice, if you double your computational resources, it is
likely that you get only a 30-60% performance gain. Lock contention is one
of the problems which prevent efficient scale-up by adding extra CPUs.
Lock-free algorithms make scale-up more productive and allow you to get more
performance in multi-core environments.

The code for naive and lock-free queue implementations with the tests for
correctness is available at:

  https://github.com/krizhanovsky/NatSys-Lab/blob/master/lockfree_rb_q.cc

Alexander Krizhanovsky is the software architect and founder of NatSys-Lab.
Before NatSys-Lab he was working as Senior Software Developer at IBM, Yandex
and Parallels. He specializes in high performance solutions for UNIX
environment.

Special thanks to Johann George from SanDisk for final review of the paper.