By YoungTimes

C++ Multithreaded Programming: Designing a Lock-Free Concurrent Stack

In this post we design a concurrent stack that multiple threads can access safely without using any locks.

template <typename T> 
class Stack {
private:
    typedef struct Node { 
       T data;
       Node* next;
       Node(const T& d) : data(d), next(nullptr) { } 
    } Node;
    std::atomic<Node*> top;

public: 
    Stack( ) : top(nullptr) { }
    void push(const T& data);
    T pop( ); 
};

Pushing data onto the stack

template <typename T> 
void Stack<T>::push(const T& data) {
    Node* const new_node = new Node(data); 
    while(1) {
      new_node->next = top; 
      if (top.compare_exchange_weak(new_node->next, new_node)) {
          break;
      }
    }
}

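The push method uses the CAS (compare-and-swap) operation introduced in C++11: compare_exchange_weak. It compares the atomic's current value with an expected value. If they are equal, it replaces the current value with the desired value and returns true. If they differ, it overwrites the expected value with the current value and returns false. The weak form may also fail spuriously, returning false even when the two values are equal, which is why it is normally called in a loop.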
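The contract described above can be checked in isolation. The following is a minimal sketch, not part of the original article: try_cas and CasOutcome are hypothetical names introduced for illustration, and the sketch uses compare_exchange_strong so that the outcome is deterministic (the weak form follows the same rules but may additionally fail spuriously).

```cpp
#include <atomic>

// Outcome of one CAS attempt, recorded so the contract is easy to inspect.
struct CasOutcome {
    bool succeeded;  // value returned by the CAS call
    int  expected;   // 'expected' after the call (overwritten on failure)
    int  current;    // value stored in the atomic after the call
};

// Hypothetical helper: start the atomic at 'initial', then try once to
// swap 'expected' for 'desired'.
// compare_exchange_strong never fails spuriously; compare_exchange_weak
// has the same contract but may also return false when the values match.
CasOutcome try_cas(int initial, int expected, int desired) {
    std::atomic<int> value(initial);
    bool ok = value.compare_exchange_strong(expected, desired);
    return CasOutcome{ok, expected, value.load()};
}
```

Calling try_cas(5, 5, 9) reports success and leaves 9 in the atomic. Calling try_cas(5, 7, 9) reports failure, leaves the atomic at 5, and overwrites the expected value with 5.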

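If compare_exchange_weak returns false, either another thread has changed the top of the stack or the call failed spuriously. Because the failed call has already stored the current top in new_node->next, the loop simply retries until the push succeeds.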
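The same retry-until-success pattern applies to any read-modify-write on an atomic. As a rough illustration under contention, the sketch below has several threads increment a shared counter through a CAS loop. The names cas_add and run_counter are assumptions made for this example; for a plain counter, fetch_add would be the usual choice.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Add 'delta' to 'target' with a CAS retry loop (illustrative only).
void cas_add(std::atomic<long>& target, long delta) {
    long seen = target.load();
    // On failure 'seen' is refreshed with the current value, so the next
    // attempt is computed from up-to-date data.
    while (!target.compare_exchange_weak(seen, seen + delta)) {
    }
}

// Hypothetical driver: 'threads' workers each perform 'per_thread'
// increments; returns the final counter value.
long run_counter(int threads, int per_thread) {
    std::atomic<long> total(0);
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t) {
        workers.emplace_back([&total, per_thread] {
            for (int i = 0; i < per_thread; ++i) cas_add(total, 1);
        });
    }
    for (auto& w : workers) w.join();
    return total.load();
}
```

No increment is lost, because a failed attempt never writes anything and is simply repeated with the refreshed value.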

Popping data from the stack

A common incorrect implementation:

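template <typename T>
T Stack<T>::pop( ) { 
    // BUG: this check and the loop below do not execute as one atomic step.
    if (top == NULL) {
       throw std::string("Cannot pop from empty stack");
    }

    while (1) { 
        Node* old_top = top;
        // Crashes if another thread emptied the stack after the check above.
        Node* next = old_top->next;
        if (top.compare_exchange_weak(old_top, next)) { // CAS
            return old_top->data;
        }
    }
}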

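In a multithreaded program, a thread can be preempted between any two statements. Suppose two threads call pop at the same time on a stack that holds a single element. Thread A passes the top == NULL check and is then descheduled. Thread B completes its pop, leaving top equal to NULL. When thread A resumes and dereferences top to read next, it hits a null pointer and crashes.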

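This problem highlights one of the basic design principles of concurrent data structures: never assume that any sequence of statements runs without interruption.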
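The interleaving described above can be replayed deterministically on a single thread. The sketch below is only an illustration: the function names and the standalone int Node type are hypothetical, and the "threads" are simulated by ordering the statements by hand. The second function shows the remedy of reading top once and working from that snapshot.

```cpp
#include <atomic>

struct Node {
    int data;
    Node* next;
};

// Replays the problematic interleaving step by step.
// Returns true if thread A's earlier "not empty" check has gone stale.
bool check_then_act_goes_stale() {
    Node only{42, nullptr};
    std::atomic<Node*> top(&only);

    // Thread A: the emptiness check passes.
    bool a_saw_non_empty = (top.load() != nullptr);

    // Thread B runs to completion here and pops the only node.
    Node* b_expected = &only;
    top.compare_exchange_strong(b_expected, only.next);

    // Thread A resumes and reads top again; dereferencing it would crash.
    Node* a_second_read = top.load();
    return a_saw_non_empty && a_second_read == nullptr;
}

// Remedy: read top once and base every later step on that snapshot.
// Returns true if the snapshot stays usable and the CAS detects the change.
bool snapshot_detects_change() {
    Node only{42, nullptr};
    std::atomic<Node*> top(&only);

    Node* snapshot = top.load();  // thread A's single read

    Node* b_expected = &only;     // thread B pops in between
    top.compare_exchange_strong(b_expected, only.next);

    // Thread A's CAS fails because top no longer equals the snapshot.
    Node* expected = snapshot;
    bool cas_ok = top.compare_exchange_strong(expected, snapshot->next);
    return snapshot != nullptr && !cas_ok;
}
```

In the second function the node lives on the caller's stack frame, so reading snapshot->next is safe here. With heap nodes that other threads may delete, that read needs additional protection.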

A corrected implementation

template <typename T> 
T Stack<T>::pop() { 
    while (1) { 
        // Take one snapshot of top; every later step uses this snapshot.
        Node* result = top;
        if (result == nullptr) 
           throw std::string("Cannot pop from empty stack");

        // Succeeds only if top still equals the snapshot.
        if (top.compare_exchange_weak(result, result->next)) {
            T ret = result->data;
            delete result;
            return ret;
        }
    }
}

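We first record the current value of top. If another thread pushes or pops in the meantime, compare_exchange_weak fails and pop starts another iteration of the loop. Note that this version frees each node immediately: a thread that read result just before another thread deleted that node will access freed memory when it evaluates result->next, and a freed address that is later reused can make the CAS succeed when it should not (the ABA problem). Production code therefore usually adds a safe memory reclamation scheme.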
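One way to exercise concurrent pops without touching freed memory is to hand each unlinked node back to the caller and delete it only after all threads have finished. The sketch below takes that approach. It is not the article's Stack: IntStack, pop_node, and each_value_popped_once are hypothetical names, the payload is fixed to int, and deferring deletion to the end is a simplification rather than a general reclamation scheme.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical variant for int payloads. pop_node returns the unlinked
// node instead of deleting it, so no thread reads freed memory while the
// stack is shared.
class IntStack {
public:
    struct Node {
        int data;
        Node* next;
    };

    void push(int value) {
        Node* n = new Node{value, top_.load()};
        while (!top_.compare_exchange_weak(n->next, n)) {
        }
    }

    // Returns nullptr when the stack is empty.
    Node* pop_node() {
        Node* seen = top_.load();
        // On failure 'seen' is refreshed, so seen->next is re-read each round.
        while (seen != nullptr &&
               !top_.compare_exchange_weak(seen, seen->next)) {
        }
        return seen;
    }

private:
    std::atomic<Node*> top_{nullptr};
};

// Pushes 0..count-1, drains the stack from 'threads' workers, and returns
// true if every value came out exactly once.
bool each_value_popped_once(int count, int threads) {
    IntStack stack;
    for (int i = 0; i < count; ++i) stack.push(i);

    std::vector<std::vector<IntStack::Node*>> popped(threads);
    std::vector<std::thread> workers;
    for (int t = 0; t < threads; ++t) {
        workers.emplace_back([&stack, &popped, t] {
            while (IntStack::Node* n = stack.pop_node()) popped[t].push_back(n);
        });
    }
    for (auto& w : workers) w.join();

    std::vector<int> hits(count, 0);
    for (auto& list : popped) {
        for (IntStack::Node* n : list) {
            ++hits[n->data];
            delete n;  // safe now: all worker threads have been joined
        }
    }
    for (int h : hits) {
        if (h != 1) return false;
    }
    return true;
}
```

Because a successful CAS unlinks a node for exactly one thread, each value should be observed once no matter how the workers interleave.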

The complete code is as follows:

#include <atomic>
#include <string>
#include <memory>
#include <iostream>
#include <thread>
#include <chrono>

#include <unistd.h>

template <typename T> 
class Stack {
private:
    typedef struct Node { 
       T data;
       Node* next;
       Node(const T& d) : data(d), next(nullptr) { } 
    } Node;
    std::atomic<Node*> top;

public: 
    Stack( ) : top(nullptr) { }
    void push(const T& data);
    T pop( ); 
};

template <typename T> 
void Stack<T>::push(const T& data) {
    Node* const new_node = new Node(data); 
    while(1) {
      new_node->next = top; 
      if (top.compare_exchange_weak(new_node->next, new_node)) {
          break;
      }
    }
}

template <typename T> 
T Stack<T>::pop() { 
    while (1) { 
        // Take one snapshot of top; every later step uses this snapshot.
        Node* result = top;
        if (result == nullptr) 
           throw std::string("Cannot pop from empty stack");

        // Succeeds only if top still equals the snapshot.
        if (top.compare_exchange_weak(result, result->next)) {
            T ret = result->data;
            delete result;
            return ret;
        }
    }
}

Stack<int> lock_free_stack;

void thread_1_fun() {
    for (int i = 100; i < 120; i++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        std::cout << "thread 1 push " << i << std::endl;
        lock_free_stack.push(i);
    }

    for (int i = 100; i < 120; i++) {
        std::cout << "thread 1 pop " << lock_free_stack.pop() << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
}

void thread_2_fun() {
    for (int i = 200; i < 220; i++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
        std::cout << "thread 2 push " << i << std::endl;
        lock_free_stack.push(i);
    }

    for (int i = 200; i < 220; i++) {
        std::cout << "thread 2 pop " << lock_free_stack.pop() << std::endl;
        std::this_thread::sleep_for(std::chrono::milliseconds(5));
    }
}

int main() {
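    // Local stack for a single-threaded sanity check; it shadows the
    // global lock_free_stack that the two threads use.
    Stack<int> lock_free_stack;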

    // lock_free_stack.pop();

    lock_free_stack.push(10);
    lock_free_stack.push(11);
    lock_free_stack.push(12);

    std::cout << lock_free_stack.pop() << std::endl;
    std::cout << lock_free_stack.pop() << std::endl;
    std::cout << lock_free_stack.pop() << std::endl;

    std::thread t1(thread_1_fun);
    std::thread t2(thread_2_fun);

    t1.join();
    t2.join();

    return 0;
}

Compile the code:

g++ -std=c++11 lock_free_stack.cpp -o lock_free_stack -lpthread -g

Run the program (the interleaving of the two threads varies between runs):

thread 2 push 200
thread 1 push 100
thread 2 push 201
thread 2 push 202
thread 1 push 101
thread 2 push 203
thread 2 push 204
thread 1 push 102
thread 2 push 205
thread 2 push 206
thread 1 push 103
thread 2 push 207
thread 2 push 208
thread 1 push 104
thread 2 push 209
thread 2 push 210
thread 1 push 105
thread 2 push 211
thread 2 push 212
thread 1 push 106
thread 2 push 213
thread 2 push 214
thread 1 push 107
thread 2 push 215
thread 2 push 216
thread 1 push 108
thread 2 push 217
thread 2 push 218
thread 1 push 109
thread 2 push 219
thread 2 pop 219
thread 2 pop 109
thread 1 push 110
thread 2 pop 110
thread 2 pop 218
thread 1 push 111
thread 2 pop 111
thread 2 pop 217
thread 1 push 112
thread 2 pop 112
thread 2 pop 108
thread 1 push 113
thread 2 pop 113
thread 2 pop 216
thread 1 push 114
thread 2 pop 114
thread 2 pop 215
thread 1 push 115
thread 2 pop 115
thread 2 pop 107
thread 1 push 116
thread 2 pop 116
thread 2 pop 214
thread 1 push 117
thread 2 pop 117
thread 2 pop 213
thread 1 push 118
thread 2 pop 118
thread 2 pop 106
thread 1 push 119
thread 1 pop 119
thread 1 pop 212
thread 1 pop 211
thread 1 pop 105
thread 1 pop 210
thread 1 pop 209
thread 1 pop 104
thread 1 pop 208
thread 1 pop 207
thread 1 pop 103
thread 1 pop 206
thread 1 pop 205
thread 1 pop 102
thread 1 pop 204
thread 1 pop 203
thread 1 pop 101
thread 1 pop 202
thread 1 pop 201
thread 1 pop 100
thread 1 pop 200

References

  1. https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
  2. https://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures2/index.html
