| by YoungTimes | No comments

C++多线程编程-设计并发队列

1、最基础的并发队列

并发队列最直观的实现就是使用互斥锁管理数据同步,向队列中添加数据或者删除数据时,使用互斥锁保证访问的安全性。

如下代码所示,我们基于STL中的链表(List)实现并发队列,队列类的构造函数和析构函数负责创建和销毁互斥锁。

#include <pthread.h>
#include <list> 

namespace concurrent { 
template <typename T>
class Queue { 
public: 
   Queue( ) { 
       pthread_mutex_init(&_lock, NULL); 
    } 
    ~Queue( ) { 
       pthread_mutex_destroy(&_lock);
    } 
    void push(const T& data);
    T pop( ); 
private: 
    list<T> _list; 
    pthread_mutex_t _lock;
}
};

在并发队列中插入数据:

void Queue<T>::push(const T& value ) { 
       pthread_mutex_lock(&_lock);
       _list.push_back(value);
       pthread_mutex_unlock(&_lock);
}

在并发队列中删除数据:

T Queue<T>::pop( ) { 
       if (_list.empty( )) { 
           throw ”element not found”;
       }
       pthread_mutex_lock(&_lock); 
       T _temp = _list.front( );
       _list.pop_front( );
       pthread_mutex_unlock(&_lock);
       return _temp;
}

2. 读写互斥的并发队列

在实际的应用场景中,会有这样的场景:假设你有一个很长的队列,从队列中读取数据的线程远远多于写入数据的线程,或者队列读取数据的需求远远大于队列数据写入的需求,如果写入和读取数据使用相同的互斥锁,会大大降低系统的效率。

为了解决这个问题,可以使用读写锁,读锁用于队列数据读取操作,写锁用于队列写人操作。

template <typename T>
class Queue { 
public: 
   Queue( ) { 
       pthread_mutex_init(&_rlock, NULL); 
       pthread_mutex_init(&_wlock, NULL);
    } 
    ~Queue( ) { 
       pthread_mutex_destroy(&_rlock);
       pthread_mutex_destroy(&_wlock);
    } 
    void push(const T& data);
    T pop( ); 
private: 
    list<T> _list; 
    pthread_mutex_t _rlock, _wlock;
}

在并发队列读取数据:

void Queue<T>::push(const T& value ) { 
       pthread_mutex_lock(&_wlock);
       _list.push_back(value);
       pthread_mutex_unlock(&_wlock);
}

在并发队列中删除数据:

T Queue<T>::pop( ) { 
       if (_list.empty( )) { 
           throw ”element not found”;
       }
       pthread_mutex_lock(&_rlock);
       T _temp = _list.front( );
       _list.pop_front( );
       pthread_mutex_unlock(&_rlock);
       return _temp;
}

3、设计并发阻塞队列

有时我们希望在队列为空时,读线程阻塞等待,直至数据可用,该如何实现呢?一种做法是定期轮询,它可能会导致浪费大量 CPU 周期;还有一种推荐的做法是使用条件变量。

template <typename T>
class BlockingQueue { 
public: 
   BlockingQueue ( ) { 
       pthread_mutex_init(&_lock, NULL); 
       pthread_cond_init(&_cond, NULL);
    } 
    ~BlockingQueue ( ) { 
       pthread_mutex_destroy(&_lock);
       pthread_cond_destroy(&_cond);
    } 
    void push(const T& data);
    T pop();
 
private: 
    list<T> _list; 
    pthread_mutex_t _lock;
    pthread_cond_t _cond;
}

从并发队列删除数据的代码如下。当队列为空时,读线程在条件变量上阻塞自身,pthread_cond_wait还隐式地释放互斥锁

现在,考虑这个场景:有两个读线程和一个空的队列。第一个读线程锁住互斥锁,发现队列是空的,然后在_cond上阻塞自身,这会隐式地释放互斥锁。第二个读线程经历同样的过程。最后两个读线程都阻塞等待队列非空。

T BlockingQueue<T>::pop( ) { 
       pthread_mutex_lock(&_lock);
       if (_list.empty( )) { 
           pthread_cond_wait(&_cond, &_lock) ;
       }

       T _temp = _list.front( );
       _list.pop_front( );
       pthread_mutex_unlock(&_lock);

       return _temp;
}

向并发队列写入数据的过程如下。当数据写入时,如果队列为空,就使用pthread_cond_broadcast唤醒所有等待条件变量_cond 的读线程。被唤醒的读线程现在隐式地争夺互斥锁,操作系统决定哪个线程获得对互斥锁的控制权 — 通常,等待时间最长的读线程先读取数据。

void BlockingQueue <T>::push(const T& value ) { 
       pthread_mutex_lock(&_lock);
       const bool was_empty = _list.empty( );
       _list.push_back(value);
       pthread_mutex_unlock(&_lock);
       if (was_empty) 
           pthread_cond_broadcast(&_cond);
}

在并发阻塞队列设计中,可能会出现虚假的唤醒。为了解决这个问题,我们需要对pop()方法稍加修改:使用while循环解决虚假唤醒的问题。

T BlockingQueue<T>::pop( ) { 
       pthread_cond_wait(&_cond, &_lock) ;
       while(_list.empty( )) { 
           pthread_cond_wait(&_cond) ;
       }
       T _temp = _list.front( );
       _list.pop_front( );
       pthread_mutex_unlock(&_lock);
       return _temp;
}

4.设计有超时限制的并发阻塞队列

在某些场景中,无法在特定的时间段内处理的数据可以直接丢弃。比如,实时展示金融交易所股票行情的大屏,如果它n秒前收到的数据没有被展示,新的数据就已经到来了,这时旧的数据就应该被丢弃。

为满足该场景的要求,我们可以给并发队列的添加和取出数据增加超时限制,超时的操作不予执行。

template <typename T>
class TimedBlockingQueue { 
public: 
   TimedBlockingQueue ( );
    ~TimedBlockingQueue ( );
    bool push(const T& data, const int seconds);
    T pop(const int seconds); 
private: 
    list<T> _list; 
    pthread_mutex_t _lock;
    pthread_cond_t _cond;
}

push()方法在解锁之后检查时效性,如果满足时效性则加入队列,否则丢弃该数据。

bool TimedBlockingQueue <T>::push(const T& data, const int seconds) {
       struct timespec ts1, ts2;

       const bool was_empty = _list.empty( );

       clock_gettime(CLOCK_REALTIME, &ts1);

       pthread_mutex_lock(&_lock);

       clock_gettime(CLOCK_REALTIME, &ts2);

       if ((ts2.tv_sec – ts1.tv_sec) <seconds) {
           was_empty = _list.empty();
           _list.push_back(data);
       }
       pthread_mutex_unlock(&_lock);
       if (was_empty) 
           pthread_cond_broadcast(&_cond);
}

pop()的操作比Push()操作要复杂,它有两个检查。第一个检查保证了在获得互斥锁之前发生了超时,那么就放弃此次操作。第二个检查确保它等待条件变量的时间不超过指定的超时时间,如果到超时时间段结束时还没有被唤醒,读线程需要唤醒自身并释放互斥锁。

pthread_cond_timedwait函数的第三个参数是绝对时间值,到达这个时间时读线程自愿放弃等待;如果在超时之前读线程被唤醒,pthread_cond_timedwait 的返回值是 0

T TimedBlockingQueue <T>::pop(const int seconds) { 
       struct timespec ts1, ts2; 
       clock_gettime(CLOCK_REALTIME, &ts1); 
       pthread_mutex_lock(&_lock);
       clock_gettime(CLOCK_REALTIME, &ts2);
 
       // First Check 
       if ((ts1.tv_sec – ts2.tv_sec) < seconds) { 
           ts2.tv_sec += seconds; // specify wake up time
           while(_list.empty( ) && (result == 0)) { 
               result = pthread_cond_timedwait(&_cond, &_lock, &ts2) ;
           }
           if (result == 0) { // Second Check 
               T _temp = _list.front( );
              _list.pop_front( );
               pthread_mutex_unlock(&_lock);
               return _temp;
          }
      }
      pthread_mutex_unlock(&lock);
      throw “timeout happened”;
}

5.设计有大小限制的并发阻塞队列

在长度有限制的阻塞队列,如果队列满了,写线程需要等待。

template <typename T>
class BoundedBlockingQueue { 
public: 
   BoundedBlockingQueue (int size) : maxSize(size) { 
       pthread_mutex_init(&_lock, NULL); 
       pthread_cond_init(&_rcond, NULL);
       pthread_cond_init(&_wcond, NULL);
       _array.reserve(maxSize);
    } 
    ~BoundedBlockingQueue ( ) { 
       pthread_mutex_destroy(&_lock);
       pthread_cond_destroy(&_rcond);
       pthread_cond_destroy(&_wcond);
    } 
    void push(const T& data);
    T pop( ); 
private: 
    vector<T> _array; // or T* _array if you so prefer
    int maxSize;
    pthread_mutex_t _lock;
    pthread_cond_t _rcond, _wcond;
}

阻塞队列有两个条件变量,如果队列满了,写线程等待_wcond条件变量;读线程在从队列中取出数据之后需要通知所有线程;同样,如果队列是空的,读线程等待 _rcond 变量,写线程在把数据插入队列中之后向所有线程发送广播消息;如果在发送广播通知时没有线程在等待 _wcond 或 _rcond,系统会忽略这些消息。

void BoundedBlockingQueue <T>::push(const T& value ) { 
       pthread_mutex_lock(&_lock);
       const bool was_empty = _array.empty( );
       while (_array.size( ) == maxSize) { 
           pthread_cond_wait(&_wcond, &_lock);
       } 
       _array.push_back(value);
      pthread_mutex_unlock(&_lock);
      if (was_empty) 
          pthread_cond_broadcast(&_rcond);
}

pop()函数读取数据的实现如下:

T BoundedBlockingQueue<T>::pop( ) { 
       pthread_mutex_lock(&_lock);
       const bool was_full = (_array.size( ) == maxSize);
       while(_array.empty( )) { 
           pthread_cond_wait(&_rcond, &_lock) ;
       }
       T _temp = _array.front( );
       _array.erase( _array.begin( ));
       pthread_mutex_unlock(&_lock);
       if (was_full)
           pthread_cond_broadcast(&_wcond);
       return _temp;
}

注意,在释放互斥锁之后调用 pthread_cond_broadcast。这是一种好做法,因为这会减少唤醒之后读线程的等待时间。

参考材料

https://www.ibm.com/developerworks/cn/aix/library/au-multithreaded_structures1/index.html?ca=drs-

发表评论