atomic(原子)在多執行序的程式開發中,算是一個很普遍的概念。它基本上就是將一些基本的存取操作定義成不可分割的操作(原子操作),然後搭配硬體設計、確保資料在多執行序的程式中可以更有效率地安全存取資料、保證不會出現 race condition 等等的問題。
而實際上,這部分大多還牽扯到 CPU 的架構,包含指令的執行順序(現在處理器可能是亂序執行)、快取等等,在 Heresy 來看算是相當底層的東西了。
其實像之前在介紹 OpenMP 的時候、其實有大概帶過 OpenMP 的 atomic 了,但是不知道為什麼,知其在介紹 std::thread
的時候,好像忘了這個基本的東西?這邊就來稍微補一下吧~不過老實說,太底層的東西 Heresy 也沒搞懂,所以這邊主要還是簡單的使用的部分了。
首先,什麼時候會需要用到 std:atomic<>
(參考)呢?下面是一個例子:
#include <iostream> #include <thread> #include <vector> int counter = 0; void f() { for (int n = 0; n < 100000; ++n) { ++counter; } } int main() { auto ptStart = std::chrono::high_resolution_clock::now(); { std::vector<std::jthread> pool; for (int n = 0; n < 10; ++n) pool.emplace_back(f); } auto timeUsed = std::chrono::high_resolution_clock::now() - ptStart; std::cout << "result: " << counter << ", use " << timeUsed.count() << "ns\n"; }
在這個例子裏面,是開了 10 的 thread 去針對 counter
來做累加,理論上結果應該要是 1000000,但是實際跑出來,數值大部分都是不對的,而且每次都還不一樣~
像 Heresy 這邊跑出來可能會是下面的狀況:
result: 802641, use 729401ns
result: 397793, use 1432293ns
result: 892290, use 765319ns
會有這樣的問題,基本上應該就是這十個執行序可能會同時去讀取、修改 counter
的值,結果就出包了。
而要避免出現這樣的問題,在不知道 atomic 的情形下,可能想說用 mutex lock(參考)來處理,那程式可以改成下面的形式:
#include <iostream> #include <thread> #include <vector> #include <mutex> std::mutex mutex; int counter = 0; void f() { for (int n = 0; n < 100000; ++n) { std::lock_guard lock(mutex); ++counter; } } int main() { auto ptStart = std::chrono::high_resolution_clock::now(); { std::vector<std::jthread> pool; for (int n = 0; n < 10; ++n) pool.emplace_back(f); } auto timeUsed = std::chrono::high_resolution_clock::now() - ptStart; std::cout << "result: " << counter << ", use " << timeUsed.count() << "ns\n"; }
這樣就可以確保每個時間點都只有一個執行序會執行「++counter
」這個指令、所以跑出來的結果就會是正確的了!
result: 1000000, use 61601090ns result: 1000000, use 57563869ns
但是實際上 mutex lock 的成本很高,可以看到這邊計算的時間已經變成本來的幾十倍了…
而如果改用 std:atomic<>
的話,則可以寫成:
#include <iostream> #include <thread> #include <vector> #include <atomic> std::atomic<int> counter = 0; void f() { for (int n = 0; n < 100000; ++n) { ++counter; } } int main() { auto ptStart = std::chrono::high_resolution_clock::now(); { std::vector<std::jthread> pool; for (int n = 0; n < 10; ++n) pool.emplace_back(f); } auto timeUsed = std::chrono::high_resolution_clock::now() - ptStart; std::cout << "result: " << counter << ", use " << timeUsed.count() << "ns\n"; }
這邊修改的方法其實相對於 mutex 更簡單,而且不但可以確保結果正確、執行的速度更是比 mutex lock 快了大概十倍!
result: 1000000, use 6018908ns result: 1000000, use 7970903ns
而 atomic 能比 mutex lock 來的快,主要是因為在大部分的狀況下,他可以透過 lock-free 的形式來進行,所以在效率上會好很多。
上面算是 std::atomic
最簡單的使用情境了。
不過,std:atomic<>
雖然是一個 template 型別,但是並非所有型別都可以使用,針對可以使用的型別也有透過 partial specializations 來提供不同的功能支援。
針對有支援的型別,他主要是提供了很簡單的變數的存取,可以直接透過 operator=
在 T
和 std::atomic<T>
之間轉換。下面就是簡單的示意:
std::atomic<T> ac; T c; ac = c; c = ac;
此外,std:atomic<>
也還有提供 store()
、load()
、exchange()
這幾個函式、可以做進一步的資料存取與控制;在使用這些函式的時候,理論上是可以針對不同的情境、指定不同的 std::memory_order
(參考)來進行最佳化的。(不過這邊個人也沒搞得很懂,所以只能先跳過了。)
而實際上,std:atomic<>
主要支援的對象是整數型別,所以標準函式庫中有針對了各種整數型別定義了別名,標括了 atomic_bool
、atomic_char
、atomic_uchar
、atomic_int
…等等;像是上面的 std::atomic<int>
就可以寫成 std::atomic_int
。
針對整數型別,std:atomic<>
也還提供了 ++
、--
、+=
、-=
、&=
、|=
、^=
這些數值計算的功能、方便用來計算。另外也還有 fetch_add()
、fetch_sub()
、fetch_and()
、fetch_or()
、fetch_xor()
這種可以指定 std::memory_order
的版本可以用。
而這些計算子到了 C++20 後、則是有部分允許浮點樹也可以使用。像是上面的例子裡面,如果把 counter
的型別改成 std::atomic<float>
的話,那就不能使用 ++counter
來累加數值、而是需要改寫成 counter += 1
了。(在 C++11 改成 +=
也不行)
如果想要把自定義的型別拿來搭配 std:atomic<>
使用的話,需要滿足下列條件:
- trivially copyable
- copy constructible
- move constructible
- copy assignable
- move assignable
沒理解錯的話、一個自己寫的類別要滿足上面的條件,除了本身要可以使用 memcpy()
來完整複製外,有自己重新定義 default constructor、copy / move constructor、copy / move assignment 似乎也都是不行的。(參考)
此外,根據型別的不同,std:atomic<>
也有可能不是 lock-free 的;如果要確認的話,則可以透過 is_lock_free()
這個函式來確認。
在 C++20 的時候,std:atomic<>
還追加了 wait()
、notify_one()
、notify_all()
這三個函式,讓他可以有通知和等待的功能。
這邊的 wait()
(參考)比較特別的,是需要給他一個數值來比較;當它收到通知的時候,會去比較當下的數值是否和所給的數值相同,不同的話就會結束等待,如果相同的話,則會繼續等待下一次的通知。
基本上,應該就是「等到數值和指定的不一樣」的概念;不過老實說,個人覺得這個設計有點謎…如果可以給個可以回傳 true / false 的函式感覺通用性應該會更高?
下面是個簡單的例子:
#include <iostream> #include <thread> #include <syncstream> using namespace std::chrono_literals; int main() { std::atomic_bool bCompleted = false; std::jthread t1( [&bCompleted]() { std::osyncstream(std::cout) << "> wait complete" << std::endl; bCompleted.wait(false); std::osyncstream(std::cout) << "> completed" << std::endl; }); std::jthread t2( [&bCompleted]() { std::osyncstream(std::cout) << "- sleep" << std::endl; std::this_thread::sleep_for(0.5s); std::osyncstream(std::cout) << "- notify 1st" << std::endl; bCompleted.notify_all(); std::osyncstream(std::cout) << "- update value" << std::endl; bCompleted = true; std::osyncstream(std::cout) << "- notify 2nd" << std::endl; bCompleted.notify_all(); }); }
這邊在概念上是透過 bCompleted
這個 bool
的 std::atomic
來讓 t1
這個執行序監控 t2
完成工作了沒。
可以看到,在 t2
裡面會呼叫 notify_all()
兩次。第一次的時候因為 bCompleted
的值還是 false
,所以儘管 t1
裡面的 wait()
有收到通知、但是會因為數值還是 false
所以會繼續等待。
而在 t2
裡面去將 bCompleted
的值改成 true
的時候,實際上也不會立刻讓 t1
的 wait()
解除、而是會等到再次呼叫 notify_all()
才會讓它解除。
所以程式的輸出會是:
> wait complete - sleep - notify 1st - update value - notify 2nd > completed
在某些情境下,std:atomic<>
應該也可以用來當作簡易型的 std::condition_variable
來用?而且由於他不需要額外的 mutex 和 lock,在使用上也會更簡單一點。但是相對地,他在使用上也缺少了一些功能(例如逾時)就是了。
這篇基本上就寫到這邊了?
至於其他的部分呢?
在 C++11 的標準裡面其實還有一個 atomic bool 特化的 std::atomic_flag
(參考),把 store()
、load()
給拿掉了,算是比較特別的應用?
而 C++20 則還有加入一個 std::atomic_ref
(參考)、用來把已經存在的一般變數、以參考的形式來當成 atomic 來操作。
老實說,個人本來以為 atomic 這邊應該很簡單,結果沒想到裡面細節比想像中的更底層…所以,後來搞得自己也有點混亂了。 XD
或許等以後真的有碰到的時候再回來看會更清楚吧…
另外,個人覺得比較討厭的一點,是 std::atomic<>
本身是不可複製(non-copyable)的就算了,還是不可移動的(non-movable)…所以基本上沒辦法把 std::atomic<>
放到 std::vector<>
裡面,如果有類似的需求會比較麻煩…
參考: