C++20 在多執行序程式開發的部分,除了提供了新的 std::jthread
外,其實還有提供一些新的多執行序同步功能可以使用,像是之前介紹過的 std::counting_semaphore
就是其中之一。而接下來這篇則是來講一下多執行序之間同步用的 std::barrier
和 std::latch
。
其中,std::barrier
(文件、可以翻譯成柵欄、屏障)主要應該是用來給一群固定大小的執行序用的同步點;例如要確保所有的執行序都跑完第一階段的工作後,再一起開始執行第二階段的工作這樣的概念。
而 std::latch
(文件,可以翻譯成門閂)則比較像是設計來給不同類型的執行序/工作之間做狀態的檢查、彼此等待用的。
std::barrier
std::barrier
這東西應該算是數量明確的多執行序程式中的「同步點」,一般的使用情境是需要等到所有的執行序都到達這個位置後、才能繼續做之後的事情。
另外,std::barrier
同時也可以附加一個可呼叫物件、在滿足條件後會自動執行(callback function)。
這樣的設計主要是為了多階段的工作而設計的。在開發多執行序程式的時候,有的時候會需要等到所有執行序都完成特定工作後、才能繼續下一步;如果直接寫的話,可能會變成下面的形式:
#include <array> #include <iostream> #include <syncstream> #include <thread> int main() { constexpr size_t numThread = 3; std::array<std::jthread, numThread> vThreads; // stage 1 for (auto& t : vThreads) { t = std::jthread( []() { std::osyncstream(std::cout) << "stage 1\n"; }); } // sync point for (auto& t : vThreads) t.join(); std::cout << "sync finished" << std::endl; // stage 2 for (auto& t : vThreads) { t = std::jthread( []() { std::osyncstream(std::cout) << "stage 2\n"; }); } }
這邊基本上就是先開多執行序來執行第一階段的工作,等到大家都做完後,再重新建立出新的執行序、來執行第二階段的工作。
這邊執行的結果會是:
stage 1 stage 1 stage 1 sync finished stage 2 stage 2 stage 2
而同樣的需求如果改用 std::barrier
的話,則可以簡化成下面的形式:
#include <array> #include <iostream> #include <syncstream> #include <thread> #include <barrier> int main() { constexpr size_t numThread = 3; // setup barrier std::barrier sync_point(numThread, []() noexcept { std::cout << "sync finished" << std::endl; }); std::array<std::jthread, numThread> vThreads; for (auto& t : vThreads) { t = std::jthread( [&sync_point]() { // stage 1 std::osyncstream(std::cout) << "stage 1\n"; sync_point.arrive_and_wait(); // stage 2 std::osyncstream(std::cout) << "stage 2\n"; }); } }
這邊 vThreads
裡面的執行序會在建立出來後就開始執行第一階段的工作,而到了「sync_point.arrive_and_wait()
」這邊,就會停下來等其他執行序也到直行到這裡,然後再執行 sync_point
的 callback 函式(只會執行一次)、之後再執行第二階段的內容。
這樣執行的結果會和前面是一致的。
另外,std::barrier
這個類別的物件是可以重複使用的,所以如果多執行序的工作分的截斷更多的話,那 std::barrier
用起來會很方便,只要寫成:
t = std::jthread( [&sync_point]() { std::osyncstream(std::cout) << "stage 1\n"; sync_point.arrive_and_wait(); std::osyncstream(std::cout) << "stage 2\n"; sync_point.arrive_and_wait(); std::osyncstream(std::cout) << "stage 3\n"; sync_point.arrive_and_wait(); std::osyncstream(std::cout) << "stage 4\n"; });
這樣就可以把工作分成四個階段、每個階段都會等所有的執行序完成後才會進入下一個階段了~
也因此,使用 std::barrier
的好處,除了可能可以簡化程式碼外,還可以避免重新建立執行序;由於建立新的執行序也有一定的成本,所以如果執行序的數量多的話,這類的需求使用 std::barrier
來做其實是有可能提升效能的。
而在 C++20 提供 std::barrier
前,雖然也可以透過 atomic<int>
自己計數來做得到類似的效果,但是透過 std::barrier
會較為簡單、語意也比較明確。
另外,雖然這邊雖然是所有執行序都在做一樣的事情,但是實際上他應該也是可以用在更複雜的情境的。
像是他也有提供 arrive_and_drop()
這個會讓後續要等待的執行序數量變少的函式,或許在某些情境下會有讓程式更有彈性?
在下面的例子裡面,可以看到這邊是用 sp
這個 std::barrier
給三個 thread 使用。
#include <iostream> #include <syncstream> #include <thread> #include <barrier> int main() { using namespace std::chrono_literals; std::barrier sp(3, []() noexcept { std::cout << " > sync point" << std::endl; }); std::jthread t1( [&sp]() { std::osyncstream(std::cout) << "T1 phase 1" << std::endl; sp.arrive_and_drop(); std::osyncstream(std::cout) << "T1 finished" << std::endl; }); std::jthread t2( [&sp]() { std::osyncstream(std::cout) << "T2 phase 1" << std::endl; sp.arrive_and_wait(); std::osyncstream(std::cout) << "T2 phase 2" << std::endl; sp.arrive_and_wait(); std::osyncstream(std::cout) << "T2 finished" << std::endl; }); std::jthread t3( [&sp]() { std::osyncstream(std::cout) << "T3 phase 1" << std::endl; std::this_thread::sleep_for(0.5s); sp.arrive_and_wait(); std::osyncstream(std::cout) << "T3 phase 2" << std::endl; sp.arrive_and_wait(); std::osyncstream(std::cout) << "T3 finished" << std::endl; }); }
他的執行結果會類似下面這樣:
T1 phase 1 T2 phase 1 T3 phase 1 T1 finished > sync point T3 phase 2 T2 phase 2 > sync point T2 finished T3 finished
這邊在 phase 1 之後的同步點,sp
會需要等待 3 個執行序到達。
不過由於 t1
這個執行序裡面是去呼叫 arrive_and_drop()
,所以他會在到達後不等待其他執行序就直接結束,同時讓 sp
之後要等待的執行序變成 2 個。
因此、實際上 phase 2 後面的同步點、就只有 t2
和 t3
而已了。
不過在個人來看,這樣的寫法在實際上應該會很危險、很容易沒搞清楚就出問題就是了…
此外,儘管他有獨立提供 arrive()
和 wait()
這兩個獨立的函式可以使用。其中 arrive()
則可以額外指定到達的數量,算是有比較大的彈性?但是感覺也是很危險的用法,像是如果上面的程式把 t1
的 arrive_and_drop()
改成 arrive()
的話、phase 2 的同步點就會因為永遠沒有第三個執行序會到達、所以永遠卡在那了。
而由於 wait()
這個函式會需要拿內部的 arrival_token
做參數(呼叫時可以取得 arrive()
),讓 Heresy 搞不太清楚該怎麼獨立使用就是了。
std::latch
std::latch
和 std::barrier
在設計上算是滿相似的,基本上也是設定一個數量、在指定數量的執行序到達後就可以解除所有執行序的等待狀態。
不過,std::latch
在功能面上比較侷限,一個是他不能附掛可執行的物件在符合條件時自動執行,再者是他是一次性的、不能重複使用。
以兩個工作階段的例子來說,可以寫成:
#include <array> #include <iostream> #include <syncstream> #include <thread> #include <latch> int main() { constexpr size_t numThread = 3; std::latch sync_point(numThread); std::array<std::jthread, numThread> vThreads; for (auto& t : vThreads) { t = std::jthread( [&sync_point]() { std::osyncstream(std::cout) << "stage 1\n"; sync_point.arrive_and_wait(); std::osyncstream(std::cout) << "stage 2\n"; }); } }
不過也由於 std::latch
沒有辦法掛 callback function,所以要在檢查點輸出訊息就稍微麻煩一點了。
比如說這邊可以讓主執行序透過 wait()
來等其他執行序的第一階段工作完成;然後再透過第二個 std::latch
來啟動第二階段的工作。
#include <array> #include <iostream> #include <syncstream> #include <thread> #include <latch> int main() { constexpr size_t numThread = 3; std::latch sync_point(numThread); std::latch output(1); std::array<std::jthread, numThread> vThreads; for (auto& t : vThreads) { t = std::jthread( [&sync_point, &output]() { std::osyncstream(std::cout) << "stage 1\n"; sync_point.arrive_and_wait(); output.wait(); std::osyncstream(std::cout) << "stage 2\n"; }); } sync_point.wait(); std::cout << "sync point\n"; output.arrive_and_wait(); }
這邊可以看到,std::latch
的 wait()
可以直接呼叫、算是可以相當直覺地使用的;要 Heresy 來看,感覺上好像比 condition_variable
還簡單?
另外,這邊其實由於程式的邏輯,所以在呼叫 sync_point
和 output
的 arrive_and_wait()
的時候,實際上都不需要真的等待,所以其實都可以改成呼叫 count_down()
(類似 std::barrier
的 arrive()
?)就好。
(不過實際上,以這邊的例子來說,如果直接用 C++11 的 call_once()
(參考)會更簡單就是了。)
和 std::barrier
相比,std::latch
還有一個很大的不同在於它不能重複使用;也就說,這邊沒有辦法去增加、或是重設 std::latch
內部的計數器。
在使用上,std::barrier
的本質就是用來當作一群執行序的同步點,讓一群執行序可以執行到特定的地方等待。
而 std::latch
的設計則比較像是針對特定數量的工作,讓特定的執行序可以知道這些工作處理完了沒;而在處理工作的執行序,則可以透過 count_down()
這個不會停下來等待的函式來回報工作已經完成了。