C++ 多執行序開發中安全存取變數的 atomic

| | 0 Comments| 10:28
Categories:

atomic(原子)在多執行序的程式開發中,算是一個很普遍的概念。它基本上就是將一些基本的存取操作定義成不可分割的操作(原子操作),然後搭配硬體設計、確保資料在多執行序的程式中可以更有效率地安全存取資料、保證不會出現 race condition 等等的問題。

而實際上,這部分大多還牽扯到 CPU 的架構,包含指令的執行順序(現在處理器可能是亂序執行)、快取等等,在 Heresy 來看算是相當底層的東西了。

其實像之前在介紹 OpenMP 的時候、其實有大概帶過 OpenMP 的 atomic 了,但是不知道為什麼,知其在介紹 std::thread 的時候,好像忘了這個基本的東西?這邊就來稍微補一下吧~不過老實說,太底層的東西 Heresy 也沒搞懂,所以這邊主要還是簡單的使用的部分了。

首先,什麼時候會需要用到 std:atomic<>參考)呢?下面是一個例子:

#include <iostream>
#include <thread>
#include <vector>
 
int counter = 0;
 
void f()
{
  for (int n = 0; n < 100000; ++n)
  {
    ++counter;
  }
}
 
int main()
{
  auto ptStart = std::chrono::high_resolution_clock::now();
  {
    std::vector<std::jthread> pool;
    for (int n = 0; n < 10; ++n)
      pool.emplace_back(f);
  }
  auto timeUsed = std::chrono::high_resolution_clock::now() - ptStart;
 
  std::cout << "result: " << counter << ", use " << timeUsed.count() << "ns\n";
}

在這個例子裏面,是開了 10 的 thread 去針對 counter 來做累加,理論上結果應該要是 1000000,但是實際跑出來,數值大部分都是不對的,而且每次都還不一樣~

像 Heresy 這邊跑出來可能會是下面的狀況:

result: 802641, use 729401ns
result: 397793, use 1432293ns
result: 892290, use 765319ns

會有這樣的問題,基本上應該就是這十個執行序可能會同時去讀取、修改 counter 的值,結果就出包了。

而要避免出現這樣的問題,在不知道 atomic 的情形下,可能想說用 mutex lock(參考)來處理,那程式可以改成下面的形式:

#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
 
std::mutex mutex;
int counter = 0;
 
void f()
{
  for (int n = 0; n < 100000; ++n)
  {
    std::lock_guard lock(mutex);
    ++counter;
  }
}
 
int main()
{
  auto ptStart = std::chrono::high_resolution_clock::now();
  {
    std::vector<std::jthread> pool;
    for (int n = 0; n < 10; ++n)
      pool.emplace_back(f);
  }
  auto timeUsed = std::chrono::high_resolution_clock::now() - ptStart;
 
  std::cout << "result: " << counter << ", use " << timeUsed.count() << "ns\n";
}

這樣就可以確保每個時間點都只有一個執行序會執行「++counter」這個指令、所以跑出來的結果就會是正確的了!

result: 1000000, use 61601090ns
result: 1000000, use 57563869ns

但是實際上 mutex lock 的成本很高,可以看到這邊計算的時間已經變成本來的幾十倍了…

而如果改用 std:atomic<> 的話,則可以寫成:

#include <iostream>
#include <thread>
#include <vector>
#include <atomic>
 
std::atomic<int> counter = 0;
 
void f()
{
  for (int n = 0; n < 100000; ++n)
  {
    ++counter;
  }
}
 
int main()
{
  auto ptStart = std::chrono::high_resolution_clock::now();
  {
    std::vector<std::jthread> pool;
    for (int n = 0; n < 10; ++n)
      pool.emplace_back(f);
  }
  auto timeUsed = std::chrono::high_resolution_clock::now() - ptStart;
 
  std::cout << "result: " << counter << ", use " << timeUsed.count() << "ns\n";
}

這邊修改的方法其實相對於 mutex 更簡單,而且不但可以確保結果正確、執行的速度更是比 mutex lock 快了大概十倍!

result: 1000000, use 6018908ns
result: 1000000, use 7970903ns

而 atomic 能比 mutex lock 來的快,主要是因為在大部分的狀況下,他可以透過 lock-free 的形式來進行,所以在效率上會好很多。


上面算是 std::atomic 最簡單的使用情境了。

不過,std:atomic<> 雖然是一個 template 型別,但是並非所有型別都可以使用,針對可以使用的型別也有透過 partial specializations 來提供不同的功能支援。

針對有支援的型別,他主要是提供了很簡單的變數的存取,可以直接透過 operator=Tstd::atomic<T> 之間轉換。下面就是簡單的示意:

std::atomic<T> ac;
T c;
ac = c;
c = ac;

此外,std:atomic<> 也還有提供 store()load()exchange() 這幾個函式、可以做進一步的資料存取與控制;在使用這些函式的時候,理論上是可以針對不同的情境、指定不同的 std::memory_order參考)來進行最佳化的。(不過這邊個人也沒搞得很懂,所以只能先跳過了。)

