KoreanFoodie's Study
C++ 기초 개념 15-4 : future, promise, packaged_task, async 본문
C++ 기초 개념 15-4 : future, promise, packaged_task, async
GoldGiver 2022. 4. 20. 14:19
모두의 코드를 참고하여 핵심 내용을 간추리고 있습니다. 자세한 내용은 모두의 코드의 씹어먹는 C++ 강좌를 참고해 주세요!
동기와 비동기
C++ 코드는 순차적으로 실행된다. 예를 들어, 파일을 읽고, 읽은 파일로 어떤 작업을 하고, 마지막으로 파일과 관계없는 작업을 수행하는 코드가 있다고 가정해 보자. 이 경우, 파일과 관계없는 작업을 하기 위해 파일을 읽기까지 기다려야 하는 지연 시간이 존재한다. 이전에 배운 쓰레드를 이용하면 이 작업을 비동기적으로 바꾸어 줄 수도 있다.
void file_read(string* result)
{
string txt = read("a.txt");
*result = do_something_with_txt(txt);
}
int main()
{
string result;
thread t(file_read, &result);
do_other_computation();
t.join();
}
위에서 file_read 함수를 쓰레드에게 맡기면, do_other_computation 을 비동기적으로 실행시킬 수 있다!
C++11 표준 라이브러리는 위와 같은 비동기적 실행을 간단하게 만들어주는 도구를 제공하고 있다.
std::promise 와 std::future
사실 비동기적 실행이라는 것은, 특정 일을 어떤 쓰레드에게 떠넘긴다는 뜻이다. 이는 어떤 쓰레드 T 를 이용해서, 미래에 (future) 쓰레드 T 가 원하는 데이터를 돌려주겠다고 약속 (promise) 한다고 볼 수 있다! 코드를 보자.
#include <future>
#include <iostream>
#include <string>
#include <thread>
using std::string;
void worker(std::promise<string>* p) {
// 약속을 이행하는 모습. 해당 결과는 future 에 들어간다.
p->set_value("some data");
}
int main() {
std::promise<string> p;
// 미래에 string 데이터를 돌려 주겠다는 약속.
std::future<string> data = p.get_future();
std::thread t(worker, &p);
// 미래에 약속된 데이터를 받을 때 까지 기다린다.
data.wait();
// wait 이 리턴했다는 뜻이 future 에 데이터가 준비되었다는 의미.
// 참고로 wait 없이 그냥 get 해도 wait 한 것과 같다.
std::cout << "받은 데이터 : " << data.get() << std::endl;
t.join();
}
코드를 읽어보면 동작이 일목요연하게 보인다. 다만 future 에서 get 을 호출하면, 설정된 객체가 이동된다. 따라서 절대로 get 을 두 번 호출하면 안된다.
정리하면, promise 는 생산자-소비자 패턴에서 마치 생산자 (producer) 의 역할을 수행하고, future 은 소비자 (consumer) 의 역할을 수행한다고 보면 된다. 기존에 mutex 와 condition_variable 을 쓴 것보다 좋은 점은, future 에 예외도 전달할 수 있기 때문이다. 코드를 보자.
#include <exception>
#include <future>
#include <iostream>
#include <string>
#include <thread>
using std::string;
void worker(std::promise<string>* p) {
try {
throw std::runtime_error("Some Error!");
} catch (...) {
// set_exception 에는 exception_ptr 를 전달해야 한다.
p->set_exception(std::current_exception());
}
}
int main() {
std::promise<string> p;
// 미래에 string 데이터를 돌려 주겠다는 약속.
std::future<string> data = p.get_future();
std::thread t(worker, &p);
// 미래에 약속된 데이터를 받을 때 까지 기다린다.
data.wait();
try {
data.get();
} catch (const std::exception& e) {
std::cout << "예외 : " << e.what() << std::endl;
}
t.join();
}
// 출력 결과 :
예외 : Some Error!
set_exception 에 전달된 exception_ptr 는 현재 catch 된 예외에 관한 정보를 반환하는 current_exception 함수가 리턴하는 객체이다.
wait_for
wait 대신 wait_for 를 사용하면 정해진 시간 동안만 기다리고 그냥 진행할 수 있다.
#include <chrono>
#include <exception>
#include <future>
#include <iostream>
#include <string>
#include <thread>
void worker(std::promise<void>* p) {
std::this_thread::sleep_for(std::chrono::seconds(10));
p->set_value();
}
int main() {
// void 의 경우 어떠한 객체도 전달하지 않지만, future 가 set 이 되었냐
// 안되었느냐의 유무로 마치 플래그의 역할을 수행할 수 있습니다.
std::promise<void> p;
// 미래에 string 데이터를 돌려 주겠다는 약속.
std::future<void> data = p.get_future();
std::thread t(worker, &p);
// 미래에 약속된 데이터를 받을 때 까지 기다린다.
while (true) {
std::future_status status = data.wait_for(std::chrono::seconds(1));
// 아직 준비가 안됨
if (status == std::future_status::timeout) {
std::cerr << ">";
}
// promise 가 future 를 설정함.
else if (status == std::future_status::ready) {
break;
}
}
t.join();
}
// 출력 결과 :
>>>>>>>>>
wait_for 함수는 전달된 시간만큼 기다린 후 바로 리턴한다. future_status 는 3 가지 상태를 가질 수 있는데, 먼저 future 에 값이 설정되었을 때 나타나는 future_status::ready 가 있고, wait_for 에 지정한 시간이 지났지만 값이 설정되지 않아서 리턴한 경우에는 future_status::timeout 가 리턴된다.
마지막으로 future_status::deferred 가 있는데, 이는 결과값을 계산하는 함수가 채 실행되지 않았다는 의미이다.
shared_future
future 의 경우 단 한번만 get 을 할 수 있다. 왜냐하면 get 을 호출하면 future 내부의 객체가 이동하기 때문이다. 하지만, 종종 여러 개의 다른 쓰레드에서 future 을 get 할 필요성이 있다.
이 경우 shared_future 을 사용하면 된다. 각 runner 쓰레드들이 달리는 다음 코드를 보자.
#include <chrono>
#include <future>
#include <iostream>
#include <thread>
using std::thread;
void runner(std::shared_future<void> start) {
start.get();
std::cout << "출발!" << std::endl;
}
int main() {
std::promise<void> p;
std::shared_future<void> start = p.get_future();
thread t1(runner, start);
thread t2(runner, start);
thread t3(runner, start);
thread t4(runner, start);
// 참고로 cerr 는 std::cout 과는 다르게 버퍼를 사용하지 않기 때문에 터미널에
// 바로 출력된다.
std::cerr << "준비...";
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cerr << "땅!" << std::endl;
p.set_value();
t1.join();
t2.join();
t3.join();
t4.join();
}
// 출력 결과 :
준비...땅!
출발!
출발!
출발!
출발!
위에서는 start.get( ) 을 통해 shared_future 을 마치 신호를 받아들이는 플래그처럼 사용했다.
packaged_task
C++ 에서는 promise-future 패턴을 비동기적 함수 (정확히는 Callable - 즉 람다 함수, Functor 포함) 의 리턴값에 간단히 적용할 수 있는 packaged_task 라는 것을 지원한다.
packaged_task 에 전달된 함수가 리턴할 때, 그 리턴값을 promise 에 set_value 하고, 만약 예외를 던졌다면 promise 에 set_exception 을 한다. 해당 future 은 packaged_task 가 리턴하는 future 에서 접근할 수 있다. 코드를 보자.
#include <future>
#include <iostream>
#include <thread>
int some_task(int x) { return 10 + x; }
int main() {
// int(int) : int 를 리턴하고 인자로 int 를 받는 함수. (std::function 참조)
std::packaged_task<int(int)> task(some_task);
std::future<int> start = task.get_future();
std::thread t(std::move(task), 5);
std::cout << "결과값 : " << start.get() << std::endl;
t.join();
}
// 출력 결과 :
결과값 : 15
packaged_task 는 비동기적으로 수행할 함수 자체를 생성자의 인자로 받는다. 또한 템플릿 인자로 해당 함수의 타입을 명시해야 한다. packaged_task 는 전달된 함수를 실행해서, 그 함수의 리턴값을 promise 에 설정한다.
생성된 packaged_task 를 쓰레드에 전달하면 된다. 참고로 packaged_task 는 복사 생성이 불가능하므로 (promise 도 마찬가지). 명시적으로 move 해주어야 한다.
이와 같이 packaged_task 를 사용하면 쓰레드에 굳이 promise 를 전달하지 않아도 알아서 packaged_task 가 함수의 리턴값을 처리해주므로 매우 편리하다.
std::async
앞서 promise 나 packaged_task 는 비동기적으로 실행하기 위해서 쓰레드를 명시적으로 실행해야만 했다. 하지만 std::async 에 어떤 함수를 전달하면, 아예 쓰레드를 알아서 만들어 해당 함수를 비동기적으로 실행하고, 그 결과값을 future 에 전달한다.
#include <future>
#include <iostream>
#include <thread>
#include <vector>
// std::accumulate 와 동일
int sum(const std::vector<int>& v, int start, int end) {
int total = 0;
for (int i = start; i < end; ++i) {
total += v[i];
}
return total;
}
int parallel_sum(const std::vector<int>& v) {
// lower_half_future 는 1 ~ 500 까지 비동기적으로 더함
// 참고로 람다 함수를 사용하면 좀 더 깔끔하게 표현할 수 도 있다.
// --> std::async([&v]() { return sum(v, 0, v.size() / 2); });
std::future<int> lower_half_future =
std::async(std::launch::async, sum, cref(v), 0, v.size() / 2);
// upper_half 는 501 부터 1000 까지 더함
int upper_half = sum(v, v.size() / 2, v.size());
return lower_half_future.get() + upper_half;
}
int main() {
std::vector<int> v;
v.reserve(1000);
for (int i = 0; i < 1000; ++i) {
v.push_back(i + 1);
}
std::cout << "1 부터 1000 까지의 합 : " << parallel_sum(v) << std::endl;
}
async 함수는 인자로 받은 함수를 비동기적으로 실행한 후에, 해당 결과값을 보관할 future 을 리턴한다. 첫 번째 인자로는 어떠한 형태로 실행할지를 전달하는데 두 가지 값이 가능하다.
- std::launch::async : 바로 쓰레드를 생성해서 인자로 전달된 함수를 실행한다.
- std::launch::deferred : future 의 get 함수가 호출되었을 때 실행한다. (새로운 쓰레드를 생성하지 않음)
해당 함수를 굳이 바로 당장 비동기적으로 실행할 필요가 없다면 deferred 옵션을 주면 된다.
async 함수는 실행하는 함수의 결과값을 포함하는 future 을 리턴한다.
다음 코드를 통해 비동기적인 실행을 간단히 요약해보자.
#include <future>
#include <iostream>
#include <thread>
int do_work(int x) {
// x 를 가지고 무슨 일을 한다.
std::this_thread::sleep_for(std::chrono::seconds(3));
return x;
}
void do_work_parallel() {
auto f1 = std::async([]() { do_work(3); });
auto f2 = std::async([]() { do_work(3); });
do_work(3);
f1.get();
f2.get();
}
void do_work_sequential() {
do_work(3);
do_work(3);
do_work(3);
}
int main() { do_work_sequential(); }
생각해보기
문제1
async 를 사용해서 기존의 find 를 더 빠르게 수행하는 함수를 만들어 보자.
답 : (10 등분해서 계산. 사실 logN 갯수만큼 쓰레드를 만들면 제일 나을 것 같다)
#include <future>
#include <iostream>
#include <thread>
#include <vector>
#include <algorithm>
using namespace std;
int find_my(std::vector<int>& v, int target, int start, int end)
{
for (int i = start; i < end; ++i)
{
if (v[i] == target)
{
cout << "Found!" << endl;
return i;
}
}
return v.size();
}
int parallel_find(std::vector<int>& v, int target)
{
int minIdx = v.size();
int idx = 0;
for (int i = 0; i < 10; ++i)
{
std::future<int> f = std::async(std::launch::async,
[&v, target, idx]() {return find_my(v, target, idx, min(idx+v.size()/10, v.size()));});
idx += v.size() / 10;
minIdx = std::min(minIdx, f.get());
}
return minIdx;
}
int main()
{
std::vector<int> v(100);
for (int i = 0; i < 100; ++i)
{
v[i] = i+1;
}
std::cout << "Find 99 : " << parallel_find(v, 100) << std::endl;
}
문제 2
쓰레드풀 (ThreadPool) 을 만들어 보자. 쓰레드풀의 사용자는 원하는 만큼의 쓰레드들을 생성해놓고, 무언가 수행하고 싶은 일이 있다면 그냥 쓰레드풀에 추가하면 된다. 한 번 ThreadPool 클래스를 설계하고 만들어보자.
'Tutorials > C++ : Beginner' 카테고리의 다른 글
C++ 기초 개념 16-1 : C++ 유니폼 초기화(Uniform Initialization) (0) | 2022.05.23 |
---|---|
C++ 기초 개념 15-5 : 쓰레드풀(ThreadPool) 만들기 (0) | 2022.04.21 |
C++ 기초 개념 15-3 : atomic 객체와 memory order (0) | 2022.04.19 |
C++ 기초 개념 15-2 : 뮤텍스(Mutex)와 조건 변수(Condition Variable) (0) | 2022.04.19 |
C++ 기초 개념 15-1 : 쓰레드(thread)의 기초와 실습 (2) | 2022.04.19 |