관리 메뉴

KoreanFoodie's Study

C++ 기초 개념 15-5 : 쓰레드풀(ThreadPool) 만들기 본문

Tutorials/C++ : Beginner

C++ 기초 개념 15-5 : 쓰레드풀(ThreadPool) 만들기

머니덕 2022. 4. 21. 14:10

모두의 코드를 참고하여 핵심 내용을 간추리고 있습니다. 자세한 내용은 모두의 코드의 씹어먹는 C++ 강좌를 참고해 주세요!

ThreadPool 간단 요약

ThreadPool 에 우리가 원하는 작업을 추가하면, 쓰레드풀에 있는 쓰레드가 이를 맡아 작업하게 된다. 이 글에서는 쓰레드에 새로운 작업을 추가하는 일을 queue 를 통해 처리한다. 이곳의 구현을 기초로 하여 작성했다. 모든 쓰레드가 작업중이어도 새로운 작업을 추가해도 상관없다!

 

 

클래스 설계하기

먼저 쓰레드들을 보관할 컨테이너가 필요하다. 

// 총 Worker 쓰레드의 개수.
size_t num_threads_;
// Worker 쓰레드를 보관하는 벡터.
std::vector<std::thread> worker_threads_;

편의상, 쓰레드풀에서 돌아가는 쓰레드들을 Worker 쓰레드라고 부르도록 한다. num_threads_ 는 worker_threads_.size( )와 같다.

C++ 에서는 일반적인 타입의 함수 포인터를 저장할 수 있는 컨테이너가 없으므로, 일단은 void 형의 인자를 받지 않는 함수를 전달한다고 가정해 보겠다. (관련 부분은 후에 다루도록 한다)

 

작업을 보관할 컨테이너는 다음과 같다.

// 할일들을 보관하는 job 큐
std::queue<std::function<void()>> jobs_;

해당 큐는 모든 Worker 쓰레드에서 접근 가능하다. 멀티쓰레드 환경에서 발생하는 race condition 을 처리해주기 위해, 락을 만들어 주어야 한다.

 

std::condition_variable cv_job_q;
std::mutex m_job_q_;

cv_job_q_ 와 m_job_q_ 는 생산자-소비자 패턴을 구현할 때 사용된다. 여기서 생산자 역할은 쓰레드풀을 사용하는 사용자들이고 (jobs_ 에 작업을 추가하는 사람들), 소비자들은 Worker 쓰레드들이다.

 

마지막으로 Worker 쓰레드들을 종료시킬 조건을 나타내는 멤버 변수인 

// 모든 쓰레드 종료
bool stop_all;

Worker 쓰레드들은 기본적으로 jobs_ 를 처리하는 동안 무한루프를 도는데, 위 stop_all 이 설정되면 무한 루프를 빠져나가게 된다.

 

 

ThreadPool 의 첫 번째 버전

먼저 코드를 보자.

#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace ThreadPool {

class ThreadPool{
public:
	ThreadPool(size_t num_threads);
	~ThreadPool();

	// job 을 추가한다
	void EnqueueJob(std::function<void()> job);

private:
	// 총 Worker 쓰레드의 개수
	size_t num_threads_;
	// Worker 쓰레드를 보관하는 벡터
	std::vector<std::thread> worker_threads_;
	// 할일들을 보관하는 job 큐
	std::queue<std::function<void()>> jobs_;
	// 위의 job 큐를 위한 cv 와 m
	std::condition_variable cv_job_q_;
	std::mutex m_job_q_;

	// 모든 쓰레드 종료
	bool stop_all;

	// Worker 쓰레드
	void WorkerThread();
};

ThreadPool::ThreadPool(size_t num_threads)
: num_threads_(num_threads), stop_all(false)
{
	worker_threads_.reserve(num_threads_);
	for (size_t i = 0; i < num_threads_; ++i)
	{
		worker_threads_.emplace_back([this]() {this->WorkerThread(); });
	}
}

