KoreanFoodie's Study

C++ 기초 개념 15-4 : future, promise, packaged_task, async 본문

Tutorials/C++ : Beginner

C++ 기초 개념 15-4 : future, promise, packaged_task, async

GoldGiver 2022. 4. 20. 14:19

모두의 코드를 참고하여 핵심 내용을 간추리고 있습니다. 자세한 내용은 모두의 코드의 씹어먹는 C++ 강좌를 참고해 주세요!

동기와 비동기

C++ 코드는 순차적으로 실행된다. 예를 들어, 파일을 읽고, 읽은 파일로 어떤 작업을 하고, 마지막으로 파일과 관계없는 작업을 수행하는 코드가 있다고 가정해 보자. 이 경우, 파일과 관계없는 작업을 하기 위해 파일을 읽기까지 기다려야 하는 지연 시간이 존재한다. 이전에 배운 쓰레드를 이용하면 이 작업을 비동기적으로 바꾸어 줄 수도 있다.

void file_read(string* result)
{
    string txt = read("a.txt");
    *result = do_something_with_txt(txt);
}

int main()
{
    string result;
    thread t(file_read, &result);
    do_other_computation();
    
    t.join();
}

위에서 file_read 함수를 쓰레드에게 맡기면, do_other_computation 을 비동기적으로 실행시킬 수 있다!

C++11 표준 라이브러리는 위와 같은 비동기적 실행을 간단하게 만들어주는 도구를 제공하고 있다.

 

 

std::promise 와 std::future

사실 비동기적 실행이라는 것은, 특정 일을 어떤 쓰레드에게 떠넘긴다는 뜻이다. 이는 어떤 쓰레드 T 를 이용해서, 미래에 (future) 쓰레드 T 가 원하는 데이터를 돌려주겠다고 약속 (promise) 한다고 볼 수 있다! 코드를 보자.

#include <future>
#include <iostream>
#include <string>
#include <thread>
using std::string;

void worker(std::promise<string>* p) {
  // 약속을 이행하는 모습. 해당 결과는 future 에 들어간다.
  p->set_value("some data");
}
int main() {
  std::promise<string> p;

  // 미래에 string 데이터를 돌려 주겠다는 약속.
  std::future<string> data = p.get_future();

  std::thread t(worker, &p);

  // 미래에 약속된 데이터를 받을 때 까지 기다린다.
  data.wait();

  // wait 이 리턴했다는 뜻이 future 에 데이터가 준비되었다는 의미.
  // 참고로 wait 없이 그냥 get 해도 wait 한 것과 같다.
  std::cout << "받은 데이터 : " << data.get() << std::endl;

  t.join();
}

코드를 읽어보면 동작이 일목요연하게 보인다. 다만 future 에서 get 을 호출하면, 설정된 객체가 이동된다. 따라서 절대로 get 을 두 번 호출하면 안된다.

 

정리하면, promise 는 생산자-소비자 패턴에서 마치 생산자 (producer) 의 역할을 수행하고, future 은 소비자 (consumer) 의 역할을 수행한다고 보면 된다. 기존에 mutex 와 condition_variable 을 쓴 것보다 좋은 점은, future 에 예외도 전달할 수 있기 때문이다. 코드를 보자.

#include <exception>
#include <future>
#include <iostream>
#include <string>
#include <thread>
using std::string;

void worker(std::promise<string>* p) {
  try {
    throw std::runtime_error("Some Error!");
  } catch (...) {
    // set_exception 에는 exception_ptr 를 전달해야 한다.
    p->set_exception(std::current_exception());
  }
}
int main() {
  std::promise<string> p;

  // 미래에 string 데이터를 돌려 주겠다는 약속.
  std::future<string> data = p.get_future();

  std::thread t(worker, &p);

  // 미래에 약속된 데이터를 받을 때 까지 기다린다.
  data.wait();

  try {
    data.get();
  } catch (const std::exception& e) {
    std::cout << "예외 : " << e.what() << std::endl;
  }
  t.join();
}

// 출력 결과 :
예외 : Some Error!

 

 

set_exception 에 전달된 exception_ptr 는 현재 catch 된 예외에 관한 정보를 반환하는 current_exception 함수가 리턴하는 객체이다.

 

wait_for

wait 대신 wait_for 를 사용하면 정해진 시간 동안만 기다리고 그냥 진행할 수 있다.

#include <chrono>
#include <exception>
#include <future>
#include <iostream>
#include <string>
#include <thread>

void worker(std::promise<void>* p) {
  std::this_thread::sleep_for(std::chrono::seconds(10));
  p->set_value();
}
int main() {
  // void 의 경우 어떠한 객체도 전달하지 않지만, future 가 set 이 되었냐
  // 안되었느냐의 유무로 마치 플래그의 역할을 수행할 수 있습니다.
  std::promise<void> p;

  // 미래에 string 데이터를 돌려 주겠다는 약속.
  std::future<void> data = p.get_future();

  std::thread t(worker, &p);

  // 미래에 약속된 데이터를 받을 때 까지 기다린다.
  while (true) {
    std::future_status status = data.wait_for(std::chrono::seconds(1));

    // 아직 준비가 안됨
    if (status == std::future_status::timeout) {
      std::cerr << ">";
    }
    // promise 가 future 를 설정함.
    else if (status == std::future_status::ready) {
      break;
    }
  }
  t.join();
}

// 출력 결과 : 
>>>>>>>>>

 

wait_for 함수는 전달된 시간만큼 기다린 후 바로 리턴한다. future_status 는 3 가지 상태를 가질 수 있는데, 먼저 future 에 값이 설정되었을 때 나타나는 future_status::ready 가 있고, wait_for 에 지정한 시간이 지났지만 값이 설정되지 않아서 리턴한 경우에는 future_status::timeout 가 리턴된다.

마지막으로 future_status::deferred 가 있는데, 이는 결과값을 계산하는 함수가 채 실행되지 않았다는 의미이다.

 

 

shared_future

future 의 경우 단 한번만 get 을 할 수 있다. 왜냐하면 get 을 호출하면 future 내부의 객체가 이동하기 때문이다. 하지만, 종종 여러 개의 다른 쓰레드에서 future 을 get 할 필요성이 있다.

이 경우 shared_future 을 사용하면 된다. 각 runner 쓰레드들이 달리는 다음 코드를 보자.

#include <chrono>
#include <future>
#include <iostream>
#include <thread>
using std::thread;

void runner(std::shared_future<void> start) {
  start.get();
  std::cout << "출발!" << std::endl;
}

int main() {
  std::promise<void> p;
  std::shared_future<void> start = p.get_future();

  thread t1(runner, start);
  thread t2(runner, start);
  thread t3(runner, start);
  thread t4(runner, start);

  // 참고로 cerr 는 std::cout 과는 다르게 버퍼를 사용하지 않기 때문에 터미널에
  // 바로 출력된다.
  std::cerr << "준비...";
  std::this_thread::sleep_for(std::chrono::seconds(1));
  std::cerr << "땅!" << std::endl;

  p.set_value();

  t1.join();
  t2.join();
  t3.join();
  t4.join();
}

// 출력 결과 : 
준비...땅!
출발!
출발!
출발!
출발!

위에서는 start.get( ) 을 통해 shared_future 을 마치 신호를 받아들이는 플래그처럼 사용했다.

 

 

packaged_task

C++ 에서는 promise-future 패턴을 비동기적 함수 (정확히는 Callable - 즉 람다 함수, Functor 포함) 의 리턴값에 간단히 적용할 수 있는 packaged_task 라는 것을 지원한다.

 

packaged_task 에 전달된 함수가 리턴할 때, 그 리턴값을 promise 에 set_value 하고, 만약 예외를 던졌다면 promise 에 set_exception 을 한다. 해당 future 은 packaged_task 가 리턴하는 future 에서 접근할 수 있다. 코드를 보자.

#include <future>
#include <iostream>
#include <thread>

int some_task(int x) { return 10 + x; }

int main() {
  // int(int) : int 를 리턴하고 인자로 int 를 받는 함수. (std::function 참조)
  std::packaged_task<int(int)> task(some_task);

  std::future<int> start = task.get_future();

  std::thread t(std::move(task), 5);

  std::cout << "결과값 : " << start.get() << std::endl;
  t.join();
}

// 출력 결과 : 
결과값 : 15

packaged_task 는 비동기적으로 수행할 함수 자체를 생성자의 인자로 받는다. 또한 템플릿 인자로 해당 함수의 타입을 명시해야 한다.  packaged_task 는 전달된 함수를 실행해서, 그 함수의 리턴값을 promise 에 설정한다. 

생성된 packaged_task 를 쓰레드에 전달하면 된다. 참고로 packaged_task 는 복사 생성이 불가능하므로 (promise 도 마찬가지). 명시적으로 move 해주어야 한다.

이와 같이 packaged_task 를 사용하면 쓰레드에 굳이 promise 를 전달하지 않아도 알아서 packaged_task 가 함수의 리턴값을 처리해주므로 매우 편리하다.

 

 

std::async

앞서 promise 나 packaged_task 는 비동기적으로 실행하기 위해서 쓰레드를 명시적으로 실행해야만 했다. 하지만 std::async 에 어떤 함수를 전달하면, 아예 쓰레드를 알아서 만들어 해당 함수를 비동기적으로 실행하고, 그 결과값을 future 에 전달한다.

#include <future>
#include <iostream>
#include <thread>
#include <vector>

// std::accumulate 와 동일
int sum(const std::vector<int>& v, int start, int end) {
  int total = 0;
  for (int i = start; i < end; ++i) {
    total += v[i];
  }
  return total;
}

int parallel_sum(const std::vector<int>& v) {
  // lower_half_future 는 1 ~ 500 까지 비동기적으로 더함
  // 참고로 람다 함수를 사용하면 좀 더 깔끔하게 표현할 수 도 있다.
  // --> std::async([&v]() { return sum(v, 0, v.size() / 2); });
  std::future<int> lower_half_future =
    std::async(std::launch::async, sum, cref(v), 0, v.size() / 2);

  // upper_half 는 501 부터 1000 까지 더함
  int upper_half = sum(v, v.size() / 2, v.size());

  return lower_half_future.get() + upper_half;
}

int main() {
  std::vector<int> v;
  v.reserve(1000);

  for (int i = 0; i < 1000; ++i) {
    v.push_back(i + 1);
  }

  std::cout << "1 부터 1000 까지의 합 : " << parallel_sum(v) << std::endl;
}

async 함수는 인자로 받은 함수를 비동기적으로 실행한 후에, 해당 결과값을 보관할 future 을 리턴한다. 첫 번째 인자로는 어떠한 형태로 실행할지를 전달하는데 두 가지 값이 가능하다.

  • std::launch::async : 바로 쓰레드를 생성해서 인자로 전달된 함수를 실행한다.
  • std::launch::deferred : future 의 get 함수가 호출되었을 때 실행한다. (새로운 쓰레드를 생성하지 않음)

해당 함수를 굳이 바로 당장 비동기적으로 실행할 필요가 없다면 deferred 옵션을 주면 된다.

async 함수는 실행하는 함수의 결과값을 포함하는 future 을 리턴한다.

 

다음 코드를 통해 비동기적인 실행을 간단히 요약해보자.

#include <future>
#include <iostream>
#include <thread>

int do_work(int x) {
  // x 를 가지고 무슨 일을 한다.
  std::this_thread::sleep_for(std::chrono::seconds(3));
  return x;
}

void do_work_parallel() {
  auto f1 = std::async([]() { do_work(3); });
  auto f2 = std::async([]() { do_work(3); });
  do_work(3);

  f1.get();
  f2.get();
}

void do_work_sequential() {
  do_work(3);
  do_work(3);
  do_work(3);
}

int main() { do_work_sequential(); }

 

 

생각해보기

 

문제1

async 를 사용해서 기존의 find 를 더 빠르게 수행하는 함수를 만들어 보자.

 

: (10 등분해서 계산. 사실 logN 갯수만큼 쓰레드를 만들면 제일 나을 것 같다)

#include <future>
#include <iostream>
#include <thread>
#include <vector>
#include <algorithm>

using namespace std;

int find_my(std::vector<int>& v, int target, int start, int end)
{
  for (int i = start; i < end; ++i)
  {
    if (v[i] == target)
    {
      cout << "Found!" << endl;
      return i;
    }
  }
  return v.size();
}

int parallel_find(std::vector<int>& v, int target)
{
  int minIdx = v.size();
  int idx = 0;
  for (int i = 0; i < 10; ++i)
  {
    std::future<int> f = std::async(std::launch::async, 
      [&v, target, idx]() {return find_my(v, target, idx, min(idx+v.size()/10, v.size()));});
    idx += v.size() / 10;   
    minIdx = std::min(minIdx, f.get());
  }

  return minIdx;
}

int main() 
{
  std::vector<int> v(100);

  for (int i = 0; i < 100; ++i)
  {
    v[i] = i+1;
  }

  std::cout << "Find 99 : " << parallel_find(v, 100) << std::endl;
}

 

문제 2

쓰레드풀 (ThreadPool) 을 만들어 보자. 쓰레드풀의 사용자는 원하는 만큼의 쓰레드들을 생성해놓고, 무언가 수행하고 싶은 일이 있다면 그냥 쓰레드풀에 추가하면 된다. 한 번 ThreadPool 클래스를 설계하고 만들어보자.

 

 

 

 

 

 
Comments