C++20 多執行序間的同步點 barrier 與 latch

| | 0 Comments| 13:07
Categories:

C++20 在多執行序程式開發的部分,除了提供了新的 std::jthread 外,其實還有提供一些新的多執行序同步功能可以使用,像是之前介紹過的 std::counting_semaphore 就是其中之一。而接下來這篇則是來講一下多執行序之間同步用的 std::barrierstd::latch

其中,std::barrier文件、可以翻譯成柵欄、屏障)主要應該是用來給一群固定大小的執行序用的同步點;例如要確保所有的執行序都跑完第一階段的工作後,再一起開始執行第二階段的工作這樣的概念。

std::latch文件,可以翻譯成門閂)則比較像是設計來給不同類型的執行序/工作之間做狀態的檢查、彼此等待用的。

std::barrier

std::barrier 這東西應該算是數量明確的多執行序程式中的「同步點」,一般的使用情境是需要等到所有的執行序都到達這個位置後、才能繼續做之後的事情。
另外,std::barrier 同時也可以附加一個可呼叫物件、在滿足條件後會自動執行(callback function)。

這樣的設計主要是為了多階段的工作而設計的。在開發多執行序程式的時候,有的時候會需要等到所有執行序都完成特定工作後、才能繼續下一步;如果直接寫的話,可能會變成下面的形式:

#include <array>
#include <iostream>
#include <syncstream>
#include <thread>
 
int main()
{
  constexpr size_t numThread = 3;
 
  std::array<std::jthread, numThread> vThreads;
 
  // stage 1
  for (auto& t : vThreads)
  {
    t = std::jthread(
      []() {
        std::osyncstream(std::cout) << "stage 1\n";
      });
  }
 
  // sync point
  for (auto& t : vThreads)
    t.join();
  std::cout << "sync finished" << std::endl;
 
  // stage 2
  for (auto& t : vThreads)
  {
    t = std::jthread(
      []() {
        std::osyncstream(std::cout) << "stage 2\n";
      });
  }
}

這邊基本上就是先開多執行序來執行第一階段的工作,等到大家都做完後,再重新建立出新的執行序、來執行第二階段的工作。

這邊執行的結果會是:

stage 1
stage 1
stage 1
sync finished
stage 2
stage 2
stage 2

而同樣的需求如果改用  std::barrier 的話,則可以簡化成下面的形式:

#include <array>
#include <iostream>
#include <syncstream>
#include <thread>
#include <barrier>
 
int main()
{
  constexpr size_t numThread = 3;
 
  // setup barrier
  std::barrier sync_point(numThread,
    []() noexcept {
      std::cout << "sync finished" << std::endl;
    });
 
  std::array<std::jthread, numThread> vThreads;
  for (auto& t : vThreads)
  {
    t = std::jthread(
      [&sync_point]() {
        // stage 1
        std::osyncstream(std::cout) << "stage 1\n";
 
        sync_point.arrive_and_wait();
 
        // stage 2
        std::osyncstream(std::cout) << "stage 2\n";
      });
  }
}

這邊 vThreads 裡面的執行序會在建立出來後就開始執行第一階段的工作,而到了「sync_point.arrive_and_wait()」這邊,就會停下來等其他執行序也到直行到這裡,然後再執行 sync_point 的 callback 函式(只會執行一次)、之後再執行第二階段的內容。

這樣執行的結果會和前面是一致的。


另外,std::barrier 這個類別的物件是可以重複使用的,所以如果多執行序的工作分的截斷更多的話,那 std::barrier 用起來會很方便,只要寫成:

t = std::jthread(
  [&sync_point]() {
    std::osyncstream(std::cout) << "stage 1\n";

    sync_point.arrive_and_wait();

    std::osyncstream(std::cout) << "stage 2\n";

    sync_point.arrive_and_wait();

    std::osyncstream(std::cout) << "stage 3\n";

    sync_point.arrive_and_wait();

    std::osyncstream(std::cout) << "stage 4\n";
  });

這樣就可以把工作分成四個階段、每個階段都會等所有的執行序完成後才會進入下一個階段了~

也因此,使用 std::barrier 的好處,除了可能可以簡化程式碼外,還可以避免重新建立執行序;由於建立新的執行序也有一定的成本,所以如果執行序的數量多的話,這類的需求使用 std::barrier 來做其實是有可能提升效能的。


而在 C++20 提供 std::barrier 前,雖然也可以透過 atomic<int> 自己計數來做得到類似的效果,但是透過 std::barrier 會較為簡單、語意也比較明確。

另外,雖然這邊雖然是所有執行序都在做一樣的事情,但是實際上他應該也是可以用在更複雜的情境的。

像是他也有提供 arrive_and_drop() 這個會讓後續要等待的執行序數量變少的函式,或許在某些情境下會有讓程式更有彈性?

在下面的例子裡面,可以看到這邊是用 sp 這個  std::barrier 給三個 thread 使用。

#include <iostream>
#include <syncstream>
#include <thread>
#include <barrier>
 
int main()
{
  using namespace std::chrono_literals;
 
  std::barrier sp(3,
    []() noexcept {
      std::cout << " > sync point" << std::endl;
    });
 
  std::jthread t1(
    [&sp]() {
      std::osyncstream(std::cout) << "T1 phase 1" << std::endl;
      sp.arrive_and_drop();
      std::osyncstream(std::cout) << "T1 finished" << std::endl;
    });
 
  std::jthread t2(
    [&sp]() {
      std::osyncstream(std::cout) << "T2 phase 1" << std::endl;
      sp.arrive_and_wait();
      std::osyncstream(std::cout) << "T2 phase 2" << std::endl;
      sp.arrive_and_wait();
      std::osyncstream(std::cout) << "T2 finished" << std::endl;
    });
 
  std::jthread t3(
    [&sp]() {
      std::osyncstream(std::cout) << "T3 phase 1" << std::endl;
      std::this_thread::sleep_for(0.5s);
      sp.arrive_and_wait();
      std::osyncstream(std::cout) << "T3 phase 2" << std::endl;
      sp.arrive_and_wait();
      std::osyncstream(std::cout) << "T3 finished" << std::endl;
    });
}

他的執行結果會類似下面這樣:

T1 phase 1
T2 phase 1
T3 phase 1
T1 finished
 > sync point
T3 phase 2
T2 phase 2
 > sync point
T2 finished
T3 finished

這邊在 phase 1 之後的同步點,sp 會需要等待 3 個執行序到達。

不過由於 t1 這個執行序裡面是去呼叫 arrive_and_drop(),所以他會在到達後不等待其他執行序就直接結束,同時讓 sp 之後要等待的執行序變成 2 個。

因此、實際上 phase 2 後面的同步點、就只有 t2t3 而已了。

不過在個人來看,這樣的寫法在實際上應該會很危險、很容易沒搞清楚就出問題就是了…

此外,儘管他有獨立提供 arrive()wait() 這兩個獨立的函式可以使用。其中 arrive() 則可以額外指定到達的數量,算是有比較大的彈性?但是感覺也是很危險的用法,像是如果上面的程式把 t1arrive_and_drop() 改成 arrive() 的話、phase 2 的同步點就會因為永遠沒有第三個執行序會到達、所以永遠卡在那了。

而由於 wait() 這個函式會需要拿內部的 arrival_token 做參數(呼叫時可以取得 arrive()),讓 Heresy 搞不太清楚該怎麼獨立使用就是了。


std::latch

std::latchstd::barrier 在設計上算是滿相似的,基本上也是設定一個數量、在指定數量的執行序到達後就可以解除所有執行序的等待狀態。

不過,std::latch 在功能面上比較侷限,一個是他不能附掛可執行的物件在符合條件時自動執行,再者是他是一次性的、不能重複使用。

以兩個工作階段的例子來說,可以寫成:

#include <array>
#include <iostream>
#include <syncstream>
#include <thread>
#include <latch>
 
int main()
{
  constexpr size_t numThread = 3;
 
  std::latch sync_point(numThread);
 
  std::array<std::jthread, numThread> vThreads;
  for (auto& t : vThreads)
  {
    t = std::jthread(
      [&sync_point]() {
        std::osyncstream(std::cout) << "stage 1\n";
 
        sync_point.arrive_and_wait();
 
        std::osyncstream(std::cout) << "stage 2\n";
      });
  }
}

不過也由於 std::latch 沒有辦法掛 callback function,所以要在檢查點輸出訊息就稍微麻煩一點了。

比如說這邊可以讓主執行序透過 wait() 來等其他執行序的第一階段工作完成;然後再透過第二個 std::latch 來啟動第二階段的工作。

#include <array>
#include <iostream>
#include <syncstream>
#include <thread>
#include <latch>
 
int main()
{
  constexpr size_t numThread = 3;
 
  std::latch sync_point(numThread);
  std::latch output(1);
 
  std::array<std::jthread, numThread> vThreads;
  for (auto& t : vThreads)
  {
    t = std::jthread(
      [&sync_point, &output]() {
        std::osyncstream(std::cout) << "stage 1\n";
 
        sync_point.arrive_and_wait();
        output.wait();
 
        std::osyncstream(std::cout) << "stage 2\n";
      });
  }
 
  sync_point.wait();
  std::cout << "sync point\n";
  output.arrive_and_wait();
}

這邊可以看到,std::latchwait() 可以直接呼叫、算是可以相當直覺地使用的;要 Heresy 來看,感覺上好像比 condition_variable 還簡單?

另外,這邊其實由於程式的邏輯,所以在呼叫 sync_pointoutputarrive_and_wait() 的時候,實際上都不需要真的等待,所以其實都可以改成呼叫 count_down()(類似 std::barrierarrive()?)就好。

(不過實際上,以這邊的例子來說,如果直接用 C++11 的 call_once()參考)會更簡單就是了。)


std::barrier 相比,std::latch 還有一個很大的不同在於它不能重複使用;也就說,這邊沒有辦法去增加、或是重設 std::latch 內部的計數器。

在使用上,std::barrier 的本質就是用來當作一群執行序的同步點,讓一群執行序可以執行到特定的地方等待。

std::latch 的設計則比較像是針對特定數量的工作,讓特定的執行序可以知道這些工作處理完了沒;而在處理工作的執行序,則可以透過 count_down() 這個不會停下來等待的函式來回報工作已經完成了。

Leave a Reply

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *