KoreanFoodie's Study
C++ 기초 개념 15-5 : 쓰레드풀(ThreadPool) 만들기 본문
모두의 코드를 참고하여 핵심 내용을 간추리고 있습니다. 자세한 내용은 모두의 코드의 씹어먹는 C++ 강좌를 참고해 주세요!
ThreadPool 간단 요약
ThreadPool 에 우리가 원하는 작업을 추가하면, 쓰레드풀에 있는 쓰레드가 이를 맡아 작업하게 된다. 이 글에서는 쓰레드에 새로운 작업을 추가하는 일을 queue 를 통해 처리한다. 이곳의 구현을 기초로 하여 작성했다. 모든 쓰레드가 작업중이어도 새로운 작업을 추가해도 상관없다!
클래스 설계하기
먼저 쓰레드들을 보관할 컨테이너가 필요하다.
// 총 Worker 쓰레드의 개수.
size_t num_threads_;
// Worker 쓰레드를 보관하는 벡터.
std::vector<std::thread> worker_threads_;
편의상, 쓰레드풀에서 돌아가는 쓰레드들을 Worker 쓰레드라고 부르도록 한다. num_threads_ 는 worker_threads_.size( )와 같다.
C++ 에서는 일반적인 타입의 함수 포인터를 저장할 수 있는 컨테이너가 없으므로, 일단은 void 형의 인자를 받지 않는 함수를 전달한다고 가정해 보겠다. (관련 부분은 후에 다루도록 한다)
작업을 보관할 컨테이너는 다음과 같다.
// 할일들을 보관하는 job 큐
std::queue<std::function<void()>> jobs_;
해당 큐는 모든 Worker 쓰레드에서 접근 가능하다. 멀티쓰레드 환경에서 발생하는 race condition 을 처리해주기 위해, 락을 만들어 주어야 한다.
std::condition_variable cv_job_q;
std::mutex m_job_q_;
cv_job_q_ 와 m_job_q_ 는 생산자-소비자 패턴을 구현할 때 사용된다. 여기서 생산자 역할은 쓰레드풀을 사용하는 사용자들이고 (jobs_ 에 작업을 추가하는 사람들), 소비자들은 Worker 쓰레드들이다.
마지막으로 Worker 쓰레드들을 종료시킬 조건을 나타내는 멤버 변수인
// 모든 쓰레드 종료
bool stop_all;
Worker 쓰레드들은 기본적으로 jobs_ 를 처리하는 동안 무한루프를 도는데, 위 stop_all 이 설정되면 무한 루프를 빠져나가게 된다.
ThreadPool 의 첫 번째 버전
먼저 코드를 보자.
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
namespace ThreadPool {
class ThreadPool{
public:
ThreadPool(size_t num_threads);
~ThreadPool();
// job 을 추가한다
void EnqueueJob(std::function<void()> job);
private:
// 총 Worker 쓰레드의 개수
size_t num_threads_;
// Worker 쓰레드를 보관하는 벡터
std::vector<std::thread> worker_threads_;
// 할일들을 보관하는 job 큐
std::queue<std::function<void()>> jobs_;
// 위의 job 큐를 위한 cv 와 m
std::condition_variable cv_job_q_;
std::mutex m_job_q_;
// 모든 쓰레드 종료
bool stop_all;
// Worker 쓰레드
void WorkerThread();
};
ThreadPool::ThreadPool(size_t num_threads)
: num_threads_(num_threads), stop_all(false)
{
worker_threads_.reserve(num_threads_);
for (size_t i = 0; i < num_threads_; ++i)
{
worker_threads_.emplace_back([this]() {this->WorkerThread(); });
}
}
void ThreadPool::WorkerThread()
{
while (true)
{
std::unique_lock<std::mutex> lock(m_job_q_);
cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
if (stop_all && this->jobs_.empty())
{
return;
}
// 맨 앞의 job 을 뺀다
std::function<void()> job = std::move(jobs_.front());
jobs_.pop();
lock.unlock();
// 해당 job 을 수행한다 :)
job();
}
}
void ThreadPool::EnqueueJob(std::function<void()> job)
{
if (stop_all)
{
throw std::runtime_error("ThreadPool 사용 중지됨");
}
{
std::lock_guard<std::mutex> lock(m_job_q_);
jobs_.push(std::move(job));
}
cv_job_q_.notify_one();
}
ThreadPool::~ThreadPool()
{
stop_all = true;
cv_job_q_.notify_all();
for (auto& t : worker_threads_)
{
t.join();
}
}
}
void work(int t, int id) {
printf("%d start \n", id);
std::this_thread::sleep_for(std::chrono::seconds(t));
printf("%d end after %ds\n", id, t);
}
int main() {
ThreadPool::ThreadPool pool(3);
for (int i = 0; i < 10; i++) {
pool.EnqueueJob([i]() { work(i % 3 + 1, i); });
}
}
생성자에서는 worker_threads_ 에 WorkerThread 멤버 함수를 인자로 전달해 주고, WorkerThread 함수는 job 큐에 있는 job 을 while loop 에서 받아 실행한다.
소멸자에서는 모든 쓰레드를 깨우고, join 을 시켜주고, EnqueueJob 에서는 큐에 job 을 넣어주고, 쓰레드 하나를 깨워준다. 쓰레드에서 작업을 추가하는 것은 다음과 같이 이루어진다.
pool.EnqueueJob([i]() { work(i % 3 + 1, i); });
앞서 쓰레드풀이 받는 함수의 형태가 리턴 타입이 void 이고 인자를 받지 않는다고 정의했다. 따라서 work 함수를 그대로 전달할 수 는 없다. 왜냐하면 int 타입 인자 두 개를 받기 때문이다. 하지만 위와 같이 void() 형태의 람다 함수로 감싸서 전달하면 문제가 없다!
임의의 함수 받기
우리가 앞서 만든 ThreadPool 의 job 은 return 하는 값을 제대로 받을 수 없는 문제가 있다. EnqueueJob 함수가 임의의 형태의 함수를 받고, 그 함수의 리턴값을 보관하는 future 를 리턴하는 꼴로 쓰레드풀을 다시 재구성해보자.
먼저 EnqueueJob 을 다음과 같이 바꿀 수 있다.
// job 을 추가한다.
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(F f, Args... args);
// 해석 :
template <class F, class... Args>
std::future</* f 의 리턴 타입*/> EnqueueJob(F f, Args... args);
// 실제 예시 :
int func(int i, int j, int k);
EnequeueJob(func, 1, 2, 3);
그런데 임의의 함수와 원소들을 받을 수 있다고 해서, 이를 컨테이너에 추가할 수 있다는 것은 아니다. 어떻게 하면 해당 함수의 실행을 void( ) 꼴의 함수만 저장할 수 있는 컨테이너에 넣을 수 있을까? 바로 packaged_task 를 활용하면 리턴값과 예외를 받아낼 수 있다!
using return_type = typename std::result_iof<F(Args...)>::type;
std::packaged_task<return_type()> job(std::bind(f, args...));
편의상 return_type 이라는 f 의 리턴타입을 보관하는 타입을 정의하였고, 그 밑에 f 의 실행 결과를 저장하는 packaged_task 인 job 객체를 정의했다.
한 가지 중요한 점은 packaged_task 의 생성자는 함수만을 받기 때문에, 실제 job 을 수행하기 위해서는 job(args...) 와 같이 호출하거나, 아니면 위처럼 그 인자들을 f 에 bind 시켜주면 된다.
EnequeueJob 의 아래부분은 다음과 같이 수정하면 된다.
std::future<return_type> job_result_future = job.get_future();
{
std::lock_guard<std::mutex> lock(m_job_q_);
jobs_.push([&job]() { job(); });
}
job 의 실행 결과를 보관하는 job_result_future 을 정의하고, 마지막으로 jobs_ 에 job 을 실행하는 람다 함수를 추가하였다. job 이 실행되면, f 의 리턴값이 job_result_future 에 들어가게 되고, 이는 쓰레드풀 사용자가 접근할 수 있게 된다.
수정된 ThreadPool 의 구현은 다음과 같다 :
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
namespace ThreadPool {
class ThreadPool {
public:
ThreadPool(size_t num_threads);
~ThreadPool();
// job 을 추가한다.
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
F f, Args... args);
private:
// 총 Worker 쓰레드의 개수.
size_t num_threads_;
// Worker 쓰레드를 보관하는 벡터.
std::vector<std::thread> worker_threads_;
// 할일들을 보관하는 job 큐.
std::queue<std::function<void()>> jobs_;
// 위의 job 큐를 위한 cv 와 m.
std::condition_variable cv_job_q_;
std::mutex m_job_q_;
// 모든 쓰레드 종료
bool stop_all;
// Worker 쓰레드
void WorkerThread();
};
ThreadPool::ThreadPool(size_t num_threads)
: num_threads_(num_threads), stop_all(false) {
worker_threads_.reserve(num_threads_);
for (size_t i = 0; i < num_threads_; ++i) {
worker_threads_.emplace_back([this]() { this->WorkerThread(); });
}
}
void ThreadPool::WorkerThread() {
while (true) {
std::unique_lock<std::mutex> lock(m_job_q_);
cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
if (stop_all && this->jobs_.empty()) {
return;
}
// 맨 앞의 job 을 뺀다.
std::function<void()> job = std::move(jobs_.front());
jobs_.pop();
lock.unlock();
// 해당 job 을 수행한다 :)
job();
}
}
ThreadPool::~ThreadPool() {
stop_all = true;
cv_job_q_.notify_all();
for (auto& t : worker_threads_) {
t.join();
}
}
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob(
F f, Args... args) {
if (stop_all) {
throw std::runtime_error("ThreadPool 사용 중지됨");
}
using return_type = typename std::result_of<F(Args...)>::type;
std::packaged_task<return_type()> job(std::bind(f, args...));
std::future<return_type> job_result_future = job.get_future();
{
std::lock_guard<std::mutex> lock(m_job_q_);
jobs_.push([&job]() { job(); });
}
cv_job_q_.notify_one();
return job_result_future;
}
} // namespace ThreadPool
int work(int t, int id) {
printf("%d start \n", id);
std::this_thread::sleep_for(std::chrono::seconds(t));
printf("%d end after %ds\n", id, t);
return t + id;
}
int main() {
ThreadPool::ThreadPool pool(3);
std::vector<std::future<int>> futures;
for (int i = 0; i < 10; i++) {
futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i));
}
for (auto& f : futures) {
printf("result : %d \n", f.get());
}
}
위의 코드를 실행하면 Broken promise 예외가 던져지며 런타임 오류가 발생한다. Broken promise 예외는 promise 에 set_value 를 하기 전에 이미 promise 의 future 객체가 파괴되었다면 발생하는 예외이다. 그렇다면 돼 future 객체가 파괴되었을까?
std::packaged_task<return_type()> job(std::bind(f, args...));
EnqueueJob 함수에 정의된 job 객체는 지역 변수이다. 즉, EnqueueJob 함수가 리턴하면 파괴되는 객체이다. 따라서 [&job]() { job(); } 안에서 job 을 접근할 때 이미 그 객체는 파괴되고 없어져 있을 것이다.
이 문제를 해결하는 방법으로 크게 두 가지를 생각해볼 수 있다.
- packaged_task 를 따로 컨테이너에 저장해서 보관한다.
- shared_ptr 에 packaged_task 를 보관한다.
(1) 방식의 경우, packaged_task 를 사용하지 않을 때에도 컨테이너에 남아있다는 문제가 있다. 하지만 (2) 번 방식의 경우 packaged_task 를 사용하는 것이 없을 때 알아서 shared_ptr 가 객체를 소멸시켜주므로 훨씬 관리하기가 편하다!
auto job =
std::make_shared<std::packaged_task<return_type()>>(std::bind(f, args...));
std::future<return_type> job_result_future = job->get_future();
{
std::lock_guard<std::mutex> lock(m_job_q_);
jobs_.push([job]() { (*job)(); };
}
위와 같이 간단히 make_shared 를 통해 shared_ptr 를 생성하였고, 람다 함수에 shared_ptr 의 복사본을 전달해서 람다 함수 안에서도 packaged_task 의 shared_ptr 하나를 붙들고 있게 되었다. 따라서 job 을 실행하는 시점에서도 packaged_task 객체는 계속 살아있게 된다!
수정된 코드는 다음과 같다 :
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
namespace ThreadPool {
class ThreadPool {
public:
ThreadPool(size_t num_threads);
~ThreadPool();
// job 을 추가한다.
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
F f, Args... args);
private:
// 총 Worker 쓰레드의 개수.
size_t num_threads_;
// Worker 쓰레드를 보관하는 벡터.
std::vector<std::thread> worker_threads_;
// 할일들을 보관하는 job 큐.
std::queue<std::function<void()>> jobs_;
// 위의 job 큐를 위한 cv 와 m.
std::condition_variable cv_job_q_;
std::mutex m_job_q_;
// 모든 쓰레드 종료
bool stop_all;
// Worker 쓰레드
void WorkerThread();
};
ThreadPool::ThreadPool(size_t num_threads)
: num_threads_(num_threads), stop_all(false) {
worker_threads_.reserve(num_threads_);
for (size_t i = 0; i < num_threads_; ++i) {
worker_threads_.emplace_back([this]() { this->WorkerThread(); });
}
}
void ThreadPool::WorkerThread() {
while (true) {
std::unique_lock<std::mutex> lock(m_job_q_);
cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
if (stop_all && this->jobs_.empty()) {
return;
}
// 맨 앞의 job 을 뺀다.
std::function<void()> job = std::move(jobs_.front());
jobs_.pop();
lock.unlock();
// 해당 job 을 수행한다 :)
job();
}
}
ThreadPool::~ThreadPool() {
stop_all = true;
cv_job_q_.notify_all();
for (auto& t : worker_threads_) {
t.join();
}
}
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob(
F f, Args... args) {
if (stop_all) {
throw std::runtime_error("ThreadPool 사용 중지됨");
}
using return_type = typename std::result_of<F(Args...)>::type;
auto job =
std::make_shared<std::packaged_task<return_type()>>(std::bind(f, args...));
std::future<return_type> job_result_future = job->get_future();
{
std::lock_guard<std::mutex> lock(m_job_q_);
jobs_.push([job]() { (*job)(); });
}
cv_job_q_.notify_one();
return job_result_future;
}
} // namespace ThreadPool
int work(int t, int id) {
printf("%d start \n", id);
std::this_thread::sleep_for(std::chrono::seconds(t));
printf("%d end after %ds\n", id, t);
return t + id;
}
int main() {
ThreadPool::ThreadPool pool(3);
std::vector<std::future<int>> futures;
for (int i = 0; i < 10; i++) {
futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i));
}
for (auto& f : futures) {
printf("result : %d \n", f.get());
}
}
완벽한 전달
ThreadPool::EnqueueJob(F f, Args... args);
위의 EnqueueJob 함수의 경우, 인자들의 복사본을 받는다. 하지만 이는 불필요한 복사를 야기하므로, 완벽한 전달 패턴을 사용하는 것이 좋다.
EnqueueJob 함수의 인자들을 우측값 레퍼런스로 바꾼 뒤에, bind 함수에 forward 로 인자를 전달해주면 된다!
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(F&& f, Args&&... args);
auto job = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
이제 불필요한 복사 없이 인자가 완벽하게 전달된다!
최종 소스는 다음과 같다 :
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
namespace ThreadPool {
class ThreadPool {
public:
ThreadPool(size_t num_threads);
~ThreadPool();
// job 을 추가한다.
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> EnqueueJob(
F&& f, Args&&... args);
private:
// 총 Worker 쓰레드의 개수.
size_t num_threads_;
// Worker 쓰레드를 보관하는 벡터.
std::vector<std::thread> worker_threads_;
// 할일들을 보관하는 job 큐.
std::queue<std::function<void()>> jobs_;
// 위의 job 큐를 위한 cv 와 m.
std::condition_variable cv_job_q_;
std::mutex m_job_q_;
// 모든 쓰레드 종료
bool stop_all;
// Worker 쓰레드
void WorkerThread();
};
ThreadPool::ThreadPool(size_t num_threads)
: num_threads_(num_threads), stop_all(false) {
worker_threads_.reserve(num_threads_);
for (size_t i = 0; i < num_threads_; ++i) {
worker_threads_.emplace_back([this]() { this->WorkerThread(); });
}
}
void ThreadPool::WorkerThread() {
while (true) {
std::unique_lock<std::mutex> lock(m_job_q_);
cv_job_q_.wait(lock, [this]() { return !this->jobs_.empty() || stop_all; });
if (stop_all && this->jobs_.empty()) {
return;
}
// 맨 앞의 job 을 뺀다.
std::function<void()> job = std::move(jobs_.front());
jobs_.pop();
lock.unlock();
// 해당 job 을 수행한다 :)
job();
}
}
ThreadPool::~ThreadPool() {
stop_all = true;
cv_job_q_.notify_all();
for (auto& t : worker_threads_) {
t.join();
}
}
template <class F, class... Args>
std::future<typename std::result_of<F(Args...)>::type> ThreadPool::EnqueueJob(
F&& f, Args&&... args) {
if (stop_all) {
throw std::runtime_error("ThreadPool 사용 중지됨");
}
using return_type = typename std::result_of<F(Args...)>::type;
auto job = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> job_result_future = job->get_future();
{
std::lock_guard<std::mutex> lock(m_job_q_);
jobs_.push([job]() { (*job)(); });
}
cv_job_q_.notify_one();
return job_result_future;
}
} // namespace ThreadPool
// 사용 예시
int work(int t, int id) {
printf("%d start \n", id);
std::this_thread::sleep_for(std::chrono::seconds(t));
printf("%d end after %ds\n", id, t);
return t + id;
}
int main() {
ThreadPool::ThreadPool pool(3);
std::vector<std::future<int>> futures;
for (int i = 0; i < 10; i++) {
futures.emplace_back(pool.EnqueueJob(work, i % 3 + 1, i));
}
for (auto& f : futures) {
printf("result : %d \n", f.get());
}
}
'Tutorials > C++ : Beginner' 카테고리의 다른 글
C++ 기초 개념 16-2 : constexpr 와 컴파일 타임 상수 (0) | 2022.05.23 |
---|---|
C++ 기초 개념 16-1 : C++ 유니폼 초기화(Uniform Initialization) (0) | 2022.05.23 |
C++ 기초 개념 15-4 : future, promise, packaged_task, async (0) | 2022.04.20 |
C++ 기초 개념 15-3 : atomic 객체와 memory order (0) | 2022.04.19 |
C++ 기초 개념 15-2 : 뮤텍스(Mutex)와 조건 변수(Condition Variable) (0) | 2022.04.19 |