High Performance Linux

Monday, August 12, 2013

Lock-free Condition Wait for Lock-free Multi-producer Multi-consumer Queue

The lock-free multi-producer multi-consumer queue on a ring buffer described in my previous post has the following properties:
  1. in pop() the queue calls sched_yield(), which leads to 100% CPU consumption;
  2. consumers wait for a particular position in the queue, i.e. if you put an item into the queue while all consumers are sleeping, then one and only one consumer can eat it;
  3. the queue has a fixed number of consumers and producers;
  4. say we have 4 consumers and there are no available elements in the queue, then all consumers will wait on 4 consecutive positions (x, x + 1, x + 2, x + 3).
The first one is undesirable because of excessive power consumption and misleading system statistics. To cope with this we can use usleep(3), but it looks like a dirty and relatively expensive solution. It would be better to make the pop()'er wait until a push()'er adds an item to the queue. In the naive queue implementation I used a condition variable to do this. Also, the spinning behaviour of the queue leads to unnecessary cache bouncing and degrades performance.

In this post I'm going to show an efficient way to do condition wait. The original article about the lock-free queue used C++11 for the implementation; in this article, however, I'll mostly be talking about the Linux kernel, because the algorithm was developed for a kernel implementation of the queue. I'll explain all the kernel-specific things, so no special skills are required from the reader.

If you need a consuming thread to go to sleep when there are no items in the queue, then you would probably write code like the following (this is C-like pseudo-code):

    // Consumer
    while (thr_pos().tail >= last_head_) {
        wait_event_interruptible(wq,
                                 thr_pos().tail < last_head_);

        // Update the last_head_.
        // .......
    }

    // Producer
    // Push element and wake up a consumer.
    // ......

    thr_pos().head = ULONG_MAX;
    wake_up_interruptible_all(&wq);

I left the pieces of code corresponding to the queue logic as they are in the original queue implementation, but of course we would have to rewrite the queue in plain C to run it in kernel space.

wait_event_interruptible() and wake_up_interruptible_all() are the Linux kernel analogs of pthread_cond_wait(3p) and pthread_cond_broadcast(3p). Both take the wait queue on which consumers are sleeping. wait_event_interruptible(), which is actually a C macro, also takes the condition for which the consumers wait (i.e. it sleeps until the condition becomes true). wake_up_interruptible_all() wakes up all consuming threads, the same way pthread_cond_broadcast() does. We can't use the more efficient wake_up_interruptible(), which wakes up only one consumer, because of the second property of our queue: we must be sure that exactly the consumer waiting on the position into which we just inserted an item is woken up, but the standard interface doesn't let us specify which thread to wake. So we have to wake up all the sleeping threads.
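For readers more at home in user space, the same wait/wake pair can be sketched with the POSIX calls mentioned above. This is only an illustration under stated assumptions: struct demo_queue, demo_push() and demo_pop_wait() are hypothetical names, and a plain item counter stands in for the ring buffer.

```c
#include <assert.h>
#include <pthread.h>

/* Hypothetical stand-in for the queue: it only counts available items. */
struct demo_queue {
    pthread_mutex_t lock;
    pthread_cond_t  nonempty;
    unsigned long   items;
};

static struct demo_queue demo_q = {
    PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER, 0
};

/* Producer side: publish an item, then wake every sleeping consumer. */
static void demo_push(struct demo_queue *q)
{
    pthread_mutex_lock(&q->lock);
    q->items++;
    pthread_mutex_unlock(&q->lock);
    pthread_cond_broadcast(&q->nonempty);
}

/*
 * Consumer side: sleep until at least one item is available, take it,
 * and return the number of items left.
 */
static unsigned long demo_pop_wait(struct demo_queue *q)
{
    unsigned long left;

    pthread_mutex_lock(&q->lock);
    while (q->items == 0)                 /* recheck after every wake up */
        pthread_cond_wait(&q->nonempty, &q->lock);
    left = --q->items;
    pthread_mutex_unlock(&q->lock);
    return left;
}
```

Note that both sides take the same mutex, which is the user-space counterpart of the shared spin lock discussed later in the post.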

The body of the while loop in the consumer code is the slow path, but we want things to be fast in our lock-free implementation. The situation in the queue can change quickly, so a consumer which has just seen that there are no items in the queue can find an item at the next check. We have to balance how quickly a consumer can observe the queue state against how many unnecessary cache bounces it produces. Therefore I add some spinning before going to sleep:

    // Consumer
    unsigned int loop_cnt = 0;
    while (thr_pos().tail >= last_head_) {
        if (++loop_cnt < 1000) {
            schedule();
        } else {
            wait_event_interruptible(wq, thr_pos().tail
                                      < last_head_);
            loop_cnt = 0;
        }

        // Update the last_head_.
        // .......
    }

In practice the spin count (1000 in the code above) should be chosen based on the results of performance tests. This way we can minimize the cost of condition wait for consumers. Unfortunately, we can't do the same for producers: there is no reliable way to know whether there are sleeping consumers (if you just put a check before the wake_up() call, then a consumer can go to sleep right after the check has said "there are no sleepers"). So we must always call the wake-up function.
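The spin-then-sleep idea carries over to user space. The following is a sketch under assumptions, not the queue's code: SPIN_LIMIT, struct spin_waiter, spin_wake() and spin_then_sleep() are hypothetical names, sched_yield() plays the role of the cheap spin step, and a POSIX semaphore plays the role of the slow sleeping path.

```c
#include <assert.h>
#include <sched.h>
#include <semaphore.h>
#include <stdatomic.h>

#define SPIN_LIMIT 1000   /* tuning knob; choose it from benchmarks */

struct spin_waiter {
    atomic_ulong ready;   /* number of published items */
    sem_t        sleep;   /* slow path: the consumer blocks here */
};

static int spin_waiter_init(struct spin_waiter *w)
{
    atomic_init(&w->ready, 0);
    return sem_init(&w->sleep, 0, 0);
}

/*
 * Producer: publish first, then always post, because there is no
 * reliable way to know whether the consumer is asleep.
 */
static void spin_wake(struct spin_waiter *w)
{
    atomic_fetch_add(&w->ready, 1);
    sem_post(&w->sleep);
}

/* Consumer: returns how many times it had to take the slow path. */
static unsigned spin_then_sleep(struct spin_waiter *w)
{
    unsigned loop_cnt = 0, slept = 0;

    while (atomic_load(&w->ready) == 0) {
        if (++loop_cnt < SPIN_LIMIT) {
            sched_yield();            /* fast path: give up the CPU, retry */
        } else {
            sem_wait(&w->sleep);      /* slow path: really go to sleep */
            slept++;
            loop_cnt = 0;
        }
    }
    atomic_fetch_sub(&w->ready, 1);   /* consume one item */
    return slept;
}
```

Posts made while the consumer is not sleeping stay in the semaphore and only cause an extra pass through the loop later, which is harmless because the condition is rechecked.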

Now let's have a brief look at the wake_up_interruptible_all() and wait_event_interruptible() implementations (linux-3.11-rc3; I've thrown out some logic for brevity):

    #define wake_up_interruptible_all(x) \
        __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL)

    void __wake_up(wait_queue_head_t *q, unsigned int mode,
                   int nr_exclusive, void *key)
    {
        unsigned long flags;

        spin_lock_irqsave(&q->lock, flags);
        __wake_up_common(q, mode, nr_exclusive, 0, key);
        spin_unlock_irqrestore(&q->lock, flags);
    }

    static void __wake_up_common(wait_queue_head_t *q,
                                 unsigned int mode,
                                 int nr_exclusive,
                                 int wake_flags, void *key)
    {
        wait_queue_t *curr, *next;

        list_for_each_entry_safe(curr, next, &q->task_list,
                                 task_list) {
            unsigned flags = curr->flags;

            if (curr->func(curr, mode, wake_flags, key)
                && (flags & WQ_FLAG_EXCLUSIVE)
                && !--nr_exclusive)
                break;
        }
    }

    #define wait_event_interruptible(wq, condition)          \
    ({                                                       \
        int __ret = 0;                                       \
        if (!(condition))                                    \
            __wait_event_interruptible(wq, condition, __ret); \
        __ret;                                               \
    })

    #define __wait_event_interruptible(wq, condition, ret)   \
    do {                                                     \
        DEFINE_WAIT(__wait);                                 \
        for (;;) {                                           \
            prepare_to_wait(&wq, &__wait, TASK_INTERRUPTIBLE); \
            /* .... */                                       \
        }                                                    \
        finish_wait(&wq, &__wait);                           \
    } while (0)

    void
    prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait,
                    int state)
    {
        // .....
        spin_lock_irqsave(&q->lock, flags);
        // .....
        spin_unlock_irqrestore(&q->lock, flags);
    }


Here we see the following two nasty things:
  • wait_event_interruptible() and wake_up_interruptible_all() acquire the same spin lock;
  • wake_up_interruptible_all() walks over a list of tasks, and the items of the list are likely to be in sparse memory regions.
Thus, putting a task to sleep and waking it up are not fast operations. Moreover, as we saw, we have to wake up all the consuming tasks instead of just one, and this is also bad for performance, especially on many cores. So we need to implement a conditional wait for the queue with the following properties:
  1. going to sleep and waking up can run concurrently (i.e. lock-free);
  2. only the consumer which waits for the item we just inserted into the queue is woken up.
This is the place where the 3rd and 4th properties of our queue come into play. Firstly, we allocate and initialize an array of task descriptors (struct task_struct in the Linux kernel, or it could be pthread_t in user space) whose size is the number of consumers:

    struct task_struct *w_tasks[CONSUMERS] ____cacheline_aligned;

    memset(w_tasks, 0, sizeof(w_tasks));

We'll use the array to make consumers go to sleep concurrently. The question is how to safely get an index in the array for a particular consuming task. We need to know exactly which task to wake up when we insert an item into the queue, so the answer is simple: just take the remainder of dividing the current position in the queue by the number of consumers (CONSUMERS). Due to property 4 of our queue, it seems that with such indexing all consumers safely get their positions in the array without conflicts, but we'll see a bit later that this is not true and that additional steps are needed to resolve the conflicts. However, at this point we can easily write the waking-up code (please read it as pseudo-code too: this is a mix of the previous C++ lock-free queue implementation and the Linux kernel C implementation of the same queue):
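The indexing rule is small enough to check by hand. In this sketch slot_for() and slots_collide() are hypothetical helper names and CONSUMERS = 4 is just an illustrative value; it shows that CONSUMERS consecutive positions get distinct slots, while positions x and x + CONSUMERS share one, which is where the conflicts come from.

```c
#include <assert.h>

#define CONSUMERS 4UL   /* illustrative value */

/* Map a queue position to its slot in the wait array. */
static unsigned long slot_for(unsigned long position)
{
    return position % CONSUMERS;
}

/* Return 1 if two queue positions compete for the same slot. */
static int slots_collide(unsigned long a, unsigned long b)
{
    return slot_for(a) == slot_for(b);
}
```

For example, positions 8, 9, 10 and 11 map to slots 0, 1, 2 and 3, but position 12 maps to slot 0 again.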

    void
    wake_up(unsigned long position)
    {
        unsigned long pos = position % CONSUMERS;
        wait_queue_t wait = { .private = w_tasks[pos] };

        if (!wait.private)
            return;

        /*
         * Asynchronously wake up the task.
         * See linux/kernel/sched_features.h.
         */
        default_wake_function(&wait, TASK_INTERRUPTIBLE,
                              0, NULL);
    }

Here default_wake_function() wakes up the task passed to it in a field of the wait_queue_t structure; this is standard Linux kernel API. One important thing: there is nothing bad in trying to wake up an already running task, so we can do this without locking.

Things get harder when a task goes to sleep. The following problems are possible if many consumers go to sleep and many producers wake them up concurrently:
  1. a consumer misses its wake-up signal due to
    1. a race with a producer on insertion into the awaited position (the consumer inserts its task descriptor into the array after the producer has tried to wake up the corresponding consumer);
    2. a race with another consumer which overwrites the pointer in the array;
  2. waking up the wrong consumer;
  3. false wake up.
The 3rd issue, false wake up, is easy to overcome: just recheck the condition in a loop around the conditional wait. The other two are harder to fix. Imagine that we have 4 consumers (C0, C1, C2, C3 and CONSUMERS = 4) waiting on four positions (N0, N1, N2 and N3 correspondingly) and two producers (P0 and P1). The producers concurrently insert two items, at N0 and N1 correspondingly, and go to wake up consumers C0 and C1. If producer P0 is slower than P1 (e.g. it was preempted), then P1 can wake up C1, and C1 eats its item and is ready to consume the next one. The position of the next item is N4 = N0 + 4, which gives exactly the same index in w_tasks as N0, so C1 overwrites C0's entry and P0 will wake up C1 instead of C0. Even nastier, C0 may never be woken up at all. Therefore, we can acquire a w_tasks item only if it is surely free, and CMPXCHG helps us with this.
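The "acquire only if free" rule can be replayed deterministically in user space with a C11 compare-and-swap. This is a sketch, assuming a hypothetical struct waiter in place of struct task_struct and hypothetical helpers claim_slot() and release_slot(); it is not the kernel code.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stddef.h>

#define CONSUMERS 4

struct waiter { int id; };   /* hypothetical stand-in for struct task_struct */

static _Atomic(struct waiter *) w_slots[CONSUMERS];

/*
 * Try to claim the slot for `position`.
 * Returns NULL on success, or the current owner if the slot is taken.
 */
static struct waiter *claim_slot(unsigned long position, struct waiter *self)
{
    struct waiter *expected = NULL;

    if (atomic_compare_exchange_strong(&w_slots[position % CONSUMERS],
                                       &expected, self))
        return NULL;
    return expected;   /* on failure the CAS stores the owner here */
}

/* Give the slot back once the wait is over. */
static void release_slot(unsigned long position)
{
    atomic_store(&w_slots[position % CONSUMERS], (struct waiter *)NULL);
}
```

With C0 holding the slot for position 8, an attempt by C1 to claim position 12 fails and reports C0 as the owner instead of silently overwriting it; a plain store would have lost C0's entry.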

There is also another race scenario which we need to prevent. A producer and a consumer enter push() and pop() simultaneously:
  1. the consumer sees that there are no items in the queue and goes to wait;
  2. the producer pushes an item and tries to wake the waiting task, but finds NULL at the corresponding position in w_tasks and doesn't do anything;
  3. the consumer sleeps waiting for the item, probably forever.
This is a classical scenario which any conditional wait code (either in the POSIX threads library or in the Linux kernel) handles perfectly with a double check.

So let's write our fast lock-free conditional wait code:

    #define cond_wait(position, condition)                   \
    do {                                                     \
        unsigned long p = (position) % CONSUMERS;            \
        struct task_struct *curr_waiter;                     \
        curr_waiter = cmpxchg(&w_tasks[p], NULL, current);   \
        if (unlikely(curr_waiter)) {                         \
            wait_queue_t wait = { .private = curr_waiter };  \
            default_wake_function(&wait, TASK_INTERRUPTIBLE, \
                                  0, NULL);                  \
            schedule();                                      \
            if (condition)                                   \
                break;                                       \
            continue;                                        \
        }                                                    \
        set_current_state(TASK_INTERRUPTIBLE);               \
        if (!(signal_pending(current) || (condition)))       \
            schedule();                                      \
        w_tasks[p] = NULL;                                   \
        set_current_state(TASK_RUNNING);                     \
        break;                                               \
    } while (1)

Here current is a pointer to the current task in the Linux kernel (it behaves like a global variable). The current task goes to sleep by setting its state to TASK_INTERRUPTIBLE and rescheduling (by calling schedule()). When the task is woken up, it continues from the schedule() call and sets its state to running, so it will get a time slice again on the next rescheduling.

Our conditional wait spins in a loop while the position in w_tasks is non-NULL (i.e. it is held by some other waiting thread), so there is no conflict between consumers. Hopefully, the case when two tasks compete for the same position in the wait array is rare, so I use the unlikely() hint (which corresponds to GCC's __builtin_expect(X, 0) extension in user space).
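In user space the same hint can be spelled with the GCC/Clang builtin directly. The macros below have the same shape as the kernel ones; count_busy() is a hypothetical helper added only to show the hint in use.

```c
#include <assert.h>

/* Branch hints in the style of the Linux kernel; need GCC or Clang. */
#define likely(x)   __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)

/* Hypothetical helper: count non-NULL slots, hinting that they are rare. */
static unsigned count_busy(void *const *slots, unsigned n)
{
    unsigned i, busy = 0;

    for (i = 0; i < n; i++)
        if (unlikely(slots[i] != 0))
            busy++;
    return busy;
}
```

The hint changes only how the compiler lays out the branches; the value of the expression is unchanged (normalized to 0 or 1 by the double negation).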

If a task waiting on position P finds w_tasks[P % CONSUMERS] != NULL, then it is likely that the slot is held by a task waiting on position Q such that Q + CONSUMERS <= P. Since we have only CONSUMERS consumers, this means that position P in the queue already has an item (due to property 4). We're on the slow path anyway, so there is no harm in trying to wake up the waiting task to make its wake up happen earlier. There is also a chance that Q > P, but it is less likely, and a false wake up is still not a problem. Somebody can push an item into the queue while we are spinning and waiting for the position in w_tasks to be freed, so we must check the condition at each iteration.

Finally, we perform the classical double check of the condition to avoid waiting forever, and we set the w_tasks position to NULL at the end of the wait.
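To experiment with the scheme outside the kernel, the whole wait/wake pair can be approximated in user space. This is a rough sketch under several assumptions, not a port of the macro: a per-thread POSIX semaphore replaces set_current_state()/schedule(), struct waiter, wake_up_pos(), cond_wait_pos() and flag_is_set() are hypothetical names, waiter descriptors are assumed to live as long as the queue, and the condition is assumed to read the queue state with sequentially consistent atomics.

```c
#include <assert.h>
#include <sched.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stddef.h>

#define CONSUMERS 4

/* Hypothetical per-thread descriptor; the semaphore replaces the scheduler. */
struct waiter {
    sem_t sem;
};

static _Atomic(struct waiter *) w_tasks[CONSUMERS];

/* Example condition: an atomic flag set by the producer. */
static int flag_is_set(void *arg)
{
    return atomic_load((atomic_int *)arg);
}

/* Producer side: wake whoever sleeps on `position`; 1 if someone was posted. */
static int wake_up_pos(unsigned long position)
{
    struct waiter *w = atomic_load(&w_tasks[position % CONSUMERS]);

    if (!w)
        return 0;
    sem_post(&w->sem);
    return 1;
}

/* Consumer side: may return spuriously, so the caller rechecks in a loop. */
static void cond_wait_pos(unsigned long position, struct waiter *self,
                          int (*cond)(void *), void *arg)
{
    unsigned long p = position % CONSUMERS;

    for (;;) {
        struct waiter *owner = NULL;

        if (!atomic_compare_exchange_strong(&w_tasks[p], &owner, self)) {
            /* Slot is busy: nudge its owner, yield, and re-check. */
            sem_post(&owner->sem);
            sched_yield();
            if (cond(arg))
                return;
            continue;
        }
        /* Double check after publishing ourselves, then sleep. */
        if (!cond(arg))
            sem_wait(&self->sem);
        atomic_store(&w_tasks[p], (struct waiter *)NULL);
        return;
    }
}
```

Unlike a task state, a semaphore remembers posts, so a wake up that arrives between the double check and sem_wait() is not lost; the price is that stale posts show up later as false wake ups, which the outer loop already tolerates.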

This is a fast condition wait, and moreover, due to reduced cache bouncing, it makes the lock-free queue even faster than its spinning version. The kernel module which uses the lock-free queue with this condition wait algorithm has shown about a 10% performance improvement in comparison with the queue without condition wait. Unfortunately, I don't have independent test code with which I could measure the performance gain for the queue itself, without additional application logic.