Skip to content

并发编程

多线程

c++11以后提供了Thread头文件支持多线程编程。

1、线程创建 std::thread myThread(fun, param); fun是该线程的一个回调函数,即该线程进行的task,该task可以是一个函数也可以是一个类的成员函数。param是主线程传递给线程的参数。注意:尽量不能单独使用 ,通过 myThread.joinable() 判定是否可以join阻塞主线程和 myThread.join(); 阻塞主线程等待子线程执行完毕。当然可以通过可以通过 myThread.detch();脱离主线程,但存在隐患。

cpp
#include <iostream>
#include <thread>

void printTask(int param){
    std::cout << "Thread is running." << std::endl;
}

int main(){
    std::thread myThread(printTask, 1);

    if(myThread.joinable()){
        myThread.join();
    }
    std::cout << "waitting end." << std::endl;
    return 0;
}

子线程传递参数注意

  • 传递引用 void printTask(int& param),在主线程中传递参数应该通过 std::ref()转换为引用类型传入
  • 局部变量,注意传递局部变量会导致运行时错误需要将局部变量提升为全局变量。
  • 传递指针,在主线程中对指针的内存空间进行 free操作需要谨慎(不要在 join 前释放指针的地址空间),这和局部变量的错误是类似,代码无法找到对应变量的内存地址了。
  • 传递类对象,和指针的结果类似 delete 。可以通过智能指针解决。
  • 传递类的私有,可以通过友元进行。将线程封装成一个该类的友元函数。

多线程的资源竞争

可以通过互斥锁(进程:信号量)、原子操作解决。死锁:多个线程执行过程中存在相互持有对方需要资源的互斥锁情况。可以通过相同的资源获取顺序避免。

互斥锁

1. std::mutex
  • 描述 :最基本的互斥锁类型,用于实现线程间的互斥访问。当一个线程成功获取锁后,其他线程将被阻塞,直到该锁被释放。
  • 特点 :不可递归加锁,即同一线程不能多次获取同一把锁,否则会导致死锁。
  • 操作 :主要包括lock()、unlock()和try_lock()。
2. std::timed_mutex
  • 描述 :带超时机制的互斥锁,允许在尝试获取锁时设置超时时间。如果在指定时间内未获取到锁,则函数会返回,避免线程无限期等待。
  • 特点 :除了基本的lock()、unlock()和try_lock()外,还提供了try_lock_for()和try_lock_until()两个方法,用于在指定时间范围内尝试获取锁。
3.std::recursive_mutex
  • 描述 :递归互斥锁,允许同一线程多次获取同一把锁,而不会导致死锁。每次获取锁后,都需要相应次数的解锁操作才能完全释放锁。
  • 特点 :适用于需要多次加锁和解锁的复杂场景,如递归函数中的锁管理。
  • 操作 :与std::mutex类似,包括lock()、unlock()和try_lock()。
4. std::recursive_timed_mutex
  • 描述 :带超时机制的递归互斥锁,结合了std::timed_mutex和std::recursive_mutex的特点。
  • 特点 :既允许同一线程多次获取锁,又支持在尝试获取锁时设置超时时间。
  • 操作 :除了基本的lock()、unlock()和try_lock()外,还提供try_lock_for()和try_lock_until()两个方法。

操作锁

lock_guradunique_lock。使用 std::lock_gurad<std::mutex> lk(muxt); std::unique_lock[std::mutex](std::mutex) lk(muxt);

为了更好使用互斥锁,c++封装了 std::lock_guard 类实现自动加锁和释放锁(在构造是进行加锁,析构时进行释放锁),可以在局部代码中使用,更加优雅。c++11封装了 std::unique_lock类,在 std::lock_guard的功能基础上,增加了延迟加锁 try_lock_for()和时间锁 std::timed_mutex搭配使用、条件等更多加锁和释放锁的方案。在单例模式下使用多线程,需要注意多个线程对单例的使用。可以使用 std::call_once 确保该实例在该线程中只使用一次,但这个函数只能在线程中使用。

condition_variable条件变量

condition_variable是一个同步原语,用于阻塞一个或多个线程,直到接收到另一个线程的通知。通常与互斥锁(如 std::mutex)一起使用,以确保在条件变量上等待的线程能够安全地修改或检查共享数据。两种主要的等待操作:waitwait_for/wait_until,以及一个通知操作 notify_onenotify_allcondition_variable操作的锁时使用 unique_lock方法。

原子操作

std::atomic<T> sharedata(Value); 这个是系统级提供的操作,是原子性的,速度比加锁的时间更短。sharedata.load()获取缓存中的值,sharedata.exchange(newVale)修改缓存中的值,sharadata.store(value) 设置缓存中的值。compare_exchange_strong(value,expect)compare_exchange_weak(value,expect) 进行比较使用。定义:原子性操作是指一个操作在执行过程中不会被中断,即该操作要么全部执行成功,要么全部不执行,不会出现部分执行的情况。特点:不可分割性、完整性和并发安全。实现:软件即提供库,硬件可以通过指令支持。

异步并发

future

cpp
#include <iostream>
#include <future>

int func(){
    // 线程id是不同的
    std::cout <<  std::this_thread::get_id() <<"running ..."<< std::endl;
    return 1000;
}

int main(){
    // 获取的形式获取到future对象
    std::future<int> future_res = std::async(std::launch::async, func);

    // 可以自己定义future对象并指定线程进行运行
    // 创建一个packaged_task是一个可移动对象
    std::packaged_task<int()> task(func);
    // 设定future缓存结果的位置
    // std::future<int> task_res = task.get_future();
    auto task_res = task.get_future();
    // 指定线程运行
    std::thread myThread(std::move(task));
    myThread.join();

    std::cout << std::this_thread::get_id() << "running ..." << std::endl;
    std::cout << future_res.get() << std::endl;
    return 0;
}

Promise

promise是一个类模板,提供了不同异步之间数据交互的方式(即get和set,这是promise的标准规定的)。

用于线程间的结果传递。

cpp
#include <iostream>
#include <future>

void func(std::promise<int>& value){
    // 线程id是不同的
    std::cout <<  std::this_thread::get_id() <<"running ..."<< std::endl;
    value.set_value(10000);
}

int main(){

    std::promise<int> value;
    auto future_res = value.get_future();

    // 指定线程运行
    std::thread myThread(func, std::ref(value));
    myThread.join();

    std::cout << std::this_thread::get_id() << "running ..." << std::endl;
    std::cout << "value is " << future_res.get() << std::endl; 
    return 0;
}

async

std::async 用于启动异步任务,并返回一个 std::future。

cpp
#include <iostream>
#include <future>

int calculate_square(int x) {
    return x * x;
}

int main() {
    std::future<int> result = std::async(calculate_square, 5);
   
    std::cout << "Square: " << result.get() << std::endl;
   
    return 0;
}

线程池

线程池的本质是生成者和消费者模型。

cpp
#include <iostream>
#include <vector>
#include <queue>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <functional>
#include <atomic>

class ThreadPool{
public:
    ThreadPool(int threadNum): stop(false){
        for(int i=0; i<threadNum; i++){
            // 添加线程
            threads.emplace_back([this]{
                while(true){
                    // 加锁,等待加进入任务
                    std::unique_lock<std::mutex> lk(mtx);
                    cv.wait(lk, [this]{return !task_queue.empty()|| stop;});

                    // 线程停止 任务完成
                    if(stop && task_queue.empty()) return;

                    // 取任务
                    std::function<void()> task(std::move(task_queue.front()));
                    task_queue.pop();
                    lk.unlock();
                    task();
                }
            });
        }
    }
    // 函数模板
    // && 右值引用,在这个函数模板下是万能引用(左值 右值)
    // ... 同解构一样
    template<class F, class... Args>
    void addTask(F&& f, Args&&... args){
        // 函数和参数绑定
        // 因为通用引用,通过完美转发更好的绑定
        std::function<void()> task =
            std::bind(std::forward<F>(f), std::forward<Args>(args)...);

        // 加入任务
        {
            std::unique_lock<std::mutex> lk(mtx);
            task_queue.emplace(std::move(task));
        }
        cv.notify_one();
    }

    // 禁用拷贝构造和拷贝赋值
    ThreadPool& operator= (const ThreadPool& t) =delete;
    ThreadPool(const ThreadPool& t ) = delete;

    ~ThreadPool(){
        stop = true;
        // 让所有线程取任务,确保任务运行完毕
        cv.notify_all();
        for(auto& t: threads){
            t.join();
        }
    }
  
private:
    std::vector<std::thread> threads;               // 线程列表 
    std::queue<std::function<void()>> task_queue;     // 任务队列
    std::condition_variable cv;                     // 条件变量
    std::mutex mtx;     // 互斥锁
    std::atomic<bool> stop;  // 原子操作
};

int main(){
    ThreadPool pool(4);

    // 添加任务
    for(int i=0; i<10; i++){
        pool.addTask([i]{
            std::cout << std::this_thread::get_id() << " " << i << "running." << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::cout << std::this_thread::get_id() << " " << i << "endding." << std::endl;
        });
    }
 
    return 0;
}

文章更新时间: