High Performance Linux

Monday, August 12, 2013

Lock-free Condition Wait for Lock-free Multi-producer Multi-consumer Queue

The lock-free multi-producer multi-consumer queue on a ring buffer described in my previous post has the following properties:
  1. in pop() the queue calls sched_yield(), which leads to 100% CPU consumption;
  2. consumers wait for a particular position in the queue, i.e. if you put an item into the queue while all consumers are sleeping, then one and only one consumer can eat it;
  3. the queue has a fixed number of consumers and producers;
  4. say we have 4 consumers and there are no available elements in the queue, then all consumers will wait on 4 consecutive positions (x, x + 1, x + 2, x + 3).
The first one is undesirable because of the high power consumption and misleading system statistics. To cope with this we can use usleep(3), but it looks like a dirty and relatively expensive solution. It would be better to make the pop()'er wait until a push()'er adds an item to the queue. In the naive queue implementation I used a condition variable to do this. Also, the spinning behaviour of the queue leads to unnecessary cache bouncing and degrades performance.

In this post I'm going to show an efficient way to do a condition wait. The original article about the lock-free queue used C++11 for the implementation; however, in this article I'll mostly be talking about the Linux kernel, because the algorithm was developed for a kernel implementation of the queue. I'll explain all kernel-specific things, so no special skills are required from the reader.

If you need to make a consuming thread go to sleep when there are no items in the queue, then you would probably write code like the following (this is C-like pseudo-code):

    // Consumer
    while (thr_pos().tail >= last_head_) {
        wait_event_interruptible(wq,
                                 thr_pos().tail < last_head_);

        // Update the last_head_.
        // .......
    }

    // Producer
    // Push element and wake up a consumer.
    // ......

    thr_pos().head = ULONG_MAX;
    wake_up_interruptible_all(&wq);

I left the pieces of code corresponding to the queue logic as they are in the original queue implementation, but of course the queue has to be rewritten in plain C to run in kernel space.

wait_event_interruptible() and wake_up_interruptible_all() are the Linux kernel analogs of pthread_cond_wait(3p) and pthread_cond_broadcast(3p). Both operate on the wait queue on which consumers are sleeping. wait_event_interruptible(), which is actually a C macro, also takes the condition the consumer is waiting for (i.e. it sleeps until the condition becomes true). wake_up_interruptible_all() wakes up all consuming threads, the same way pthread_cond_broadcast() does. We can't use the more efficient wake_up_interruptible(), which wakes up only one consumer, because of the second property of our queue: we must be sure that exactly the consumer waiting on the position into which we just inserted an item is woken up, but the standard interface doesn't let us specify which thread to wake. So we have to wake up all the sleeping threads.

The body of the while loop in the consumer code is the slow path, but we want things to be fast in our lock-free implementation. The state of the queue can change quickly, so a consumer which has just found the queue empty may find an item at the next check. We have to balance how quickly a consumer observes the queue state against how many unnecessary cache bounces it produces. Therefore I add some spinning before going to sleep:

    // Consumer
    unsigned int loop_cnt = 0;
    while (thr_pos().tail >= last_head_) {
        if (++loop_cnt < 1000) {
            schedule();
        } else {
            wait_event_interruptible(wq,
                                     thr_pos().tail
                                      < last_head_);
            loop_cnt = 0;
        }

        // Update the last_head_.
        // .......
    }

In practice the spin count (1000 in the code above) should be chosen based on the results of performance tests. This way we can minimize the cost of the condition wait for consumers. Unfortunately, we can't do the same for producers: there is no reliable way to know whether there are sleeping consumers (if you just add a check and call wake_up() after it, then a consumer can go to sleep right after the check has said "there are no sleepers"). So we must always call the wake-up function.
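The spin-then-sleep policy can be looked at separately from the queue. Below is a minimal user-space sketch in C, not the kernel code: SPIN_LIMIT, wait_for_item() and the two callbacks are hypothetical names introduced only for illustration, sched_yield() stands in for schedule(), and the block() callback stands in for wait_event_interruptible(). The fake_queue helpers only exist to exercise the policy.

```c
#include <sched.h>
#include <stdbool.h>

/* Hypothetical tuning constant; the post uses 1000 and suggests measuring. */
#define SPIN_LIMIT 1000u

/* Stand-ins (assumptions): has_item() models the queue check,
 * block() models the real sleep, e.g. wait_event_interruptible(). */
typedef bool (*has_item_fn)(void *ctx);
typedef void (*block_fn)(void *ctx);

/* Waits until has_item() is true; returns how often the slow path was taken. */
static unsigned wait_for_item(has_item_fn has_item, block_fn block, void *ctx)
{
    unsigned loop_cnt = 0, sleeps = 0;

    while (!has_item(ctx)) {
        if (++loop_cnt < SPIN_LIMIT) {
            sched_yield();      /* cheap path: give up the CPU and recheck */
        } else {
            block(ctx);         /* slow path: really go to sleep */
            loop_cnt = 0;
            sleeps++;
        }
    }
    return sleeps;
}

/* Demo queue: an item shows up after polls_left failed checks,
 * or as soon as the consumer has blocked once. */
struct fake_queue {
    unsigned polls_left;
    bool     ready;
};

static bool fake_has_item(void *ctx)
{
    struct fake_queue *q = ctx;

    if (q->ready || q->polls_left == 0)
        return true;
    q->polls_left--;
    return false;
}

static void fake_block(void *ctx)
{
    struct fake_queue *q = ctx;

    q->ready = true;            /* pretend a producer woke us up */
}
```

With an item that arrives after a handful of checks, the consumer never reaches the slow path; only when the queue stays empty for SPIN_LIMIT checks does it pay for a real sleep.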

Now let's have a brief look at the wake_up_interruptible_all() and wait_event_interruptible() implementations (linux-3.11-rc3; I've thrown out some logic for brevity):

    #define wake_up_interruptible_all(x) \
        __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL)

    void __wake_up(wait_queue_head_t *q, unsigned int mode,
                   int nr_exclusive, void *key)
    {
        unsigned long flags;

        spin_lock_irqsave(&q->lock, flags);
        __wake_up_common(q, mode, nr_exclusive, 0, key);
        spin_unlock_irqrestore(&q->lock, flags);
    }

    static void __wake_up_common(wait_queue_head_t *q,
                                 unsigned int mode,
                                 int nr_exclusive,
                                 int wake_flags, void *key)
    {
        wait_queue_t *curr, *next;

        list_for_each_entry_safe(curr, next, &q->task_list,
                                 task_list)
        {
            unsigned flags = curr->flags;

            if (curr->func(curr, mode, wake_flags, key)
                && (flags & WQ_FLAG_EXCLUSIVE)
                && !--nr_exclusive)
                        break;
        }
    }



    #define wait_event_interruptible(wq, condition)           \
    ({                                                        \
        int __ret = 0;                                        \
        if (!(condition))                                     \
            __wait_event_interruptible(wq, condition, __ret); \
        __ret;                                                \
    })

    #define __wait_event_interruptible(wq, condition, ret)   \
    do {                                                     \
        DEFINE_WAIT(__wait);                                 \
        for (;;) {                                           \
            prepare_to_wait(&wq, &__wait, TASK_INTERRUPTIBLE); \
            /* .... */                                       \
        }                                                    \
        finish_wait(&wq, &__wait);                           \
    } while (0)


    void
    prepare_to_wait(wait_queue_head_t *q, wait_queue_t *wait,
                    int state)
    {
        // .....
        spin_lock_irqsave(&q->lock, flags);
        // .....
        spin_unlock_irqrestore(&q->lock, flags);
    }

    

Here we see the following two nasty things:
  • wait_event_interruptible() and wake_up_interruptible_all() acquire the same spin lock;
  • wake_up_interruptible_all() walks over a list of tasks, and the items of the list are likely to be in scattered memory regions.
Thus, putting a task to sleep and waking it up are not fast operations. Moreover, as we saw, we have to wake up all the consuming tasks instead of just one, which also hurts performance, especially on many cores. So we need to implement a condition wait for the queue with the following properties:
  1. going to sleep and waking up run concurrently (i.e. lock-free);
  2. only the consumer waiting for the item we just inserted into the queue is woken up.
This is where the 3rd and 4th properties of our queue come into play. First, we allocate and initialize an array of task descriptors (struct task_struct in the Linux kernel, or it could be pthread_t in user space) sized by the number of consumers:

   
    struct task_struct *w_tasks[CONSUMERS] ____cacheline_aligned;

    memset(w_tasks, 0, sizeof(w_tasks));

We'll use the array to let consumers go to sleep concurrently. The question is how to safely get an index into the array for a particular consuming task. We need to know exactly which task to wake up when we insert an item into the queue, so the answer is simple: take the remainder of dividing the current position in the queue by the number of consumers (CONSUMERS). Thanks to property 4 of our queue, it looks as if with such indexing all consumers get their positions in the array without conflicts; we'll see a bit later that this is not quite true and that additional steps are needed to resolve conflicts. However, at this point we can easily write the wake-up code (please read it as pseudo-code too: this is a mix of the previous C++ lock-free queue implementation and the Linux kernel C implementation of the same queue):

    void
    wake_up(unsigned long position)
    {
        unsigned long pos = position % CONSUMERS;
        wait_queue_t wait = { .private = w_tasks[pos] };

        if (!wait.private)
                return;

        /*
         * Asynchronously wake up the task.
         * See linux/kernel/sched_features.h.
         */
        default_wake_function(&wait, TASK_INTERRUPTIBLE,
                              0, NULL);
    }

Here default_wake_function() wakes up the task passed to it in a field of the wait_queue_t structure; this is standard Linux kernel API. One important thing: there is nothing bad in trying to wake up an already running task, so we can do this without locking.
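The indexing used by wake_up() above can be checked in isolation. This is a small sketch in plain C; slot_for() and slots_collide() are hypothetical helper names, and CONSUMERS = 4 is an assumed value matching the four-consumer case from property 4.

```c
#include <stdbool.h>

#define CONSUMERS 4UL   /* assumed value, matching the four-consumer case */

/* Slot in w_tasks[] used by the consumer waiting on a queue position. */
static unsigned long slot_for(unsigned long position)
{
    return position % CONSUMERS;
}

/* True if two queue positions compete for the same w_tasks[] slot. */
static bool slots_collide(unsigned long p, unsigned long q)
{
    return slot_for(p) == slot_for(q);
}
```

Any CONSUMERS consecutive positions (x, x + 1, x + 2, x + 3) land in distinct slots, while x and x + CONSUMERS share one. That shared slot is exactly the conflict case which the going-to-sleep side has to handle.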

Things get harder when a task goes to sleep. The following problems are possible if many consumers go to sleep and many producers wake them up concurrently:
  1. a consumer misses its wake-up signal due to
    1. a race with a producer on insertion into the awaited position (the consumer inserts its task descriptor into the array after the producer has tried to wake up the corresponding consumer);
    2. a race with another consumer which overwrites the pointer in the array;
  2. waking up the wrong consumer;
  3. false wake-up.
The 3rd issue, false wake-up, is easy to overcome: just recheck the condition in a loop around the condition wait. The other two are harder to fix. Imagine that we have 4 consumers (C0, C1, C2, C3, so CONSUMERS = 4) waiting on four positions respectively (N0, N1, N2 and N3) and two producers (P0 and P1). The producers concurrently insert items at N0 and N1 respectively and go to wake up consumers C0 and C1. If producer P0 is slower than P1 (e.g. it was preempted), then P1 can wake up C1, and C1 eats the item and is ready to consume the next one. The position of the next item is N4 = N0 + 4, which maps to exactly the same slot in w_tasks as N0, so C1 overwrites the entry of C0 and P0 will wake up C1 instead of C0. Even worse, C0 may never be woken up at all. Therefore, we may acquire a w_tasks entry only if it is definitely free, and CMPXCHG helps us with this.
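The C0/C1 scenario can be replayed in user space with C11 atomics. This is only a sketch under assumptions: struct waiter is a stand-in for struct task_struct, claim_slot() and release_slot() are hypothetical names, and atomic_compare_exchange_strong() plays the role of the kernel's cmpxchg().

```c
#include <stdatomic.h>
#include <stddef.h>

#define CONSUMERS 4UL                      /* assumed, as in the example */

struct waiter { int id; };                 /* stand-in for struct task_struct */

/* Static storage: every slot starts out as NULL, i.e. free. */
static _Atomic(struct waiter *) w_tasks[CONSUMERS];

/*
 * Try to claim the slot for `position`. Returns NULL on success, or the
 * waiter that already owns the slot; like the kernel's cmpxchg(), the
 * caller gets the previous value back.
 */
static struct waiter *claim_slot(unsigned long position, struct waiter *self)
{
    struct waiter *expected = NULL;

    if (atomic_compare_exchange_strong(&w_tasks[position % CONSUMERS],
                                       &expected, self))
        return NULL;
    return expected;    /* the failed CAS stored the current owner here */
}

/* Give the slot back once the wait is over. */
static void release_slot(unsigned long position)
{
    atomic_store(&w_tasks[position % CONSUMERS], NULL);
}
```

If C0 has claimed the slot for N0, a later claim by C1 for N4 fails and hands back C0 instead of overwriting it, so a slow producer P0 still finds C0 in the array.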

There is also another race scenario which we need to prevent. A producer and a consumer enter push() and pop() simultaneously:
  1. the consumer checks that there are no items in the queue and goes to wait;
  2. the producer pushes an item and tries to wake the waiting task, but finds the corresponding position in w_tasks to be NULL and does nothing;
  3. the consumer sleeps waiting for the item, probably forever.
This is a classical scenario which is handled with a double check by any condition wait code (either the POSIX threads library or the Linux kernel).
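The ordering behind the double check can be modelled with a few atomics. The sketch below is a single-threaded model in user-space C, not a working sleep primitive: all names are hypothetical, one slot stands for one w_tasks entry, and the woken flag stands for the task state. The rule it shows is: the consumer registers first and checks the condition again afterwards, while the producer publishes the item first and looks for a sleeper afterwards.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct waiter { atomic_bool woken; };      /* stand-in for a sleeping task */

static _Atomic(struct waiter *) slot;      /* models one w_tasks[] entry */
static atomic_ulong items;                 /* items available in the queue */

/* Consumer, step 1: publish ourselves BEFORE the final condition check. */
static void consumer_register(struct waiter *w)
{
    atomic_store(&w->woken, false);
    atomic_store(&slot, w);
}

/* Consumer, step 2: the double check. Sleep only if this returns true. */
static bool consumer_must_sleep(struct waiter *w)
{
    return atomic_load(&items) == 0 && !atomic_load(&w->woken);
}

/* Producer: publish the item first, then look for a registered sleeper. */
static bool producer_push_and_wake(void)
{
    struct waiter *w;

    atomic_fetch_add(&items, 1);
    w = atomic_load(&slot);
    if (!w)
        return false;                      /* nobody registered yet */
    atomic_store(&w->woken, true);
    return true;
}

/* Reset the model between interleavings. */
static void model_reset(void)
{
    atomic_store(&slot, NULL);
    atomic_store(&items, 0);
}
```

Whichever side runs first, the consumer does not decide to sleep while an item is present: either the producer sees the registered waiter and wakes it, or the consumer's second check sees the item.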

So let's write our fast lock-free condition wait code:

    #define cond_wait(position, condition)                   \
    do {                                                     \
        unsigned long p = (position) % CONSUMERS;            \
        struct task_struct *curr_waiter;                     \
        curr_waiter = cmpxchg(&w_tasks[p], NULL, current);   \
        if (unlikely(curr_waiter)) {                         \
            wait_queue_t wait = { .private = curr_waiter };  \
            default_wake_function(&wait, TASK_INTERRUPTIBLE, \
                                  0, NULL);                  \
            schedule();                                      \
            if (condition)                                   \
                    break;                                   \
            continue;                                        \
        }                                                    \
        set_current_state(TASK_INTERRUPTIBLE);               \
        if (!(signal_pending(current) || (condition)))       \
                schedule();                                  \
        w_tasks[p] = NULL;                                   \
        set_current_state(TASK_RUNNING);                     \
        break;                                               \
    } while (1)


Here current is a pointer to the current task in the Linux kernel (it is used like a global variable). The current task goes to sleep by setting its state to TASK_INTERRUPTIBLE and rescheduling (by calling schedule()). When the task is woken up, it continues from the schedule() call and sets its state to running, so it will get a time slice again on the next rescheduling.

Our condition wait spins in a loop while the position in w_tasks is non-NULL (i.e. it is held by some other waiting thread), so there is no conflict between consumers. Hopefully, the case when two tasks compete for the same position in the wait array is rare, so I use the unlikely() annotation (which corresponds to GCC's __builtin_expect(X, 0) extension in user space).
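For readers outside the kernel, the annotation can be reproduced in user space with the GCC/Clang builtin. A minimal sketch: the macro follows the usual kernel-style definition, while slot_is_contended() is a hypothetical helper used only to show the branch. The hint affects code layout, not the result.

```c
/* Kernel-style definition; !!(x) normalizes any non-zero value to 1. */
#define unlikely(x) __builtin_expect(!!(x), 0)

/* Hypothetical helper: 1 on the rare contended path, 0 otherwise. */
static int slot_is_contended(const void *current_owner)
{
    if (unlikely(current_owner != 0))
        return 1;       /* hinted as the cold branch */
    return 0;
}
```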

If a task waiting on position P finds w_tasks[P % CONSUMERS] != NULL, then it is likely that the position is held by a task waiting on position Q such that Q + CONSUMERS <= P. Since we have only CONSUMERS consumers, this means that position P in the queue already has an item (due to property 4). We're on the slow path anyway, so there is no harm in trying to wake up the waiting task to make its wake-up happen earlier. There is also a chance that Q > P, but that is less likely, and a false wake-up is still harmless. Somebody can push an item to the queue while we are spinning, waiting for the w_tasks position to be freed, so we must check the condition at each iteration.

Finally, we perform the classical double check of the condition to avoid waiting forever, and set the w_tasks position back to NULL at the end of the wait.

This is a fast condition wait, and moreover, due to reduced cache bouncing, it makes the lock-free queue even faster than its spinning version. The kernel module which uses the lock-free queue with this condition wait algorithm has shown about a 10% performance improvement in comparison with the queue without condition wait. Unfortunately, I don't have independent test code in which I could measure the performance gain for the queue itself, without additional application logic.

13 comments:

  1. Can you please describe the benchmark you used?

    I believe that the benchmark should be parametric. There are two things that should be taken into account:
    1. How long does it take to produce a job (parameter PD)
    2. How long does it take to consume a job (parameter CD)

    In your benchmarks I have seen that you just make both PD and CD very low. Such a benchmark does not make sense, because a single threaded application would be a better choice to execute the task. I have tried to run your github code with various parameters and I have noticed that:
    1. When both PD and CD are low, your queue is faster than the boost queue, but single thread would do the job faster.
    Otherwise:
    2. When PD is higher than CD then the lockfree queue is a bit faster (probably because of busy waiting).
    3. When PD is lower than CD (this is the interesting case) boost queue is a bit faster.

    I would recommend that everybody do their own measurements with a single thread, the boost queue and the lock-free queue to see which approach best suits their needs.

  2. Hi Klokan,

    it seems you're writing about benchmarks for the queue itself, not for the condition wait which this post is about.

    I didn't get exactly which benchmark parameters you mean. The benchmark for the original queue has only the following parameters: number of producers (PRODUCERS), number of consumers (CONSUMERS) and number of inserted elements (N). Also, the queue is parametrized by the number of items.

    However, concerning a single-threaded version of the queue: surely any concurrent data structure (including a lock-free implementation) is slower than a pure single-threaded version, because it must take care of synchronization, which is usually not free even on a single CPU.

    1. You are right. I was talking about the queue benchmarks. I'm sorry that I misplaced my comment.

      I was talking about the parameters that are missing in the queue benchmark, but should be added.

      In your other post you show that on a single job, you can do it in 13969ms while boost queue does it in 20240ms. That's nice until one realizes that this particular task can be done by a single threaded application in 4 seconds using less cores.

      One may assume that if you are fast on this extreme benchmark then you would be faster on a more real life one, but such assumption has no good justification. For example, in real benchmarks, the naive queue may not suffer from lock contention at all while your queue suffers from busy waiting.

      That is why everybody should try more queue types and use the one that suits them best.

  3. Hi Klokan,

    this is not a generic lock-free queue, which boost's one tries to be; rather, this is a work queue for many producers and many consumers. Moreover, it was written to outperform other implementations on 10 or more cores. So yes, this is a very specific solution and it does specific work.

    Since it was designed for multi-core, it didn't make sense to test it on a single core.

    1. To better explain what I mean, I have written this code:
      http://pastie.org/9660361

      And benchmarked it on my computer (labelled NEW):
      lock free: 6919ms
      check X data...
      Passed
      naive: 42398ms
      check X data...
      Passed
      boost: 8939ms
      check X data...
      Passed
      NEW: 46ms
      check X data...
      Passed

      It is 150x faster than lock-free queue on 4 physical core processor. I doubt that throwing in more cores would make your implementation faster.

      I agree that I cheated a bit, because in my implementation I'm not really queuing jobs. I execute them right away. But my approach has a lot of advantages:
      1. My implementation has superior latency because tasks get finished soon after they are created.
      2. It uses less cores (only 2 instead of 4)
      3. Has some guarantees of task ordering: Tasks produced in a single producer get executed in order in which they are created.
      4. It outperforms any other implementation.

  4. Thanks for the code sample - it is now much clearer what you meant.

    Your approach can't be compared with lock-free queues, because you don't have a queue; rather, you have several producing threads and each of them pushes to _its_own_queue_. The queues are just combined into one physical array and accessed using different and unique offsets. There is no shared queue with many producers and many consumers.

    Moreover, you can make your approach much faster. Currently you write 8-byte integers, so you typically suffer from false sharing: there are 8 write positions in one cache line of modern x86, so different CPUs write to the same cache line and this makes the code slower. Thus, if you completely split the x array into PRODUCERS arrays, you will get much better performance.

    1. Please note that in my code I did not split array x to several PRODUCERS arrays. I still share array x and all my producers write to the same array so they are affected by this false sharing. I do the >exact same job< you do in your benchmark.

      What I'm trying to tell you is that these queues are supposed to be "tools to make the job done faster". What I'm showing you is that the job can be done in 46 milliseconds so the lock free queue is not helping to make the job faster. You have clearly optimized your queue to do this task faster than competitor queues. You did not consider whether using such queue for this particular problem is useful. What I'm showing you is that it's not worth using queues to solve this problem. What I'm also telling you is that if you took a job that is worth to parallelize with such a queue then you might not be faster than boost.

    2. To clarify: I did not split x on purpose. I did it so that I have basically the same producer code and the single difference is that I avoid to use queue and I instantly consume the job. I did it to show that using queue here is a bad idea.

  6. I can only rephrase my answer and give you some more details: your approach is faster just because you have no concurrency control at all. You just write queue items to independent positions in the array.

    Moreover, you (1) don't have overflow/underflow queue control and (2) you don't even have consumers. If you try to get something useful from your benchmark, you will probably find that your approach is absolutely equal to writing to independent arrays.

    1. I have two questions:

      1. If somebody has a real life problem that has this property:

      "It is faster to consume the job than to put it into a queue"

      Then he should use my approach instead of queue. That developer should consume the job in producer instead of queuing it. Am I right?

      2. If yes, then you benchmark the queue on a problem that should be solved without the queue. Am I right?

  7. Klokan,

    it's hard to answer the questions. Yes, surely, sometimes there are tasks which should be solved without a queue, probably with direct processing in the producing context or even using completely different data structures like trees. However, there are tasks which must be solved using a queue. It seems to me that you're trying to compare apples with oranges.

    1. Let me use a parallel example here, because I really don't know how else to explain myself. Let's say we have a list of integers and we want to sort it. If the number of integers is small (1-10) the best algorithm is insertion sort, but there is quicksort for bigger arrays.

      So in this parallel example, you have made a very fast implementation of insertion sort, and then you took an array of 1000000 elements and showed that you are faster than other implementations of insertion sort. In this parallel example I have shown you that using a different algorithm I can sort this array 150 times faster than you, and you are telling me that I'm comparing apples to oranges because I did not use insertion sort... What I'm telling you is that I'm 150 times faster with the >different< algorithm on the >same< job. So even though the algorithms are apples and oranges, the tasks are oranges and oranges. What I'm telling you is that the task you are benchmarking is wrong, because nobody should use insertion sort on such big arrays and nobody should use job queues for such small jobs. You should benchmark on an example that makes sense. I would like to see if you are still faster than the competition if you try "to sort your 5 element array"...
