C++11 Thread 的 condition variable

| | 0 Comments| 09:34|
Categories:

之前已經有用三篇文章,介紹了 C 11 裡、STL 的 Thread 這個函式庫的使用方法了。有興趣的話,請先參考《基本使用》、《多執行序之間的溝通(一)》和《多執行序之間的溝通(二)》這三篇文章。

這一篇,則是來講一下 Thread 的 condition_variable,他主要的用途,是用來把目前的 thread 停下來、等候通知用的;基本上,應該算是適用於多執行序環境下的事件處理,也就是由某些執行序發出通知、告訴其他執行序要去繼續執行某些動作。

而在使用時,必須要搭配 unique_lock 一起使用。下面的內容,主要是參考 cppreference(連結)和 Boost(連結)所提供的內容。


基本使用

首先,要使用 condition_variable 必須要先 include condition_variable 這個 header 檔。

在使用 condition_variable 的時候,需要先透過 unique_lock 來鎖定一個 mutex,之後再呼叫 condition_variable 提供的 wait() 函式,來等候這個 condition_variable 發出的通知。

這個時候,這個執行序就會整個整個停下來、等候通知;如果要讓這個執行序繼續運作的話,則是要在別的執行序 、透過呼叫 condition_variable 提供的 notify_all()notify_one() 這兩個函式,來喚醒因為 wait() 而停下來的執行序。

下面就是一個比較簡單、不是很嚴謹的範例(註 1):

#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>

using namespace std;

mutex gMutex;
condition_variable gCV;

void funcThread()
{
cout << "[" << this_thread::get_id() << "] Thread started." << endl;
unique_lock<mutex> mLock(gMutex);
gCV.wait( mLock );
cout << "[" << this_thread::get_id() << "] Thread end." << endl;
}

int main( int argc, char** argv )
{
cout << "[Main] create new thread" << endl;
thread t1(funcThread);

cout << "[Main] wait 1 second" << endl;
this_thread::sleep_for( chrono::seconds(1) );

cout << "[Main] send notify" << endl;
gCV.notify_all();

cout << "[Main] wait thread stop" << endl;
t1.join();
}

在這個範例裡面,有兩個全域變數,一個是名為 gMutexmutex 物件、另一個則是名為 gCVcondition_variable 物件。

主程式在執行後,會先建立一個名為 t1 的 thread、去執行 funcThread() 這個函式;而之後,則是會透過 sleep_for() 來暫停一秒、接下來再去呼叫 gCVnotify_all(),來送出喚醒的通知(註 2)。最後,則是在呼叫 t1join(),等 t1 這個執行序完全結束。

而在 funcThread() 裡,他則是在一開始,就建立一個 unique_lock 的物件 mLock,把 gMutex 鎖住;接下來,則是呼叫 gCVwait(),讓執行序停在這邊,去等候 gCV 這個 condition_variable 物件的通知。這邊可能需要附帶一提的是,在把 mLock 傳進 gCVwait()、開始等待的時候,gMutex 也會被自動解鎖;如此一來其他執行序才能使用 gMutex

所以實際執行的結果,應該會類似下面這樣:

[Main] create new thread
[27892] Thread started.
[Main] wait 1 second
[Main] send notify
[Main] wait thread stop
[27892] Thread end.

可以看到,t1 這個執行序的編號是 27892,而他在被建立出來後,就會立刻去輸出「Thread started」的訊息,但是「Thread end」的訊息則是由於前面在等待 gCV 的通知,所以沒有立刻執行;直到主執行序過了一秒後、呼叫了 notify_all()t1 才又繼續執行下去。


其他的等待條件

在上面的例子裡面,基本上就是單純地去呼叫 condition_variablewait() 函式,並把一個 unique_lock 傳遞給他。而實際上,wait() 這個函式,是還有第二個參數的~第二個參數基本上接受一個可呼叫的物件,來判斷是否要停止等待;而這個可以被呼叫的物件的需要回傳一個 bool 變數,如果是 true 的話,condition_variable 就會停止等待、繼續執行,而如果回傳 false 的話,他則會重新開始等待下一個通知。

下面就是它的使用範例:

gCV.wait( mLock, [](){ return gDone; } );

其中,gDone 是一個 bool 變數,基本上預計會是在外部修改他的值。

而上面程式的寫法,和下面的是等價的:

while( !gDone )
gCV.wait( mLock );

透過這樣的機制,也可以加上另一個條件,來判斷是要在收到通知後就繼續做下去、或是要繼續等下去;基本上這算是讓他在使用上更為彈性了~

而除了 wait() 以外,condition_variable 也還有提供 wait_for()wait_until() 這兩個函式,來限制等待的時間;也就是當時間到了之後,就算沒有收到通知,他還是會讓執行序停止等待、繼續執行。在呼叫 wait_for() 時需要額外再給一個指定長度的時間,而 wait_until() 則是要額外指定一個時間點;時間的形式類別都是要使用 STL 的 chrono(參考)所提供的類別,前者是要使用 duration、後者則是使用 time_point

下面是簡單的使用範例:

gCV.wait_for( mLock, chrono::seconds(5) );
gCV.wait_until( mLock, chrono::system_clock::now() chrono::seconds(5) );

wait_for()wait_until() 這兩個函式也和 wait() 一樣,可以加上額外的條件,來做停止等待的檢查。