而實際上,std:atomic<> 主要支援的對象是整數型別,所以標準函式庫中有針對了各種整數型別定義了別名,標括了 atomic_boolatomic_charatomic_ucharatomic_int…等等;像是上面的 std::atomic<int> 就可以寫成 std::atomic_int

針對整數型別,std:atomic<> 也還提供了 ++--+=-=&=|=^= 這些數值計算的功能、方便用來計算。另外也還有 fetch_add()fetch_sub()fetch_and()fetch_or()fetch_xor() 這種可以指定 std::memory_order 的版本可以用。

而這些計算子到了 C++20 後、則是有部分允許浮點樹也可以使用。像是上面的例子裡面,如果把 counter 的型別改成 std::atomic<float> 的話,那就不能使用 ++counter 來累加數值、而是需要改寫成 counter += 1 了。(在 C++11 改成 += 也不行)


如果想要把自定義的型別拿來搭配 std:atomic<> 使用的話,需要滿足下列條件:

  • trivially copyable
  • copy constructible
  • move constructible
  • copy assignable
  • move assignable

沒理解錯的話、一個自己寫的類別要滿足上面的條件,除了本身要可以使用 memcpy() 來完整複製外,有自己重新定義 default constructor、copy / move constructor、copy / move assignment 似乎也都是不行的。(參考

此外,根據型別的不同,std:atomic<> 也有可能不是 lock-free 的;如果要確認的話,則可以透過 is_lock_free() 這個函式來確認。


C++20 的時候,std:atomic<> 還追加了 wait()notify_one()notify_all() 這三個函式,讓他可以有通知和等待的功能。

這邊的 wait()參考)比較特別的,是需要給他一個數值來比較;當它收到通知的時候,會去比較當下的數值是否和所給的數值相同,不同的話就會結束等待,如果相同的話,則會繼續等待下一次的通知。

基本上,應該就是「等到數值和指定的不一樣」的概念;不過老實說,個人覺得這個設計有點謎…如果可以給個可以回傳 true / false 的函式感覺通用性應該會更高?

下面是個簡單的例子:

#include <iostream>
#include <thread>
#include <syncstream>
 
using namespace std::chrono_literals;
 
int main()
{
  std::atomic_bool bCompleted = false;
 
  std::jthread t1(
    [&bCompleted]() {
      std::osyncstream(std::cout) << "> wait complete" << std::endl;
      bCompleted.wait(false);
      std::osyncstream(std::cout) << "> completed" << std::endl;
    });
 
  std::jthread t2(
    [&bCompleted]() {
      std::osyncstream(std::cout) << "- sleep" << std::endl;
      std::this_thread::sleep_for(0.5s);
 
      std::osyncstream(std::cout) << "- notify 1st" << std::endl;
      bCompleted.notify_all();
 
      std::osyncstream(std::cout) << "- update value" << std::endl;
      bCompleted = true;
 
      std::osyncstream(std::cout) << "- notify 2nd" << std::endl;
      bCompleted.notify_all();
    });
}

這邊在概念上是透過 bCompleted 這個 boolstd::atomic 來讓 t1 這個執行序監控 t2 完成工作了沒。

可以看到,在 t2 裡面會呼叫 notify_all() 兩次。第一次的時候因為 bCompleted 的值還是 false,所以儘管 t1 裡面的 wait() 有收到通知、但是會因為數值還是 false 所以會繼續等待。

而在 t2 裡面去將 bCompleted 的值改成 true 的時候,實際上也不會立刻讓 t1wait() 解除、而是會等到再次呼叫 notify_all() 才會讓它解除。

所以程式的輸出會是:

> wait complete
- sleep
- notify 1st
- update value
- notify 2nd
> completed

在某些情境下,std:atomic<> 應該也可以用來當作簡易型的 std::condition_variable 來用?而且由於他不需要額外的 mutex 和 lock,在使用上也會更簡單一點。但是相對地,他在使用上也缺少了一些功能(例如逾時)就是了。


這篇基本上就寫到這邊了?

至於其他的部分呢?

在 C++11 的標準裡面其實還有一個 atomic bool 特化的 std::atomic_flag參考),把 store()load() 給拿掉了,算是比較特別的應用?

而 C++20 則還有加入一個 std::atomic_ref參考)、用來把已經存在的一般變數、以參考的形式來當成 atomic 來操作。

老實說,個人本來以為 atomic 這邊應該很簡單,結果沒想到裡面細節比想像中的更底層…所以,後來搞得自己也有點混亂了。 XD
或許等以後真的有碰到的時候再回來看會更清楚吧…

另外,個人覺得比較討厭的一點,是 std::atomic<> 本身是不可複製(non-copyable)的就算了,還是不可移動的(non-movable)…所以基本上沒辦法把 std::atomic<> 放到 std::vector<> 裡面,如果有類似的需求會比較麻煩…


參考:

Leave a Reply

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *