本文概述
C ++开发人员致力于构建健壮的多线程Qt应用程序, 但是对于所有这些竞争条件, 同步以及死锁和活动锁, 多线程绝非易事。值得称赞的是, 你不会放弃并发现自己正在搜索StackOverflow。然而, 从十几个不同的答案中选择正确且可行的解决方案并非易事, 特别是考虑到每种解决方案都有其自身的缺点。
多线程是一种广泛的编程和执行模型, 它允许在一个进程的上下文中存在多个线程。这些线程共享进程的资源, 但能够独立执行。线程编程模型为开发人员提供了并行执行的有用抽象。多线程也可以应用于一个进程, 以在多处理系统上并行执行。–维基百科
本文的目的是汇总有关使用Qt框架进行并发编程的基本知识, 特别是最容易理解的主题。希望读者具有Qt和C ++的背景知识, 以理解其内容。
在使用QThreadPool和QThread之间进行选择
Qt框架提供了许多用于多线程的工具。首先, 选择正确的工具可能会面临挑战, 但是实际上, 决策树仅包含两个选项:你要么希望Qt自己管理线程, 要么想要自己管理线程。但是, 还有其他重要条件:
-
不需要事件循环的任务。具体来说, 任务执行过程中未使用信号/插槽机制的任务。
使用:QtConcurrent和QThreadPool + QRunnable。 -
使用信号/插槽的任务, 因此需要事件循环。
使用:辅助对象移至+ QThread。
Qt框架的巨大灵活性使你可以解决”缺少事件循环”问题, 并为QRunnable添加一个问题:
class MyTask : public QObject, public QRunnable
{
Q_OBJECT
public:
void MyTask::run() {
_loop.exec();
}
public slots:
// you need a signal connected to this slot to exit the loop, // otherwise the thread running the loop would remain blocked...
void finishTask() {
_loop.exit();
}
private:
QEventLoop _loop;
}
但是, 请尝试避免此类”变通办法”, 因为它们很危险且效率不高:如果线程池(运行MyTask)中的一个线程由于等待信号而被阻塞, 则它无法执行该池中的其他任务。
你也可以通过重写QThread :: run()方法来运行没有任何事件循环的QThread, 只要你知道自己在做什么, 这就很好。例如, 不要期望方法quit()在这种情况下起作用。
一次运行一个任务实例
想象一下, 你需要确保一次只能执行一个任务实例, 并且所有运行同一任务的待处理请求都在特定队列中等待。当任务正在访问独占资源(例如, 写入同一文件或使用TCP套接字发送数据包)时, 通常需要这样做。
让我们暂时忘记计算机科学和生产者-消费者模式, 并考虑一些琐碎的事情;在实际项目中可以轻松找到的东西。
对于此问题的幼稚解决方案可能是使用QMutex。在任务功能内部, 你可以简单地获取互斥锁, 从而有效地序列化尝试运行该任务的所有线程。这样可以保证一次只能运行一个线程。但是, 此解决方案通过引入高竞争问题来影响性能, 因为所有这些线程在继续进行之前都会被阻塞(在互斥锁上)。如果你有许多线程正在积极地使用此类任务并在其间做一些有用的工作, 则所有这些线程大多数时候都将处于睡眠状态。
void logEvent(const QString & event) {
static QMutex lock;
QMutexLocker locker(& lock); // high contention!
logStream << event; // exclusive resource
}
为了避免争用, 我们需要一个队列和一个驻留在其自己的线程中并处理该队列的工作程序。这几乎是经典的生产者-消费者模式。工作人员(消费者)将一个接一个地从队列中选择请求, 每个生产者可以简单地将其请求添加到队列中。乍一看听起来很简单, 你可能会考虑使用QQueue和QWaitCondition, 但请继续看一下, 如果没有这些原语, 是否可以实现目标:
- 我们可以使用QThreadPool, 因为它有待处理的任务队列
Or
- 我们可以使用默认的QThread :: run(), 因为它具有QEventLoop
第一种选择是使用QThreadPool。我们可以创建一个QThreadPool实例并使用QThreadPool :: setMaxThreadCount(1)。然后, 我们可以使用QtConcurrent :: run()调度请求:
class Logger: public QObject
{
public:
explicit Logger(QObject *parent = nullptr) : QObject(parent) {
threadPool.setMaxThreadCount(1);
}
void logEvent(const QString &event) {
QtConcurrent::run(&threadPool, [this, event]{
logEventCore(event);
});
}
private:
void logEventCore(const QString &event) {
logStream << event;
}
QThreadPool threadPool;
};
该解决方案有一个好处:QThreadPool :: clear()允许你立即取消所有挂起的请求, 例如在你的应用程序需要快速关闭时。但是, 与线程关联性也存在一个明显的缺点:logEventCore函数可能会在调用之间的不同线程中执行。我们知道Qt有一些需要线程亲和性的类:QTimer, QTcpSocket以及其他一些类。
Qt规范关于线程亲和力的说法:计时器在一个线程中启动, 不能从另一个线程中停止。而且只有拥有套接字实例的线程才能使用此套接字。这意味着你必须在启动计时器的线程中停止所有正在运行的计时器, 并且必须在拥有套接字的线程中调用QTcpSocket :: close()。这两个示例通常在析构函数中执行。
更好的解决方案依赖于使用QThread提供的QEventLoop。这个想法很简单:我们使用信号/插槽机制来发出请求, 并且线程内运行的事件循环将作为一个队列, 一次只允许执行一个插槽。
// the worker that will be moved to a thread
class LogWorker: public QObject
{
Q_OBJECT
public:
explicit LogWorker(QObject *parent = nullptr);
public slots:
// this slot will be executed by event loop (one call at a time)
void logEvent(const QString &event);
};
LogWorker构造函数和logEvent的实现非常简单, 因此此处未提供。现在, 我们需要一个用于管理线程和工作程序实例的服务:
// interface
class LogService : public QObject
{
Q_OBJECT
public:
explicit LogService(QObject *parent = nullptr);
~LogService();
signals:
// to use the service, just call this signal to send a request:
// logService->logEvent("event");
void logEvent(const QString &event);
private:
QThread *thread;
LogWorker *worker;
};
// implementation
LogService::LogService(QObject *parent) : QObject(parent) {
thread = new QThread(this);
worker = new LogWorker;
worker->moveToThread(thread);
connect(this, &LogService::logEvent, worker, &LogWorker::logEvent);
connect(thread, &QThread::finished, worker, &QObject::deleteLater);
thread->start();
}
LogService::~LogService() {
thread->quit();
thread->wait();
}
让我们讨论一下这段代码的工作方式:
- 在构造函数中, 我们创建一个线程和工作程序实例。注意, worker没有接收到父对象, 因为它将被移到新线程中。因此, Qt将无法自动释放工作者的内存, 因此, 我们需要通过将QThread :: finished信号连接到deleteLater插槽来执行此操作。我们还将代理方法LogService :: logEvent()连接到LogWorker :: logEvent(), 由于线程不同, 该方法将使用Qt :: QueuedConnection模式。
- 在析构函数中, 我们将quit事件放入事件循环的队列中。在处理所有其他事件之后, 将处理此事件。例如, 如果在析构函数调用之前进行了数百次logEvent()调用, 则记录器将在提取quit事件之前处理所有这些事件。当然, 这需要时间, 因此我们必须等待()直到事件循环退出。值得一提的是, 退出事件之后发布的所有将来的日志记录请求将永远不会被处理。
- 日志记录本身(LogWorker :: logEvent)将始终在同一线程中完成, 因此, 这种方法对于需要线程亲和性的类非常有效。同时, LogWorker构造函数和析构函数在主线程(特别是在其中运行LogService的线程)中执行, 因此, 你需要非常注意在其中运行的代码。具体来说, 除非你可以在同一线程中运行析构函数, 否则请勿停止计时器或在辅助析构函数中使用套接字!
在同一线程中执行工作者的析构函数
如果你的工作程序正在处理计时器或套接字, 则需要确保在同一线程(为工作程序创建的线程以及将工作程序移至的线程)中执行析构函数。支持此功能的最明显方法是在QThread的子类中删除QThread :: run()方法中的worker。考虑以下模板:
template <typename TWorker>
class Thread : QThread
{
public:
explicit Thread(TWorker *worker, QObject *parent = nullptr)
: QThread(parent), _worker(worker) {
_worker->moveToThread(this);
start();
}
~Thread() {
quit();
wait();
}
TWorker worker() const {
return _worker;
}
protected:
void run() override {
QThread::run();
delete _worker;
}
private:
TWorker *_worker;
};
使用此模板, 我们从上一个示例重新定义LogService:
// interface
class LogService : public Thread<LogWorker>
{
Q_OBJECT
public:
explicit LogService(QObject *parent = nullptr);
signals:
void **logEvent**(const QString &event);
};
// implementation
LogService::**LogService**(QObject *parent)
: Thread<LogWorker>(new LogWorker, parent) {
connect(this, &LogService::logEvent, worker(), &LogWorker::logEvent);
}
让我们讨论一下这应该如何工作:
- 我们使LogService成为QThread对象, 因为我们需要实现自定义run()函数。我们使用私有子类来防止访问QThread的函数, 因为我们想在内部控制线程的生命周期。
- 在Thread :: run()函数中, 我们通过调用默认的QThread :: run()实现来运行事件循环, 并在事件循环退出后立即销毁worker实例。请注意, 工作程序的析构函数在同一线程中执行。
- LogService :: logEvent()是代理函数(信号), 它将日志记录事件发布到线程的事件队列中。
暂停和恢复线程
另一个有趣的机会是能够暂停和恢复我们的自定义线程。想象一下, 你的应用程序正在执行某些处理, 当该应用程序被最小化, 锁定或刚刚失去网络连接时, 该处理需要暂停。这可以通过构建自定义异步队列来实现, 该队列将保留所有待处理的请求, 直到恢复工作线程为止。但是, 由于我们正在寻找最简单的解决方案, 因此我们将(再次)将事件循环的队列用于相同的目的。
要挂起线程, 我们显然需要它在特定的等待条件下等待。如果以这种方式阻塞线程, 则其事件循环将不处理任何事件, 并且Qt必须将其保留在队列中。恢复后, 事件循环将处理所有累积的请求。对于等待条件, 我们仅使用QWaitCondition对象, 该对象也需要QMutex。为了设计可被任何工作人员重用的通用解决方案, 我们需要将所有挂起/恢复逻辑放入可重用的基类中。我们称之为SuspendableWorker。这样的类应支持两种方法:
- suspend()是一个阻塞调用, 它将线程设置为等待状态。这可以通过将挂起请求发布到队列中并等待直到它被处理来完成。与QThread :: quit()+ wait()非常相似。
- resume()会发出等待信号, 以唤醒睡眠线程以继续执行。
让我们回顾一下界面和实现:
// interface
class SuspendableWorker : public QObject
{
Q_OBJECT
public:
explicit SuspendableWorker(QObject *parent = nullptr);
~SuspendableWorker();
// resume() must be called from the outer thread.
void resume();
// suspend() must be called from the outer thread.
// the function would block the caller's thread until
// the worker thread is suspended.
void suspend();
private slots:
void suspendImpl();
private:
QMutex _waitMutex;
QWaitCondition _waitCondition;
};
// implementation
SuspendableWorker::SuspendableWorker(QObject *parent) : QObject(parent) {
_waitMutex.lock();
}
SuspendableWorker::~SuspendableWorker() {
_waitCondition.wakeAll();
_waitMutex.unlock();
}
void SuspendableWorker::resume() {
_waitCondition.wakeAll();
}
void SuspendableWorker::suspend() {
QMetaObject::invokeMethod(this, &SuspendableWorker::suspendImpl);
// acquiring mutex to block the calling thread
_waitMutex.lock();
_waitMutex.unlock();
}
void SuspendableWorker::suspendImpl() {
_waitCondition.wait(&_waitMutex);
}
请记住, 被挂起的线程永远不会收到退出事件。因此, 除非在发布退出之前恢复线程, 否则我们无法在香草QThread中安全地使用它。让我们将其集成到我们的自定义Thread <T>模板中以使其防弹。
template <typename TWorker>
class Thread : QThread
{
public:
explicit Thread(TWorker *worker, QObject *parent = nullptr)
: QThread(parent), _worker(worker) {
_worker->moveToThread(this);
start();
}
~Thread() {
resume();
quit();
wait();
}
void suspend() {
auto worker = qobject_cast<SuspendableWorker*>(_worker);
if (worker != nullptr) {
worker->suspend();
}
}
void resume() {
auto worker = qobject_cast<SuspendableWorker*>(_worker);
if (worker != nullptr) {
worker->resume();
}
}
TWorker worker() const {
return _worker;
}
protected:
void run() override {
QThread::*run*();
delete _worker;
}
private:
TWorker *_worker;
};
通过这些更改, 我们将在发布quit事件之前恢复线程。同样, Thread <TWorker>仍然允许传递任何类型的工作程序, 无论它是否为SuspendableWorker。
用法如下:
LogService logService;
logService.logEvent("processed event");
logService.suspend();
logService.logEvent("queued event");
logService.resume();
// "queued event" is now processed.
挥发性与原子性
这是一个经常被误解的话题。大多数人认为, 易失性变量可用于服务由多个线程访问的某些标志, 并且这种标志可避免数据争用情况。这是错误的, 必须为此使用QAtomic *类(或std :: atomic)。
让我们考虑一个现实的示例:一个在专用线程中工作的TcpConnection连接类, 我们希望该类导出线程安全的方法:bool isConnected()。在内部, 该类将侦听套接字事件:连接和断开连接以维护内部布尔标志:
// pseudo-code, won't compile
class TcpConnection : QObject
{
Q_OBJECT
public:
// this is not thread-safe!
bool isConnected() const {
return _connected;
}
private slots:
void handleSocketConnected() {
_connected = true;
}
void handleSocketDisconnected() {
_connected = false;
}
private:
bool _connected;
}
使_connected成员易失性不会解决问题, 也不会使isConnected()成为线程安全的。该解决方案将在99%的时间内正常工作, 但剩余的1%将使你的生活陷入噩梦。为了解决这个问题, 我们需要保护来自多个线程的变量访问。为此, 请使用QReadWriteLocker:
// pseudo-code, won't compile
class TcpConnection : QObject
{
Q_OBJECT
public:
bool isConnected() const {
QReadLocker locker(&_lock);
return _connected;
}
private slots:
void handleSocketConnected() {
QWriteLocker locker(&_lock);
_connected = true;
}
void handleSocketDisconnected() {
QWriteLocker locker(&_lock);
_connected = false;
}
private:
QReadWriteLocker _lock;
bool _connected;
}
这可以可靠地运行, 但不如使用”无锁”原子操作快。第三种解决方案既快速又线程安全(示例使用std :: atomic而不是QAtomicInt, 但在语义上这些是相同的):
// pseudo-code, won't compile
class TcpConnection : QObject
{
Q_OBJECT
public:
bool isConnected() const {
return _connected;
}
private slots:
void handleSocketConnected() {
_connected = true;
}
void handleSocketDisconnected() {
_connected = false;
}
private:
std::atomic<bool> _connected;
}
总结
在本文中, 我们讨论了有关使用Qt框架进行并发编程的几个重要问题, 并设计了解决特定用例的解决方案。我们没有考虑许多简单的主题, 例如原子原语的使用, 读写锁以及许多其他主题, 但是如果你对这些主题感兴趣, 请在下面留下你的评论, 并要求提供这样的教程。
如果你有兴趣探索Qmake, 我最近还发布了《 Qmake重要指南》。读起来很棒!
评论前必须登录!
注册