這篇來講一下 C++20 針對多執行序/併行(Concurrency)新增的另一個功能:semaphore(信號、C++ Reference)。
他的基本型別是 std::counting_semaphore
,在內部擁有一個計數器來記錄總共可用的數量;這邊或許把他想像成總共有好幾張椅子,然後讓大家都要來坐的概念會比較好理解。
使用時基本上就是透過呼叫 acquire()
來向 counting_semaphore
要求一個位置、這時候它的內部記數器就會減 1、代表有一個位置被用掉了;而在用完之後,則是要透過呼叫 release()
這個函式來釋出位置、這時候內部計數器就會加 1。
而如果在呼叫 acquire()
的時候內部的計數器是 0 的話就代表已經沒有位置了,這時候就必須在那邊等到有位置被釋出為止。
所以感覺起來,他主要的功能應該是用來控制「同時可以使用的數量」的東西。
基本使用範例
比如說這邊可以先建立 10 個執行序來執行工作、但是又另外定義一個大小是 3 的 std::counting_semaphore
,這樣就可以限制某個階段的動作同一時間只有 3 個執行序可以執行。
下面就是一個這樣的例子:
#include <chrono> #include <iostream> #include <vector> #include <thread> #include <semaphore> #include <syncstream> using namespace std::chrono_literals; constexpr int numThread = 10; std::counting_semaphore semaphore(3); void worker( size_t idx) { semaphore.acquire(); std::osyncstream(std::cout) << "[" << idx << "] start" << std::endl; // do job! std::this_thread::sleep_for(0.1s); std::osyncstream(std::cout) << "[" << idx << "] finished" << std::endl; semaphore.release(); } int main() { std::vector<std::jthread> vThreadPool; for (int i = 0; i < numThread; ++i) vThreadPool.emplace_back(worker, i); }
這邊是定義了一個 std::counting_semaphore
的變數 semaphore
、並指定他的初始數值是 3。
之後則是建立了 10 個執行序,裡面會透過 acquire()
來向 semaphore
送出要求;而由於 semaphore
的大小是 3,所以最多同時就只能有 3 的執行序可以直接通過這個函式、其他的執行序都會因為 semaphore
內的計數器已經是 0 而必須在這邊等待到別人釋出。
再來,則是等個 0.1 秒後,就透過 release()
來告訴 semaphore
自己該做的事已經做完了、可以釋出之前占用的位置了。而此時,其他執行序就有機會可以繼續做事了。
視覺化版本的範例
不過上面的例子的輸出可能比較不容易看出來「同時只有三個執行序在做事」的狀態,所以後來用改了一個比較複雜、但是視覺化的版本:
#include <chrono> #include <iostream> #include <array> #include <vector> #include <thread> #include <atomic> #include <semaphore> using namespace std::chrono_literals; constexpr int numThread = 10; std::counting_semaphore semaphore(3); std::atomic_int numJobs = numThread; // used to counting unfinished thread std::array<std::atomic_char, numThread> aThreadStatus; // status of each thread void worker(size_t idx) { aThreadStatus[idx] = 0; semaphore.acquire(); { aThreadStatus[idx] = 1; // do job! std::this_thread::sleep_for(0.1s); } semaphore.release(); aThreadStatus[idx] = 2; --numJobs; } int main() { std::vector<std::jthread> vThreadPool; for (int i = 0; i < numThread; ++i) vThreadPool.emplace_back(worker, i); // output thread index for (int i = 0; i < numThread; ++i) std::cout << i << " "; std::cout << "\n"; // monitor thread status while (numJobs > 0) { for (const auto& status : aThreadStatus) { switch (status) { case 0: std::cout << "- "; break; case 1: std::cout << "| "; break; case 2: std::cout << " "; break; } } std::cout << "\n"; std::this_thread::sleep_for(0.05s); } }
這邊是會透過 aThreadStatus
來記錄每個執行序的狀態(因為 std::atmoic<>
不能放進 std::vector<>
、所以這邊用 std::array<>
)、並透過 numJobs
來記錄還沒在進行的執行序數量,作為主執行序結束的判斷依據。
在主執行序則是會把 aThreadStatus
畫出來,結果應該會類似下面的狀況:
0 1 2 3 4 5 6 7 8 9 | | | - - - - - - - | | | - - - - - - - | | - - | - - | | - - | - - | | | - | | | - | |
可以看到,一開始只有 0、1、2 三個執行序在執行工作(狀態值 1),之後則是 3、4、7 這樣子。
有興趣的話也可以自己修改數值看看。
semaphore 的最大值
上面雖然在定義 semaphore
的時候都是直接寫:
std::counting_semaphore semaphore(3);
但是這邊需要提醒一下,這邊建構的引數 3 代表的是 semaphore
初始化完成時,內部計數器的數值,而不是最大值。也就是說,事後透過 release()
是可以持續增加數值的!
例如:
std::counting_semaphore semaphore(3); semaphore.acquire(); // 2 semaphore.release(); // 3 semaphore.release(); // 4
這邊後面因為 release()
多呼叫了一次,所以 semaphore
最後會變成是 4。
那要怎麼設定最大值呢?實際上 std::counting_semaphore
是一個 template 型別,透過 template 的引數、可以指定最大值。
例如想要限制他最大是 3 的話,就可以定義成:
std::counting_semaphore<3> semaphore(3);
不過…這邊的「最大值」感覺上似乎還是有點微妙?在 C++ Reference 上這個 template 引數的名稱是「LeastMaxValue
」,在說明中也寫了很謎的「LeastMaxValue is the minimum max value, not the actual max value」?感覺上…很微妙啊。
而雖然感覺上這個最大值應該應該會讓他不會因為 release()
太多次而超出述職的上限;但是實際上,以下面的例子來說:
std::counting_semaphore<3> semaphore(3); semaphore.release(); // Should error
他在 Visual Studio 2022 裡面、呼叫 release()
是會出錯的!(但是也沒辦法透過 try-catch 來攔截)
但是同樣的程式,在 g++ 12.3 卻似乎可以繼續執行,感覺設定的上限根本沒用…
所以…恩,使用的時候還是自己多注意吧?
binary_semaphore
除了標準型的 std::counting_semaphore
外,標準函式庫還有提供一個特化版的 std::binary_semaphore
,他基本上就是 std::counting_semaphore<1>
。
由於他理論上只有 0 / 1 兩種狀態,所以使用上變得相對簡單,在某種意義上使用也變得更直覺;像是在某些情境下,就可以用來取代 mutex lock 來限制僅有單一執行序在執行。
下面的程式、是開啟 10 個執行序來輸出各自的 thread id:
#include <iostream> #include <vector> #include <thread> int main() { std::vector<std::jthread> vPool; for (int i = 0; i < 10; ++i) { vPool.emplace_back( []() { std::cout << "The thread " << std::this_thread::get_id() << " is" << " running" << "\n"; } ); } }
但是這邊由於是直接使用 std::cout
、所以每個 operator<<
可能會被打斷、最後輸出的結果很可能會很混亂、不是預期的狀況。
而除了可以直接用 mutex lock 之外,也可以透過 std::binary_semaphore
來確保每次只有一個執行序在輸出;下面就是寫法:
#include <iostream> #include <vector> #include <thread> #include <semaphore> std::binary_semaphore lock(1); int main() { std::vector<std::jthread> vPool; for (int i = 0; i < 10; ++i) { vPool.emplace_back( []() { lock.acquire(); std::cout << "The thread " << std::this_thread::get_id() << " is" << " running" << "\n"; lock.release(); } ); } }
這樣的寫法其實就是接近使用 mutex lock、確保同時只有一個執行序在輸出、讓輸出的結果不會被打亂的。(不過以這個情境來說,建議直接使用 std::osyncstream
應該比較快)
不過雖然可以做到類似的事情,但是實際上 std::binary_semaphore
的本質還是和 mutex 不同的、semaphore 不會被綁在執行序上,可以在第一個執行序 acquire()
、然後在另一個執行序 release()
。
而他也可以用來作為多執行序間的通知,在某種程度上取代 std::condition_variable
;像是下面就是一個修改之前使用 std::condition_variable
寫的例子:
#include <thread> #include <semaphore> #include <iostream> using namespace std; binary_semaphore notifier{ 0 }; void funcThread() { cout << "[" << this_thread::get_id() << "] Thread started." << endl; notifier.acquire(); cout << "[" << this_thread::get_id() << "] Thread end." << endl; } int main(int argc, char** argv) { cout << "[Main] create new thread" << endl; thread t1(funcThread); cout << "[Main] wait 1 second" << endl; this_thread::sleep_for(chrono::seconds(1)); cout << "[Main] send notify" << endl; notifier.release(); cout << "[Main] wait thread stop" << endl; t1.join(); }
感覺在使用上其實還算滿方便的?只是在程式碼的語意上感覺就沒那麼明確就是了。
其他函式
除了上面基本的 acquire()
和 release()
外,其實還有一些其他更進一步用的用法。
以 release()
來說,實際上他可以透過引數來指定要釋出的數量,例如可以呼叫 release(5)
這樣子。(但是 acquire()
不能指定數量?)
而 acquire()
除了原始的會停在那邊等待的版本外,則是還有提供 try_acquire()
這個不會等待的版本、以及 try_acquire_for()
、try_acquire_until()
這兩個版本可以指定要等多久的版本。
而透過回傳值,則可知道是否有成功取得;透過這種 non-block 的函式,就可以讓沒有成功佔到位置的執行序可以去做別的工作了。下面是一個簡單的使用範例:
if (semaphore.try_acquire()) { // Do job 1 semaphore.release(); } else { // Do job 2 }
透過這樣的機制,應該也可以實作出讓多個執行序優先做某些事情(job 1)、但是如果已經有限制數量內的執行序已經在做這件事的話,那多餘的執行序就先去做別的事(job 2)這樣的架構。
不過這樣的程式寫出來還滿雜的(job queue 要寫成 thread-safe 就很麻煩了),所以就不貼程式碼了。
這部分大概就這樣了?老實說,現階段沒有想到什麼時候真的需要使用這東西?或許得等到之後真的有對應需求才會知道了吧…