C++同步线程实现示例详解

C/C++
250
0
0
2023-06-19
目录
  • 一、同步线程
  • 二、独占访问示例

一、同步线程

虽然使用多线程可以提高应用程序的性能,但通常也会增加复杂性。如果同时执行多个函数,则必须同步对共享资源的访问。一旦应用程序达到一定大小,这将涉及大量的编程工作。本节介绍Boost.Thread提供的用于同步线程的类。

二、独占访问示例

示例 44.7。使用 boost::mutex 的独占访问

#include <boost/thread.hpp>
#include <boost/chrono.hpp>
#include <iostream>
void wait(int seconds)
{
  boost::this_thread::sleep_for(boost::chrono::seconds{seconds});
}
boost::mutex mutex;
void thread()
{
  using boost::this_thread::get_id;
  for (int i =; i < 5; ++i)
  {
    wait();
    mutex.lock();
    std::cout << "Thread " << get_id() << ": " << i << std::endl;
    mutex.unlock();
  }
}
int main()
{
  boost::thread t{thread};
  boost::thread t{thread};
  t.join();
  t.join();
}

多线程程序使用互斥体进行同步。 Boost.Thread 提供了不同的互斥类,其中 boost::mutex 是最简单的。互斥量的基本原理是防止其他线程在特定线程拥有互斥量时取得所有权。一旦释放,不同的线程就可以取得所有权。这会导致线程等待,直到拥有互斥锁的线程完成处理并释放其对互斥锁的所有权。

示例 44.7 使用了一个名为 mutex 的 boost::mutex 类型的全局互斥体。 thread() 函数通过调用 lock() 获得此对象的所有权。这是在函数写入标准输出流之前完成的。写入消息后,通过调用 unlock() 释放所有权。

main() 创建两个线程,这两个线程都在执行 thread() 函数。每个线程计数为 5,并在 for 循环的每次迭代中将消息写入标准输出流。因为 std::cout 是线程共享的全局对象,所以访问必须同步。否则,消息可能会混淆。同步保证在任何给定时间,只有一个线程可以访问 std::cout。两个线程都尝试在写入标准输出流之前获取互斥锁,但一次只有一个线程实际访问 std::cout。无论哪个线程成功调用 lock(),所有其他线程都需要等到 unlock() 被调用。

获取和释放互斥锁是一个典型的方案,Boost.Thread通过不同的类型来支持。例如,您可以使用 boost::lock_guard 而不是使用 lock() 和 unlock()。

示例 44.8。 boost::lock_guard 保证互斥释放

#include <boost/thread.hpp>
#include <boost/chrono.hpp>
#include <iostream>
void wait(int seconds)
{
  boost::this_thread::sleep_for(boost::chrono::seconds{seconds});
}
boost::mutex mutex;
void thread()
{
  using boost::this_thread::get_id;
  for (int i =; i < 5; ++i)
  {
    wait();
    boost::lock_guard<boost::mutex> lock{mutex};
    std::cout << "Thread " << get_id() << ": " << i << std::endl;
  }
}
int main()
{
  boost::thread t{thread};
  boost::thread t{thread};
  t.join();
  t.join();
}

boost::lock_guard 分别在其构造函数和析构函数中自动调用 lock() 和 unlock()。对共享资源的访问在示例 44.8 中是同步的,就像显式调用两个成员函数时一样。类 boost::lock_guard 是 RAII 习惯用法的一个示例,用于确保资源在不再需要时被释放。

除了 boost::mutex 和 boost::lock_guard,Boost.Thread 还提供了额外的类来支持同步的变体。其中一个重要的是 boost::unique_lock ,它提供了几个有用的成员函数。

示例 44.9。多功能锁boost::unique_lock

#include <boost/thread.hpp>
#include <boost/chrono.hpp>
#include <iostream>
void wait(int seconds)
{
  boost::this_thread::sleep_for(boost::chrono::seconds{seconds});
}
boost::timed_mutex mutex;
void thread()
{
  using boost::this_thread::get_id;
  for (int i =; i < 5; ++i)
  {
    wait();
    boost::unique_lock<boost::timed_mutex> lock{mutex};
    std::cout << "Thread " << get_id() << ": " << i << std::endl;
    boost::timed_mutex *m = lock.release();
    m->unlock();
  }
}
void thread()
{
  using boost::this_thread::get_id;
  for (int i =; i < 5; ++i)
  {
    wait();
    boost::unique_lock<boost::timed_mutex> lock{mutex,
      boost::try_to_lock};
    if (lock.owns_lock() || lock.try_lock_for(boost::chrono::seconds{}))
    {
      std::cout << "Thread " << get_id() << ": " << i << std::endl;
    }
  }
}
int main()
{
  boost::thread t{thread1};
  boost::thread t{thread2};
  t.join();
  t.join();
}

Example44.9

示例 44.9 使用了 thread() 函数的两个变体。两种变体仍然在循环中将五个数字写入标准输出流,但它们现在使用类 boost::unique_lock 来锁定互斥锁。

thread1() 将变量 mutex 传递给 boost::unique_lock 的构造函数,这使得 boost::unique_lock 尝试锁定互斥锁。在这种情况下,boost::unique_lock 的行为与 boost::lock_guard 没有区别。 boost::unique_lock 的构造函数在互斥量上调用 lock()。

但是,boost::unique_lock 的析构函数不会释放 thread1() 中的互斥量。在 thread1() 中,release() 在锁上被调用,这将互斥体与锁分离。默认情况下,boost::unique_lock 的析构函数会释放一个互斥量,就像 boost::lock_guard 的析构函数一样——但如果互斥量是解耦的则不会。这就是为什么在 thread1() 中显式调用 unlock()。

thread2() 将 mutex 和 boost::try_to_lock 传递给 boost::unique_lock 的构造函数。这使得 boost::unique_lock 的构造函数不是在互斥体上调用 lock(),而是调用 try_lock()。因此,构造函数只尝试锁定互斥量。如果互斥量由另一个线程拥有,则尝试失败。

owns_lock() 可让您检测 boost::unique_lock 是否能够锁定互斥体。如果 owns_lock() 返回 true,thread2() 可以立即访问 std::cout。如果 owns_lock() 返回 false,则调用 try_lock_for()。此成员函数也尝试锁定互斥锁,但它会在失败前等待互斥锁一段指定的时间。在示例 44.9 中,锁会尝试一秒钟来获取互斥量。如果 try_lock_for() 返回 true,则可以访问 std::cout。否则,thread2() 放弃并跳过一个数字。因此,示例中的第二个线程可能不会将五个数字写入标准输出流。

请注意,在示例 44.9 中,互斥量的类型是 boost::timed_mutex,而不是 boost::mutex。该示例使用 boost::timed_mutex,因为此互斥量是唯一提供成员函数 try_lock_for() 的互斥量。当对锁调用 try_lock_for() 时调用此成员函数。 boost::mutex 仅提供成员函数 lock() 和 try_lock()。

boost::unique_lock 是一个独占锁。独占锁始终是互斥量的唯一所有者。另一个锁只有在排他锁释放后才能获得互斥锁的控制权。 Boost.Thread 还支持类 boost::shared_lock 的共享锁,它与 shared_mutex 一起使用。

示例 44.10。与 boost::shared_lock 共享锁

#include <boost/thread.hpp>
#include <boost/chrono.hpp>
#include <iostream>
#include <vector>
#include <cstdlib>
#include <ctime>
void wait(int seconds)
{
  boost::this_thread::sleep_for(boost::chrono::seconds{seconds});
}
boost::shared_mutex mutex;
std::vector<int> random_numbers;
void fill()
{
  std::srand(static_cast<unsigned int>(std::time()));
  for (int i =; i < 3; ++i)
  {
    boost::unique_lock<boost::shared_mutex> lock{mutex};
    random_numbers.push_back(std::rand());
    lock.unlock();
    wait();
  }
}
void print()
{
  for (int i =; i < 3; ++i)
  {
    wait();
    boost::shared_lock<boost::shared_mutex> lock{mutex};
    std::cout << random_numbers.back() << '\n';
  }
}
int sum =;
void count()
{
  for (int i =; i < 3; ++i)
  {
    wait();
    boost::shared_lock<boost::shared_mutex> lock{mutex};
    sum += random_numbers.back();
  }
}
int main()
{
  boost::thread t{fill}, t2{print}, t3{count};
  t.join();
  t.join();
  t.join();
  std::cout << "Sum: " << sum << '\n';
}

如果线程只需要对特定资源进行只读访问,则可以使用类型为 boost::shared_lock 的非独占锁。修改资源的线程需要写访问权,因此需要独占锁。由于具有只读访问权限的线程不受同时读取同一资源的其他线程的影响,因此它可以使用非排他锁并共享互斥锁。

在示例 44.10 中,print() 和 count() 都只读取变量 random_numbers。 print() 函数将 random_numbers 中的最后一个值写入标准输出流,count() 函数将其添加到变量 sum 中。因为两个函数都不修改 random_numbers,所以它们都可以使用类型为 boost::shared_lock 的非独占锁同时访问它。

在 fill() 函数内部,需要一个类型为 boost::unique_lock 的独占锁,因为它将新的随机数插入到 random_numbers 中。 fill() 使用 unlock() 成员函数释放互斥锁,然后等待一秒钟。与前面的示例不同,wait() 在 for 循环的末尾调用,以保证在 print() 或 count() 访问容器之前至少将一个随机数放入容器中。这两个函数都在它们的 for 循环开始时调用 wait() 函数。

查看从不同位置对 wait() 函数的单独调用,一个潜在问题变得明显:函数调用的顺序直接受到 CPU 实际执行各个线程的顺序的影响。使用条件变量,可以同步各个线程,以便添加到 random_numbers 的值立即由不同的线程处理。

示例 44.11。带有 boost::condition_variable_any 的条件变量

#include <boost/thread.hpp>
#include <iostream>
#include <vector>
#include <cstdlib>
#include <ctime>
boost::mutex mutex;
boost::condition_variable_any cond;
std::vector<int> random_numbers;
void fill()
{
  std::srand(static_cast<unsigned int>(std::time()));
  for (int i =; i < 3; ++i)
  {
    boost::unique_lock<boost::mutex> lock{mutex};
    random_numbers.push_back(std::rand());
    cond.notify_all();
    cond.wait(mutex);
  }
}
void print()
{
  std::size_t next_size =;
  for (int i =; i < 3; ++i)
  {
    boost::unique_lock<boost::mutex> lock{mutex};
    while (random_numbers.size() != next_size)
      cond.wait(mutex);
    std::cout << random_numbers.back() << '\n';
    ++next_size;
    cond.notify_all();
  }
}
int main()
{
  boost::thread t{fill};
  boost::thread t{print};
  t.join();
  t.join();
}

Example44.11

示例 44.11 删除了 wait() 和 count() 函数。线程不再在每次迭代中等待一秒钟;相反,它们会尽可能快地执行。此外,不计算总数;数字只是写入标准输出流。

为了确保随机数的正确处理,各个线程使用条件变量进行同步,可以检查多个线程之间的某些条件。

和以前一样,fill() 函数在每次迭代时生成一个随机数,并将其放入 random_numbers 容器中。为了阻止其他线程同时访问容器,使用了排他锁。这个例子没有等待一秒钟,而是使用了一个条件变量。调用 notify_all() 将唤醒一直在使用 wait() 等待此通知的每个线程。

查看 print() 函数的 for 循环,您可以看到为相同的条件变量调用了成员函数 wait()。当线程被调用 notify_all() 唤醒时,它会尝试获取互斥锁,只有在 fill() 函数中成功释放互斥锁后才会成功。

这里的技巧是调用 wait() 也会释放作为参数传递的互斥体。调用 notify_all() 后,fill() 函数通过调用 wait() 释放互斥量。然后它会阻塞并等待其他线程调用 notify_all(),一旦随机数被写入标准输出流,它就会在 print() 函数中发生。

请注意,对 print() 函数内的 wait() 成员函数的调用实际上发生在单独的 while 循环中。这样做是为了处理在 print() 中首次调用 wait() 成员函数之前已经将随机数放入容器中的情况。通过将 random_numbers 中存储的元素数量与预期的元素数量进行比较,成功处理了这种情况,并将随机数写入标准输出流。

如果锁不是在 for 循环中的本地锁而是在外部作用域中实例化,则示例 44.11 也适用。事实上,这更有意义,因为不需要在每次迭代中都销毁和重新创建锁。由于互斥量总是通过 wait() 释放,因此您无需在迭代结束时销毁锁。