不同的通知方式

在上面的例子裡面,Heresy 是用 notify_all() 來做 condition_variable 的通知;而實際上,condition_variable 除了 notify_all() 之外,也還有提供另一個函式 notify_one()、也是用來左通知之用的。而兩者的差別,在於 notify_all() 會去通知所有正在等待這個 condition_variable 的執行序,而  notify_one() 則只會通知其中一個。

下面是一個例子:

#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <vector>

using namespace std;

mutex gMutex;
condition_variable gCV;

void funcThread( size_t idx )
{
cout << "[" << idx << "] Thread started." << endl;
unique_lock<mutex> mLock(gMutex);
gCV.wait_for( mLock, chrono::seconds(3) );
cout << "[" << idx << "] Thread end." << endl;
}

int main( int argc, char** argv )
{
vector<thread> vThreads;
vThreads.resize(3);

cout << "[Main] create new thread" << endl;
for( size_t i = 0; i < vThreads.size(); i )
vThreads[i] = thread(funcThread, i);

cout << "[Main] wait 1 second" << endl;
this_thread::sleep_for( chrono::seconds(1) );

cout << "[Main] send notify" << endl;
gCV.notify_all();

cout << "[Main] wait 1 second" << endl;
this_thread::sleep_for( chrono::seconds(1) );

cout << "[Main] wait thread stop" << endl;
for( size_t i = 0; i < vThreads.size(); i )
vThreads[i].join();
}

在上面的例子裡面,會建立出三個執行序出來,然後在執行後都會去等同一個 condition_variable、也就是 gCV;之後在主執行序內,則是在等候一秒後、透過 notify_all() 來通知所有等待中的執行序繼續執行。

而這樣的程式執行結果會像下面這樣:

[Main] create new thread
[0] Thread started.
[1] Thread started.
[2] Thread started.
[Main] wait 1 second
[Main] send notify
[Main] wait 1 second
[1] Thread end.
[0] Thread end.
[2] Thread end.
[Main] wait thread stop

這邊可以看到,三個執行序基本上都是在收到通知後,幾乎同時結束的。

不過,如果把上面的 notify_all() 改成 notify_one() 的話,那結果就會變成類似下面的樣子:

[Main] create new thread
[0] Thread started.
[1] Thread started.
[2] Thread started.
[Main] wait 1 second
[Main] send notify
[Main] wait 1 second
[2] Thread end.
[Main] wait thread stop
[0] Thread end.
[1] Thread end.

可以看到,在使用 notify_one() 做通知的情況下,只有 thread 2 是因為收到通知而結束的,剩下的 thread 0 和 thread 1,則是由於是使用 wait_for() 所設定的時限(三秒)到了,才結束的~
(如果不用 wait_for() 的話,會因為剩下的兩個執行序沒有人喚醒、而讓程式無法結束)

而如果不想用 wait_for() 來讓他因為時間到了而結束的話、也可以把 funcTread() 的內容稍作修改,

void funcThread( size_t idx )
{
cout << "[" << idx << "] Thread started." << endl;
unique_lock<mutex> mLock(gMutex);
gCV.wait( mLock );

this_thread::sleep_for( chrono::seconds(1) );
cout << "[" << idx << "] Thread end, notify next." << endl;
gCV.notify_one();
}

在上面的程式寫法,在執行序被喚醒之後,會等個一秒、然後輸出結束訊息後,送出一個通知,來喚醒另一個在等待中的執行序。所以執行結果會變成:

[Main] create new thread
[0] Thread started.
[1] Thread started.
[2] Thread started.
[Main] wait 1 second
[Main] send notify
[Main] wait 1 second
[2] Thread end, notify next.
[Main] wait thread stop
[1] Thread end, notify next.
[0] Thread end, notify next.

這樣的寫法,會變成是由主執行序喚醒 thread 2,而在 thread 2 結束前再去喚醒 thread 1、然後在 thread 1 結束前去喚醒 thread 0;這樣的寫法也可以確保所有在等待的執行序都會被喚醒、而且一次只有一個在執行。(註 3)


其他

接下來,則是一些 Heresy 不打算詳細說明的東西,就姑且在這邊簡單提一下了。

首先,condition_variable 在設計上,只能搭配 unique_lock<mutex> 使用,不能使用其他的 lock 或 mutex 的類型;而如果要更通用話的版本的話,則可以使用 condition_variable_any參考)。

另外,如果擔心有沒有被喚醒的執行序的話,也可以考慮使用 notify_all_at_thread_exit() 這個函式來讓執行序結束的時候、發出通知;詳細請參考 cppreference(頁面)。


附註:

  1. 這個範例寫的不是很嚴警,像是 ostream 在輸出的時候,就有可能因為兩個執行序同時要輸出、而有結果錯亂的問題,不過因為不是很重要,所以這邊就無視了。

  2. 這邊要等一秒的原因,是因為如果馬上執行,很有可能因為 t1 還沒呼叫 gCV.wait() 的關係,而提早送出通知;這會導致之後 t1 在等待通知的時候,等不到新的通知而無法結束。

  3. 雖然在 MSVC2012 的執行結果看來是會照順序喚醒,不過在 C 的標準裡,當有多個執行序都在等待同一個 condition_variable 的通知的時候,notify_one() 會去通知其中一個執行序,但是應該沒有規範通知的順序。

Leave a Reply

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *