C++20 多執行序的 semaphore

| | 0 Comments| 09:28
Categories:

這篇來講一下 C++20 針對多執行序/併行(Concurrency)新增的另一個功能:semaphore(信號、C++ Reference)。

他的基本型別是 std::counting_semaphore,在內部擁有一個計數器來記錄總共可用的數量;這邊或許把他想像成總共有好幾張椅子,然後讓大家都要來坐的概念會比較好理解。

使用時基本上就是透過呼叫 acquire() 來向 counting_semaphore 要求一個位置、這時候它的內部記數器就會減 1、代表有一個位置被用掉了;而在用完之後,則是要透過呼叫 release() 這個函式來釋出位置、這時候內部計數器就會加 1。

而如果在呼叫 acquire() 的時候內部的計數器是 0 的話就代表已經沒有位置了,這時候就必須在那邊等到有位置被釋出為止。

所以感覺起來,他主要的功能應該是用來控制「同時可以使用的數量」的東西。


基本使用範例

比如說這邊可以先建立 10 個執行序來執行工作、但是又另外定義一個大小是 3 的 std::counting_semaphore,這樣就可以限制某個階段的動作同一時間只有 3 個執行序可以執行。

下面就是一個這樣的例子:

#include <chrono>
#include <iostream>
#include <vector>
#include <thread>
#include <semaphore>
#include <syncstream>
 
using namespace std::chrono_literals;
 
constexpr int numThread = 10;
std::counting_semaphore semaphore(3);
 
void worker( size_t idx)
{
  semaphore.acquire();
  std::osyncstream(std::cout) << "[" << idx << "] start" << std::endl;
  // do job!
  std::this_thread::sleep_for(0.1s);
  std::osyncstream(std::cout) << "[" << idx << "] finished" << std::endl;
  semaphore.release();
}
 
int main()
{
  std::vector<std::jthread> vThreadPool;
  for (int i = 0; i < numThread; ++i)
    vThreadPool.emplace_back(worker, i);
}

這邊是定義了一個 std::counting_semaphore 的變數 semaphore、並指定他的初始數值是 3。

之後則是建立了 10 個執行序,裡面會透過 acquire() 來向 semaphore 送出要求;而由於 semaphore 的大小是 3,所以最多同時就只能有 3 的執行序可以直接通過這個函式、其他的執行序都會因為 semaphore 內的計數器已經是 0 而必須在這邊等待到別人釋出。

再來,則是等個 0.1 秒後,就透過 release() 來告訴 semaphore 自己該做的事已經做完了、可以釋出之前占用的位置了。而此時,其他執行序就有機會可以繼續做事了。


視覺化版本的範例

不過上面的例子的輸出可能比較不容易看出來「同時只有三個執行序在做事」的狀態,所以後來用改了一個比較複雜、但是視覺化的版本:

#include <chrono>
#include <iostream>
#include <array>
#include <vector>
#include <thread>
#include <atomic>
#include <semaphore>
 
using namespace std::chrono_literals;
 
constexpr int numThread = 10;
std::counting_semaphore semaphore(3);
 
std::atomic_int numJobs = numThread;  // used to counting unfinished thread
std::array<std::atomic_char, numThread> aThreadStatus;  // status of each thread
 
void worker(size_t idx)
{
  aThreadStatus[idx] = 0;
  semaphore.acquire();
  {
    aThreadStatus[idx] = 1;
    // do job!
    std::this_thread::sleep_for(0.1s);
  }
  semaphore.release();
  aThreadStatus[idx] = 2;
  --numJobs;
}
 
int main()
{
  std::vector<std::jthread> vThreadPool;
  for (int i = 0; i < numThread; ++i)
    vThreadPool.emplace_back(worker, i);
 
  // output thread index
  for (int i = 0; i < numThread; ++i)
    std::cout << i << " ";
  std::cout << "\n";
 
  // monitor thread status
  while (numJobs > 0)
  {
    for (const auto& status : aThreadStatus)
    {
      switch (status)
      {
      case 0: std::cout << "- "; break;
      case 1: std::cout << "| "; break;
      case 2: std::cout << "  "; break;
      }
    }
    std::cout << "\n";
 
    std::this_thread::sleep_for(0.05s);
  }
}

這邊是會透過 aThreadStatus 來記錄每個執行序的狀態(因為 std::atmoic<> 不能放進 std::vector<>、所以這邊用 std::array<>)、並透過 numJobs 來記錄還沒在進行的執行序數量,作為主執行序結束的判斷依據。

在主執行序則是會把 aThreadStatus 畫出來,結果應該會類似下面的狀況:

0 1 2 3 4 5 6 7 8 9
| | | - - - - - - -
| | | - - - - - - -
      | | - - | - -
      | | - - | - -
          | |   | -
          | |   | -
                  |
                  |

可以看到,一開始只有 0、1、2 三個執行序在執行工作(狀態值 1),之後則是 3、4、7 這樣子。

有興趣的話也可以自己修改數值看看。


semaphore 的最大值

上面雖然在定義 semaphore 的時候都是直接寫:

std::counting_semaphore semaphore(3);

但是這邊需要提醒一下,這邊建構的引數 3 代表的是 semaphore 初始化完成時,內部計數器的數值,而不是最大值。也就是說,事後透過 release() 是可以持續增加數值的!

例如:

std::counting_semaphore semaphore(3);
semaphore.acquire(); // 2
semaphore.release(); // 3
semaphore.release(); // 4

這邊後面因為 release() 多呼叫了一次,所以 semaphore 最後會變成是 4。

那要怎麼設定最大值呢?實際上 std::counting_semaphore 是一個 template 型別,透過 template 的引數、可以指定最大值。

例如想要限制他最大是 3 的話,就可以定義成:

std::counting_semaphore<3> semaphore(3);

不過…這邊的「最大值」感覺上似乎還是有點微妙?在 C++ Reference 上這個 template 引數的名稱是「LeastMaxValue」,在說明中也寫了很謎的「LeastMaxValue is the minimum max value, not the actual max value」?感覺上…很微妙啊。

而雖然感覺上這個最大值應該應該會讓他不會因為 release() 太多次而超出述職的上限;但是實際上,以下面的例子來說:

std::counting_semaphore<3> semaphore(3);
semaphore.release(); // Should error

他在 Visual Studio 2022 裡面、呼叫 release() 是會出錯的!(但是也沒辦法透過 try-catch 來攔截)

但是同樣的程式,在 g++ 12.3 卻似乎可以繼續執行,感覺設定的上限根本沒用…

所以…恩,使用的時候還是自己多注意吧?


binary_semaphore

除了標準型的 std::counting_semaphore 外,標準函式庫還有提供一個特化版的 std::binary_semaphore,他基本上就是 std::counting_semaphore<1>

由於他理論上只有 0 / 1 兩種狀態,所以使用上變得相對簡單,在某種意義上使用也變得更直覺;像是在某些情境下,就可以用來取代 mutex lock 來限制僅有單一執行序在執行。

下面的程式、是開啟 10 個執行序來輸出各自的 thread id:

#include <iostream>
#include <vector>
#include <thread>
 
int main()
{
  std::vector<std::jthread> vPool;
  for (int i = 0; i < 10; ++i)
  {
    vPool.emplace_back(
      []() {
        std::cout << "The thread " << std::this_thread::get_id()
          << " is" << " running" << "\n";
      }
    );
  }
}

但是這邊由於是直接使用 std::cout、所以每個 operator<< 可能會被打斷、最後輸出的結果很可能會很混亂、不是預期的狀況。

而除了可以直接用 mutex lock 之外,也可以透過 std::binary_semaphore 來確保每次只有一個執行序在輸出;下面就是寫法:

#include <iostream>
#include <vector>
#include <thread>
#include <semaphore>
 
std::binary_semaphore lock(1);
 
int main()
{
  std::vector<std::jthread> vPool;
  for (int i = 0; i < 10; ++i)
  {
    vPool.emplace_back(
      []() {
        lock.acquire();
        std::cout << "The thread " << std::this_thread::get_id()
          << " is" << " running" << "\n";
        lock.release();
      }
    );
  }
}

這樣的寫法其實就是接近使用 mutex lock、確保同時只有一個執行序在輸出、讓輸出的結果不會被打亂的。(不過以這個情境來說,建議直接使用 std::osyncstream 應該比較快)

不過雖然可以做到類似的事情,但是實際上 std::binary_semaphore 的本質還是和 mutex 不同的、semaphore 不會被綁在執行序上,可以在第一個執行序 acquire() 、然後在另一個執行序 release()


而他也可以用來作為多執行序間的通知,在某種程度上取代 std::condition_variable;像是下面就是一個修改之前使用 std::condition_variable 寫的例子:

#include <thread>
#include <semaphore>
#include <iostream>
 
using namespace std;
 
binary_semaphore notifier{ 0 };
 
void funcThread()
{
  cout << "[" << this_thread::get_id() << "] Thread started." << endl;
  notifier.acquire();
  cout << "[" << this_thread::get_id() << "] Thread end." << endl;
}
 
int main(int argc, char** argv)
{
  cout << "[Main] create new thread" << endl;
  thread t1(funcThread);
 
  cout << "[Main] wait 1 second" << endl;
  this_thread::sleep_for(chrono::seconds(1));
 
  cout << "[Main] send notify" << endl;
  notifier.release();
 
  cout << "[Main] wait thread stop" << endl;
  t1.join();
}

感覺在使用上其實還算滿方便的?只是在程式碼的語意上感覺就沒那麼明確就是了。


其他函式

除了上面基本的 acquire()release() 外,其實還有一些其他更進一步用的用法。

release() 來說,實際上他可以透過引數來指定要釋出的數量,例如可以呼叫 release(5) 這樣子。(但是 acquire() 不能指定數量?)

acquire() 除了原始的會停在那邊等待的版本外,則是還有提供 try_acquire() 這個不會等待的版本、以及 try_acquire_for()try_acquire_until() 這兩個版本可以指定要等多久的版本。

而透過回傳值,則可知道是否有成功取得;透過這種 non-block 的函式,就可以讓沒有成功佔到位置的執行序可以去做別的工作了。下面是一個簡單的使用範例:

if (semaphore.try_acquire())
{
  // Do job 1
  semaphore.release();
}
else
{
  // Do job 2
}

透過這樣的機制,應該也可以實作出讓多個執行序優先做某些事情(job 1)、但是如果已經有限制數量內的執行序已經在做這件事的話,那多餘的執行序就先去做別的事(job 2)這樣的架構。

不過這樣的程式寫出來還滿雜的(job queue 要寫成 thread-safe 就很麻煩了),所以就不貼程式碼了。


這部分大概就這樣了?老實說,現階段沒有想到什麼時候真的需要使用這東西?或許得等到之後真的有對應需求才會知道了吧…

Leave a Reply

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *