Skip to content

C++语言导学(10):并发(下)

引言

并发问题或多线程编程是一个复杂的话题,这里主要涉及线程安全数据结构。

最大线程数

auto max_thread = std::thread::hardware_concurrency();

这可以获取当前系统中可以支持的线程最大数量,但并不保证一定可以使用到这么多线程数。

过多的线程会导致系统频繁切换线程,导致效率反而降低,并不是线程数越多越好。

线程ID

auto tid = std::this_thread::get_id();

std::thread t(func);
std::thread::id tid = t.get_id();

有两种获取线程ID的方式。

等待线程结束

std::thread t(funcname);

t.join();

join()只能调用一次,调用后就不是joinable()了。

不等待线程结束

std::thread t(funcname);

t.detach(); // 放在后台运行

应为线程结束工作设定条件,否则这个守护线程(daemon thread)会一直存在,一旦放到后台就不再是joinable()了。

线程托管类

#include <iostream>
#include <thread>
using namespace std;

struct func{
  int m_i;

  //数据复制到线程内部,防止悬空引用
  func(int i): m_i(i){}

  void operator()(){
    for(unsigned j=0; j < m_i; ++j){
      cout << j << endl;
    }
  }
};

class ScopeThread{
  std::thread m_t;

public:
  explicit ScopeThread(std::thread t):
    m_t(std::move(t))//使用移动语义
  {
    if(!m_t.joinable()){
      throw std::logic_error("No thread");
    }
  }

  ~ScopeThread(){
      m_t.join();
  }

  ScopeThread(ScopeThread const&)=delete; // 防止自动生成的代码引起线程所有权混乱
  ScopeThread& operator=(ScopeThread const&)=delete;
};

int main(){

  int i=10;
  ScopeThread t{std::thread(func(i))};

  return 0;
}

这是一个用类来托管线程工作的例子,这个类使用RAII原则管理线程的生命周期。

线程安全数据结构(thread-safe)

#include <iostream>
#include <thread>
#include <queue>
#include <memory>
#include <mutex>
#include <condition_variable>

using namespace std;

template<typename T>
class threadsafe_queue{
private:
  mutable std::mutex mut;
  std::queue<T> data_queue;
  std::condition_variable data_cond;

public:
  threadsafe_queue(){}

  threadsafe_queue(threadsafe_queue const& other){
    std::lock_guard<std::mutex> lk(other.mut);
    data_queue = other.data_queue;
  }

  void push(T new_value){
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(new_value);
    data_cond.notify_one();
  }

  void wait_and_pop(T& value){
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{ return !data_queue.empty();});
    value = data_queue.front();
    data_queue.pop();
  }

  std::shared_ptr<T> wait_and_pop(){
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{ return !data_queue.empty();});
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  }

  bool try_pop(T& value){
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty()){
      return false;
    }
    value = data_queue.front();
    data_queue.pop();
    return true;
  }

  std::shared_ptr<T> try_pop(){
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty()){
      return std::shared_ptr<T>();
    }
    std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()));
    data_queue.pop();
    return res;
  }

  bool empty() const{
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
  }

};

int main(){

  int value = 0;
  threadsafe_queue<int> data;

  data.push(8);
  data.push(7);
  data.push(10);
  data.push(24);

  data.wait_and_pop(value);
  cout << "1: value=" << value << endl;  // 8

  if(data.try_pop(value)){
    cout << "2: value=" << value << endl;  // 7
  }

  auto value2 = data.wait_and_pop();
  cout << "3: value2=" << *value2 << endl;  // 10

  std::shared_ptr<int> value3 = data.try_pop();
  cout << "4: value3=" << *value3 << endl;  // 24

  return 0;
}

这里通过互斥量锁和条件变量来确保线程安全。

一个简单线程池

#include <iostream>
#include <vector>
#include <thread>
#include <atomic>
#include <functional>

#include "threadsafe_queue.hpp"

using namespace std;

// 和某个vector引用绑定,利用RAII原则释放该引用
class join_threads{

  std::vector<std::thread>& m_threads;

public:
  explicit join_threads(std::vector<std::thread>& threads):
    m_threads(threads){}

  ~join_threads(){
    for(unsigned long i=0; i < m_threads.size(); ++i){
      if(m_threads[i].joinable()){
          m_threads[i].join();
      }
    }
  }
};

class thread_pool{
  std::atomic_bool m_done; // 原子操作
  threadsafe_queue<std::function<void()>> work_queue;
  std::vector<std::thread> threads;
  join_threads joiner; // 这个成员必须在最后声明,这样才是最后销毁

  void worker_thread(){
    while(!m_done){
      std::function<void()> task;
      if(work_queue.try_pop(task)){
          task();
      } else {
          std::this_thread::yield();// 释放自己的线程资源
      }
    }
  }

public:
  thread_pool(): m_done(false), joiner(threads)
  {
    unsigned const thread_count = std::thread::hardware_concurrency();
    try{
      for(unsigned i=0; i < thread_count; ++i){
          threads.push_back(std::thread(&thread_pool::worker_thread,this));
      }
    } catch(...){
      m_done = true;
      throw;
    }
  }

  ~thread_pool(){
    m_done = true;
  }

  template<typename FunctionType>
  void submit(FunctionType f){
    work_queue.push(std::function<void()>(f));
  }
};

int main(){
  thread_pool pool;

  pool.submit([=]{cout << "1" << endl; });
  pool.submit([=]{cout << "2" << endl; });

  return 0;
}

这个线程池比较简单,不会对提交的线程任务作进一步细化管理。

总结

互斥保护数据结构的方式是命令阻止真正的并发访问,变成串行化;但被互斥保护的范围越小、时间越短,需要的串行化操作就越少,并发程度就越高。

利用互斥和保护锁的数据结构是最简单的、线程安全的数据结构。