void ThreadPool::WorkerThread()
{

	while (true)
	{
		std::unique_lock<std::mutex> lock(m_job_q_);
		cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
		if (stop_all && this->jobs_.empty())
		{
			return;
		}
		// 맨 앞의 job 을 뺀다
		std::function<void()> job = std::move(jobs_.front());
		jobs_.pop();
		lock.unlock();

		// 해당 job 을 수행한다 :)
		job();
	}
}

void ThreadPool::EnqueueJob(std::function<void()> job)
{
	if (stop_all)
	{
		throw std::runtime_error("ThreadPool 사용 중지됨");
	}
	{
		std::lock_guard<std::mutex> lock(m_job_q_);
		jobs_.push(std::move(job));
	}
	cv_job_q_.notify_one();
}

ThreadPool::~ThreadPool()
{
	stop_all = true;
	cv_job_q_.notify_all();

	for (auto& t : worker_threads_)
	{
		t.join();
	}
}

}

void work(int t, int id) {
  printf("%d start \n", id);
  std::this_thread::sleep_for(std::chrono::seconds(t));
  printf("%d end after %ds\n", id, t);
}

int main() {
  ThreadPool::ThreadPool pool(3);

  for (int i = 0; i < 10; i++) {
    pool.EnqueueJob([i]() { work(i % 3 + 1, i); });
  }
}

생성자에서는 worker_threads_ 에 WorkerThread 멤버 함수를 인자로 전달해 주고, WorkerThread 함수는 job 큐에 있는 job 을 while loop 에서 받아 실행한다.

소멸자에서는 모든 쓰레드를 깨우고, join 을 시켜주고, EnqueueJob 에서는 큐에 job 을 넣어주고, 쓰레드 하나를 깨워준다. 쓰레드에서 작업을 추가하는 것은 다음과 같이 이루어진다.

pool.EnqueueJob([i]() { work(i % 3 + 1, i); });

앞서 쓰레드풀이 받는 함수의 형태가 리턴 타입이 void 이고 인자를 받지 않는다고 정의했다. 따라서 work 함수를 그대로 전달할 수 는 없다. 왜냐하면 int 타입 인자 두 개를 받기 때문이다. 하지만 위와 같이 void() 형태의 람다 함수로 감싸서 전달하면 문제가 없다!

 

 

임의의 함수 받기

우리가 앞서 만든 ThreadPool 의 job 은 return 하는 값을 제대로 받을 수 없는 문제가 있다. EnqueueJob 함수가 임의의 형태의 함수를 받고, 그 함수의 리턴값을 보관하는 future 를 리턴하는 꼴로 쓰레드풀을 다시 재구성해보자.

먼저 EnqueueJob 을 다음과 같이 바꿀 수 있다.

// job 을 추가한다.
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(F f, Args... args);

// 해석 :
template <class F, class... Args>
std::future</* f 의 리턴 타입*/> EnqueueJob(F f, Args... args);

// 실제 예시 :
int func(int i, int j, int k);
EnequeueJob(func, 1, 2, 3);

 

그런데 임의의 함수와 원소들을 받을 수 있다고 해서, 이를 컨테이너에 추가할 수 있다는 것은 아니다. 어떻게 하면 해당 함수의 실행을 void( ) 꼴의 함수만 저장할 수 있는 컨테이너에 넣을 수 있을까? 바로 packaged_task 를 활용하면 리턴값과 예외를 받아낼 수 있다!

using return_type = typename std::result_iof<F(Args...)>::type;
std::packaged_task<return_type()> job(std::bind(f, args...));

편의상 return_type 이라는 f 의 리턴타입을 보관하는 타입을 정의하였고, 그 밑에 f 의 실행 결과를 저장하는 packaged_task 인 job 객체를 정의했다.

한 가지 중요한 점은 packaged_task 의 생성자는 함수만을 받기 때문에, 실제 job 을 수행하기 위해서는 job(args...) 와 같이 호출하거나, 아니면 위처럼 그 인자들을 f 에 bind 시켜주면 된다.

 

EnequeueJob 의 아래부분은 다음과 같이 수정하면 된다.

std::future<return_type> job_result_future = job.get_future();
{
    std::lock_guard<std::mutex> lock(m_job_q_);
    jobs_.push([&job]() { job(); });
}

job 의 실행 결과를 보관하는 job_result_future 을 정의하고, 마지막으로 jobs_ 에 job 을 실행하는 람다 함수를 추가하였다. job 이 실행되면, f 의 리턴값이 job_result_future 에 들어가게 되고, 이는 쓰레드풀 사용자가 접근할 수 있게 된다.

 

수정된 ThreadPool 의 구현은 다음과 같다 :

더보기
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace ThreadPool {
class ThreadPool {
 public:
  ThreadPool(size_t num_threads);
  ~ThreadPool();

  // job 을 추가한다.
  template <class F, class... Args>
  std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
    F f, Args... args);

 private:
  // 총 Worker 쓰레드의 개수.
  size_t num_threads_;
  // Worker 쓰레드를 보관하는 벡터.
  std::vector<std::thread> worker_threads_;
  // 할일들을 보관하는 job 큐.
  std::queue<std::function<void()>> jobs_;
  // 위의 job 큐를 위한 cv 와 m.
  std::condition_variable cv_job_q_;
  std::mutex m_job_q_;

  // 모든 쓰레드 종료
  bool stop_all;

  // Worker 쓰레드
  void WorkerThread();
};

ThreadPool::ThreadPool(size_t num_threads)
    : num_threads_(num_threads), stop_all(false) {
  worker_threads_.reserve(num_threads_);
  for (size_t i = 0; i < num_threads_; ++i) {
    worker_threads_.emplace_back([this]() { this->WorkerThread(); });
  }
}

void ThreadPool::WorkerThread() {
  while (true) {
    std::unique_lock<std::mutex> lock(m_job_q_);
    cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
    if (stop_all && this->jobs_.empty()) {
      return;
    }

    // 맨 앞의 job 을 뺀다.
    std::function<void()> job = std::move(jobs_.front());
    jobs_.pop();
    lock.unlock();

    // 해당 job 을 수행한다 :)
    job();
  }
}

ThreadPool::~ThreadPool() {
  stop_all = true;
  cv_job_q_.notify_all();

  for (auto& t : worker_threads_) {
    t.join();
  }
}

template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob(
  F f, Args... args) {
  if (stop_all) {
    throw std::runtime_error("ThreadPool 사용 중지됨");
  }

  using return_type = typename std::result_of<F(Args...)>::type;
  std::packaged_task<return_type()> job(std::bind(f, args...));

  std::future<return_type> job_result_future = job.get_future();
  {
    std::lock_guard<std::mutex> lock(m_job_q_);
    jobs_.push([&job]() { job(); });
  }
  cv_job_q_.notify_one();

  return job_result_future;
}

}  // namespace ThreadPool

int work(int t, int id) {
  printf("%d start \n", id);
  std::this_thread::sleep_for(std::chrono::seconds(t));
  printf("%d end after %ds\n", id, t);
  return t + id;
}

int main() {
  ThreadPool::ThreadPool pool(3);

  std::vector<std::future<int>> futures;
  for (int i = 0; i < 10; i++) {
    futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i));
  }
  for (auto& f : futures) {
    printf("result : %d \n", f.get());
  }
}

위의 코드를 실행하면 Broken promise 예외가 던져지며 런타임 오류가 발생한다. Broken promise 예외는 promise 에 set_value 를 하기 전에 이미 promise 의 future 객체가 파괴되었다면 발생하는 예외이다. 그렇다면 돼 future 객체가 파괴되었을까?

 

std::packaged_task<return_type()> job(std::bind(f, args...));

EnqueueJob 함수에 정의된 job 객체는 지역 변수이다. 즉, EnqueueJob 함수가 리턴하면 파괴되는 객체이다. 따라서 [&job]() { job(); } 안에서 job 을 접근할 때 이미 그 객체는 파괴되고 없어져 있을 것이다.

이 문제를 해결하는 방법으로 크게 두 가지를 생각해볼 수 있다.

  1. packaged_task 를 따로 컨테이너에 저장해서 보관한다.
  2. shared_ptr 에 packaged_task 를 보관한다.

 

(1) 방식의 경우, packaged_task 를 사용하지 않을 때에도 컨테이너에 남아있다는 문제가 있다. 하지만 (2) 번 방식의 경우 packaged_task 를 사용하는 것이 없을 때 알아서 shared_ptr 가 객체를 소멸시켜주므로 훨씬 관리하기가 편하다!

auto job =
    std::make_shared<std::packaged_task<return_type()>>(std::bind(f, args...));
std::future<return_type> job_result_future = job->get_future();
{
    std::lock_guard<std::mutex> lock(m_job_q_);
    jobs_.push([job]() { (*job)(); };
}

위와 같이 간단히 make_shared 를 통해 shared_ptr 를 생성하였고, 람다 함수에 shared_ptr 의 복사본을 전달해서 람다 함수 안에서도 packaged_task 의 shared_ptr 하나를 붙들고 있게 되었다. 따라서 job 을 실행하는 시점에서도 packaged_task 객체는 계속 살아있게 된다!

 

수정된 코드는 다음과 같다 :

더보기
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace ThreadPool {
class ThreadPool {
 public:
  ThreadPool(size_t num_threads);
  ~ThreadPool();

  // job 을 추가한다.
  template <class F, class... Args>
  std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
    F f, Args... args);

 private:
  // 총 Worker 쓰레드의 개수.
  size_t num_threads_;
  // Worker 쓰레드를 보관하는 벡터.
  std::vector<std::thread> worker_threads_;
  // 할일들을 보관하는 job 큐.
  std::queue<std::function<void()>> jobs_;
  // 위의 job 큐를 위한 cv 와 m.
  std::condition_variable cv_job_q_;
  std::mutex m_job_q_;

  // 모든 쓰레드 종료
  bool stop_all;

  // Worker 쓰레드
  void WorkerThread();
};

ThreadPool::ThreadPool(size_t num_threads)
    : num_threads_(num_threads), stop_all(false) {
  worker_threads_.reserve(num_threads_);
  for (size_t i = 0; i < num_threads_; ++i) {
    worker_threads_.emplace_back([this]() { this->WorkerThread(); });
  }
}

void ThreadPool::WorkerThread() {
  while (true) {
    std::unique_lock<std::mutex> lock(m_job_q_);
    cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
    if (stop_all && this->jobs_.empty()) {
      return;
    }

    // 맨 앞의 job 을 뺀다.
    std::function<void()> job = std::move(jobs_.front());
    jobs_.pop();
    lock.unlock();

    // 해당 job 을 수행한다 :)
    job();
  }
}

ThreadPool::~ThreadPool() {
  stop_all = true;
  cv_job_q_.notify_all();

  for (auto& t : worker_threads_) {
    t.join();
  }
}

template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob(
  F f, Args... args) {
  if (stop_all) {
    throw std::runtime_error("ThreadPool 사용 중지됨");
  }

  using return_type = typename std::result_of<F(Args...)>::type;
  auto job =
    std::make_shared<std::packaged_task<return_type()>>(std::bind(f, args...));
  std::future<return_type> job_result_future = job->get_future();
  {
    std::lock_guard<std::mutex> lock(m_job_q_);
    jobs_.push([job]() { (*job)(); });
  }
  cv_job_q_.notify_one();

  return job_result_future;
}

}  // namespace ThreadPool

int work(int t, int id) {
  printf("%d start \n", id);
  std::this_thread::sleep_for(std::chrono::seconds(t));
  printf("%d end after %ds\n", id, t);
  return t + id;
}

int main() {
  ThreadPool::ThreadPool pool(3);

  std::vector<std::future<int>> futures;
  for (int i = 0; i < 10; i++) {
    futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i));
  }
  for (auto& f : futures) {
    printf("result : %d \n", f.get());
  }
}

 

 

완벽한 전달 

ThreadPool::EnqueueJob(F f, Args... args);

위의 EnqueueJob 함수의 경우, 인자들의 복사본을 받는다. 하지만 이는 불필요한 복사를 야기하므로, 완벽한 전달 패턴을 사용하는 것이 좋다.

 

EnqueueJob 함수의 인자들을 우측값 레퍼런스로 바꾼 뒤에, bind 함수에 forward 로 인자를 전달해주면 된다!

template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(F&& f, Args&&... args);

auto job = std::make_shared<std::packaged_task<return_type()>>(
    std::bind(std::forward<F>(f), std::forward<Args>(args)...));

이제 불필요한 복사 없이 인자가 완벽하게 전달된다!

 

최종 소스는 다음과 같다 :

더보기
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

namespace ThreadPool {
class ThreadPool {
 public:
  ThreadPool(size_t num_threads);
  ~ThreadPool();

  // job 을 추가한다.
  template <class F, class... Args>
  std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
    F&& f, Args&&... args);

 private:
  // 총 Worker 쓰레드의 개수.
  size_t num_threads_;
  // Worker 쓰레드를 보관하는 벡터.
  std::vector<std::thread> worker_threads_;
  // 할일들을 보관하는 job 큐.
  std::queue<std::function<void()>> jobs_;
  // 위의 job 큐를 위한 cv 와 m.
  std::condition_variable cv_job_q_;
  std::mutex m_job_q_;

  // 모든 쓰레드 종료
  bool stop_all;

  // Worker 쓰레드
  void WorkerThread();
};

ThreadPool::ThreadPool(size_t num_threads)
    : num_threads_(num_threads), stop_all(false) {
  worker_threads_.reserve(num_threads_);
  for (size_t i = 0; i < num_threads_; ++i) {
    worker_threads_.emplace_back([this]() { this->WorkerThread(); });
  }
}

void ThreadPool::WorkerThread() {
  while (true) {
    std::unique_lock<std::mutex> lock(m_job_q_);
    cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
    if (stop_all && this->jobs_.empty()) {
      return;
    }

    // 맨 앞의 job 을 뺀다.
    std::function<void()> job = std::move(jobs_.front());
    jobs_.pop();
    lock.unlock();

    // 해당 job 을 수행한다 :)
    job();
  }
}

ThreadPool::~ThreadPool() {
  stop_all = true;
  cv_job_q_.notify_all();

  for (auto& t : worker_threads_) {
    t.join();
  }
}

template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob(
  F&& f, Args&&... args) {
  if (stop_all) {
    throw std::runtime_error("ThreadPool 사용 중지됨");
  }

  using return_type = typename std::result_of<F(Args...)>::type;
  auto job = std::make_shared<std::packaged_task<return_type()>>(
    std::bind(std::forward<F>(f), std::forward<Args>(args)...));
  std::future<return_type> job_result_future = job->get_future();
  {
    std::lock_guard<std::mutex> lock(m_job_q_);
    jobs_.push([job]() { (*job)(); });
  }
  cv_job_q_.notify_one();

  return job_result_future;
}

}  // namespace ThreadPool

// 사용 예시
int work(int t, int id) {
  printf("%d start \n", id);
  std::this_thread::sleep_for(std::chrono::seconds(t));
  printf("%d end after %ds\n", id, t);
  return t + id;
}

int main() {
  ThreadPool::ThreadPool pool(3);

  std::vector<std::future<int>> futures;
  for (int i = 0; i < 10; i++) {
    futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i));
  }
  for (auto& f : futures) {
    printf("result : %d \n", f.get());
  }
}
0 Comments
댓글쓰기 폼