C++11 程式的平行化:async 與 future

| | 0 Comments| 14:12
Categories:

之前在《C 的多執行序程式開發 Thread:基本使用》這系列文章裡面,Heresy 已經大概整理了一下 C 11 提供的 std::thread 這個執行序函式庫的使用方法了。

不過,實際上在 C 11 的「Thread support library」(參考)裡面,除了 std::thread 以外,還有不少東西可以用。這篇要寫的,就是其中的 std::async()std::future<>

基本上,使用 std::thread 來建立一個新的執行序、另外執行某些計算,在 C 中算是比較底層的方法;它的功能很彈性,但是相對的,也有一些比較麻煩的地方。比如說透過 std::thread 執行的函式基本上無法回傳值,而在其他執行序中產生的 exception 也沒有辦法被 catch 到。

std::async() 在某方面來說,則算是一個特化的功能,他的設計概念,就是

當要進行一個沒有馬上要用其結果的複雜計算的時候,把計算丟到另一個執行序去,等到之後真的要用的時候,才去確認他跑完沒、並取得他的結果。

比如說,下面就是一個簡單的例子:

#include <chrono>
#include <thread>
#include <iostream>

int Compute( int iIn)
{
std::this_thread::sleep_for(std::chrono::seconds(iIn));
return iIn;
}

int main(int argc, char** argv)
{
auto tpStart = std::chrono::high_resolution_clock::now();

int iRes1 = Compute(1),
iRes2 = Compute(2);

int iResult = iRes1 iRes2;
auto duTime = std::chrono::high_resolution_clock::now() tpStart;

std::cout << "Result is " << iResult << ", use "
<< std::chrono::duration_cast<std::chrono::milliseconds>(duTime).count()
<< "ms" << std::endl;
}

在這段程式中,首先是要去呼叫 Comput() 這個函式,來計算出 iRes1iRes2 這兩個值,而在計算完之後,再把兩個數值加總、得到最後的答案 iResult

而在這邊,Comput() 這個函式基本上就是在浪費時間、模擬複雜、費時的計算而已;他會根據傳入的值去等待,所以在計算 iRes1 時,會等一秒,而在計算 iRes2 實則會等兩秒。

所以,上面的程式在執行後,結果大致上會需要三秒(其他的結果的時間小到可忽略),最後輸出會是:

Result is 3, use 3000ms

但是,由於 iRes1iRes2 兩者個計算實際上是不相關的,所以如果可以同時計算這兩者,那不就可以省掉很多時間了嗎?

這邊如果是使用 std::async()std::future<> 來改寫的話,基本上會變成:

#include <chrono>
#include <future>
#include <iostream>

int Compute( int iIn)
{
std::this_thread::sleep_for(std::chrono::seconds(iIn));
return iIn;
}

int main(int argc, char** argv)
{
auto tpStart = std::chrono::high_resolution_clock::now();

std::future<int> fuRes1 = std::async(std::launch::async, Compute, 1),
fuRes2 = std::async(std::launch::async, Compute, 2);

int iResult = fuRes1.get() fuRes2.get();
auto duTime = std::chrono::high_resolution_clock::now() tpStart;

std::cout << "Result is " << iResult << ", use "
<< std::chrono::duration_cast<std::chrono::milliseconds>(duTime).count()
<< "ms" << std::endl;
}

首先,要使用 std::async()std::future<> 的話,需要引入 future 這個 header 檔。

而本來的 int iRes1 = Compute(1),則是要改寫成:

std::future<int> fuRes = std::async( std::launch::async, Compute, 1 )

可以看到,這邊是去呼叫 std::async() 這個函式(參考),而他的第一個參數「std::launch::async」則是代表要使用非同步(asynchronous)模式去執行指定的函式。

std::async() 的第二個參數,則是要去執行的函式,這邊就是 Compute() 這個函式了;再之後的參數(這邊是 1),則是會在執行時,傳給 Compute() 這個函式的參數。

std::async() 會把 Compute() 這個函式回傳的資料,從 int 封包成 std::future<int> 的形式(參考)。而要讀取它的結果的話,可以透過 get() 這個函式,來取得計算的結果;如果還沒有計算完的話,他則會停在那邊,等到計算完為止。所以最後在執行

int iResult = fuRes1.get() fuRes2.get();

加總的時候,不用擔心個別的執行序是否已經計算完成了,因為 get() 這個函式會幫忙去確認這件事。

在經過這樣的改寫之後,程式在執行的時候,就會建立出兩個新的執行序,分別去計算 fuRes1fuRes2,所以兩者的計算是可以平行進行的~而執行的結果,基本上會是:

Result is 3, use 2000ms

可以看到,在這樣的修改之後,所需的時間就只剩下兩秒了!和之前沒有平行化的結果相比,計算 fuRes1 所需要的一秒,由於跟計算 fuRes2 的兩秒是重疊的,所以就不會需要把兩者個時間加起來了~

而這基本上,也就是 std::async()std::future<> 最簡單的使用概念了~


如果要用 std::thread 來做同樣的事情,當然也是可以的。不過由於 std::thread 無法處理回傳值,所以這邊會需要另外去想辦法把 Compute() 回傳的結果記錄下來。

例如下面的寫法裡面,就是另外建立一個 threadCompute() 的函式,去處理 Compute() 的回傳值;而之後就建立 th1th2 這兩個執行序、去執行 threadCompute()

#include <chrono>
#include <thread>
#include <iostream>

int Compute(int iIn)
{
std::this_thread::sleep_for(std::chrono::seconds(iIn));
return iIn;
}

void threadCompute(int iIn, int* pOut)
{
*pOut = Compute( iIn );
}

int main(int argc, char** argv)
{
auto tpStart = std::chrono::high_resolution_clock::now();

int iRes1, iRes2;
std::thread th1(threadCompute, 1, &iRes1),
th2(threadCompute, 2, &iRes2);
th1.join();
th2.join();
int iResult = iRes1 iRes2;
auto duTime = std::chrono::high_resolution_clock::now() tpStart;

std::cout << "Result is " << iResult << ", use "
<< std::chrono::duration_cast<std::chrono::milliseconds>(duTime).count()
<< "ms" << std::endl;
}

當然,這樣的寫法也是可以的。不過由於 iRes1iRes2 的值會在 threadCompute() 中被改動,所以在要使用前,必須要先呼叫 th1th2join() 函式,以確保這兩個執行序的計算都已經完成了。

但是,在這個簡單的範例裡面,要記得加上這個動作並不算太難,不過如果程式很長的話,那在撰寫的時候,很有可能會忽略了去檢查 th1th2 是否已經完成,而直接去使用 iRes1iRes2 的值;這時候,就有可能讓結果錯誤了!

所以相較之下,使用 std::future<> 應該算是一個比較安全的方法。


另外,相較於使用 std::thread 來說,使用 std::async()std::future<> 還有另一個好處,就是可以處理 exception。

如果是使用 std::thread 的話,那在新的執行序中所丟出的 exception,基本上除非有另外處理,否則是不會在本來的主執行序中被接到的。

例如在下面的程式中,在 Compute() 這個函式中,如果 iIn 大於 1 的話,就會丟一個 std::runtime_error 出來。

#include <chrono>
#include <thread>
#include <iostream>

int Compute(int iIn)
{
if (iIn > 1)
throw std::runtime_error("too long");
std::this_thread::sleep_for(std::chrono::seconds(iIn));
return iIn;
}

void threadCompute(int iIn, int* pOut)
{
*pOut = Compute( iIn );
}

int main(int argc, char** argv)
{
try
{
int iRes1, iRes2;
std::thread th1(threadCompute, 1, &iRes1),
th2(threadCompute, 2, &iRes2);
th1.join();
th2.join();
int iResult = iRes1 iRes2;
std::cout << "Result is " << iResult << std::endl;
}
catch ( std::exception& exp)
{
std::cerr << exp.what() << std::endl;
}
}

而雖然在 main() 裡面,已經有去撰寫 try{}catch() 了,但是實際跑過的話,就會發現它根本接不到這個 exception!

如果要修改的話,則是需要使用 C 11 新的 std::exception_ptr參考)來做中介,才能坐在主執行序裡面去處理這個 exception。

不過,如果是使用 std::async()std::future<> 的話,那就直接可以接到這個 exception 了~

#include <chrono>
#include <future>
#include <iostream>

int Compute(int iIn)
{
if (iIn > 1)
throw std::runtime_error("too long");
std::this_thread::sleep_for(std::chrono::seconds(iIn));
return iIn;
}

int main(int argc, char** argv)
{
try
{
std::future<int> fuRes1 = std::async(std::launch::async, Compute, 1),
fuRes2 = std::async(std::launch::async, Compute, 2);

int iResult = fuRes1.get() fuRes2.get();
std::cout << "Result is " << iResult << std::endl;
}
catch ( std::exception& exp)
{
std::cerr << exp.what() << std::endl;
}
}

所以,在某方面來說,算是方便不少的。


附註:

  1. std::async() 的第二個參數除了 std::launch::async 是代表要用心的執行序來計算外,還有另一個選擇是 std::launch::deferred,他是採取「lazy evaluation」的模式,也就是當呼叫 get() 的時候,才會真的去計算。(參考

  2. 實際上 std::future<> 還有一些其他像是 wait_for() 的函式,在某些時候應該也是滿實用的;而除了和 std::async() 搭配使用外,他也還可以搭配 std::packaged_task參考)和 std::promise參考)來使用。

  3. std::future<> 是不可複製的物件,如果真的有需要有多份的話,則可以透過 share() 這個函式,取得 std::shared_future<> 的物件(參考)。

  4. std::chrono 是用來紀錄時間用的,介紹請參考《C 11 STL 的時間函式庫:chrono》。

  5. 額外參考:《The promises and challenges of std::async task-based parallelism in C 11

Leave a Reply

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *