KoreanFoodie's Study
C++ 기초 개념 15-2 : 뮤텍스(Mutex)와 조건 변수(Condition Variable) 본문
C++ 기초 개념 15-2 : 뮤텍스(Mutex)와 조건 변수(Condition Variable)
GoldGiver 2022. 4. 19. 10:23
모두의 코드를 참고하여 핵심 내용을 간추리고 있습니다. 자세한 내용은 모두의 코드의 씹어먹는 C++ 강좌를 참고해 주세요!
Race Condition
서로 다른 쓰레드에서 같은 메모리를 공유할 때 발생하는 문제를 경쟁 상태(race condition) 이라고 부른다. 다음 코드를 보자.
#include <iostream>
#include <thread>
#include <vector>
void worker(int& counter) {
for (int i = 0; i < 10000; i++) {
counter += 1;
}
}
int main() {
int counter = 0;
std::vector<std::thread> workers;
for (int i = 0; i < 4; i++) {
// 레퍼런스로 전달하려면 ref 함수로 감싸야 한다 (지난 강좌 bind 함수 참조)
workers.push_back(std::thread(worker, std::ref(counter)));
}
for (int i = 0; i < 4; i++) {
workers[i].join();
}
std::cout << "Counter 최종 값 : " << counter << std::endl;
}
사실 우리가 짠 코드는 어셈블리어로 번역되어 순차적으로 실행되는데, 멀티 쓰레드일 경우 래지스터에 값을 저장하거나 연산을 수행할때 이 순서가 뒤죽박죽이 될 수 있다. 따라서, 위의 코드의 counter 는 40000 이 아닌 이상한 값이 나오게 된다.
뮤텍스(Mutex)
위와 같은 문제를 해결하기 위해, 뮤텍스(Mutex)를 활용할 수 있다.
#include <iostream>
#include <mutex> // mutex 를 사용하기 위해 필요
#include <thread>
#include <vector>
void worker(int& result, std::mutex& m) {
for (int i = 0; i < 10000; i++) {
m.lock();
result += 1;
m.unlock();
}
}
int main() {
int counter = 0;
std::mutex m; // 우리의 mutex 객체
std::vector<std::thread> workers;
for (int i = 0; i < 4; i++) {
workers.push_back(std::thread(worker, std::ref(counter), std::ref(m)));
}
for (int i = 0; i < 4; i++) {
workers[i].join();
}
std::cout << "Counter 최종 값 : " << counter << std::endl;
}
뮤텍스를 사용하면 특정 쓰레드가 뮤텍스를 가지고 있을 때 다른 쓰레드가 뮤텍스를 얻기 전까지는 해당 영역의 코드를 실행할 수 없게 된다. 그 특정 영역을 임계 영역(Critical Section) 이라고 한다.
// 임계 영역(Critical Section)
m.lock();
result += 1;
m.unlock();
만약 mutex 를 unlock 하지 않으면 어떻게 될까?
만일 mutex 의 해제가 제대로 이루어지지 않으면, 데드락(DeadLock) 문제가 발생한다. 이를 방지하기 위해, 이전에 배운 unique_ptr 를 이용해 소멸자에서 메모리를 해제하도록 만들 수 있다.
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
void worker(int& result, std::mutex& m) {
for (int i = 0; i < 10000; ++i) {
// When lock is created, m.lock() is called
std::lock_guard<std::mutex> lock(m);
result += 1;
// lock vanishes as escaping the scope
// unlock m by itself;
}
}
int main() {
int counter = 0;
std::mutex m;
std::vector<std::thread> workers;
for (int i = 0; i < 4; ++i) {
workers.push_back(std::thread(worker, std::ref(counter), std::ref(m)));
}
for (int i = 0; i < 4; ++i) {
workers[i].join();
}
std::cout << "Counter Value : " << counter << std::endl;
}
lock_guard 객체는 뮤텍스를 인자로 받아 생성자에서 뮤텍스를 lock 한다. 그 후 lock_guard 가 소멸될 때 알아서 lock 했던 뮤텍스를 unlock 해주게 된다.
데드락 (deadlock)
하지만 unique_ptr 를 활용해도 데드락 상황은 얼마든지 생길 수 있다. 다음과 같은 코드는 절대 스스로 종료하지 않는다.
#include <iostream>
#include <mutex> // mutex 를 사용하기 위해 필요
#include <thread>
void worker1(std::mutex& m1, std::mutex& m2) {
for (int i = 0; i < 10000; i++) {
std::lock_guard<std::mutex> lock1(m1);
std::lock_guard<std::mutex> lock2(m2);
// Do something
}
}
void worker2(std::mutex& m1, std::mutex& m2) {
for (int i = 0; i < 10000; i++) {
std::lock_guard<std::mutex> lock2(m2);
std::lock_guard<std::mutex> lock1(m1);
// Do something
}
}
int main() {
int counter = 0;
std::mutex m1, m2; // 우리의 mutex 객체
std::thread t1(worker1, std::ref(m1), std::ref(m2));
std::thread t2(worker2, std::ref(m1), std::ref(m2));
t1.join();
t2.join();
std::cout << "끝!" << std::endl;
}
왜냐하면 worker1이 lock1을, worker2 가 lock2를 먼저 가져가므로, 후순위의 lock을 가져갈 수 없는 상황이 발생하는 것이다.
worker 부분을 다음과 같이 바꿔주면 어떨까?
void worker1(std::mutex& m1, std::mutex& m2) {
for (int i = 0; i < 10; i++) {
m1.lock();
m2.lock();
std::cout << "Worker1 Hi! " << i << std::endl;
m2.unlock();
m1.unlock();
}
}
void worker2(std::mutex& m1, std::mutex& m2) {
for (int i = 0; i < 10; i++) {
while (true) {
m2.lock();
// m1 이 이미 lock 되어 있다면 "야 차 빼" 를 수행하게 된다.
if (!m1.try_lock()) {
m2.unlock();
continue;
}
std::cout << "Worker2 Hi! " << i << std::endl;
m1.unlock();
m2.unlock();
break;
}
}
}
위의 코드는 worker2 가 worker1에게 작업을 양보하는 형태로 데드락을 해결했다. 하지만 이렇듯 한 쓰레드가 다른 쓰레드에 대해 우위를 갖게 되면, 우선순위가 낮은 쓰레드가 할 일이 없는 기아 상태(starvation)가 발생할 수 있다.
데드락 상황을 피하기 위한 가이드라인 (C++ Concurrency In Action)
- 중첩된 Lock을 사용하는 것을 피하라 (일반적으로는 1개의 Lock 만으로 충분하다)
- Lock 을 소유하고 있을 때 유저 코드를 호출하는 것을 피해라
- Lock 을 언제나 정해진 순서로 획득하라 (1->2, 1->2 는 좋지만 1->2, 2->1 는 곤란)
생산자(Producer) 와 소비자(Consumer) 패턴
웹 페이지를 긁어 오는 producer 함수와 읽어오는 consumer 함수가 있다고 하자. 긁어온 웹 데이터를 저장하거나 읽을 떄는 lock 을 소유해야 하는데, 상대적으로 읽는 작업이 웹 서버를 조회하는 것보다 빠르다. 따라서, 웹 페이지 내용을 저장할 때 lock 을 소유했다가 작업이 끝나면 consumer 함수를 깨우는 방식으로 코드를 짜면 된다.
잠들어 있는 쓰레드를 깨울 때는 조건 변수(condition_variable)을 이용한다.
#include <chrono> // std::chrono::miliseconds
#include <condition_variable> // std::condition_variable
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
void producer(std::queue<std::string>* downloaded_pages, std::mutex* m, int index, std::condition_variable* cv) {
for (int i = 0; i < 5; ++i) {
// Time required to download websites
std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
std::string content = "Website : " + std::to_string(i) + " from thread(" + std::to_string(index) + ")\n";
// data should be inside the critical section
m->lock();
downloaded_pages->push(content);
m->unlock();
// notify consumer that the content is ready
cv->notify_one();
}
}
void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m, int* num_processed, std::condition_variable* cv) {
while (*num_processed < 25) {
std::unique_lock<std::mutex> lk(*m);
cv->wait(lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });
if (*num_processed ==25) {
lk.unlock();
return;
}
// read front page and delete it from the queue
std::string content = downloaded_pages->front();
downloaded_pages->pop();
(*num_processed)++;
lk.unlock();
std::cout << content;
std::this_thread::sleep_for(std::chrono::milliseconds(80));
}
}
int main() {
std::queue<std::string> downloaded_pages;
std::mutex m;
std::condition_variable cv;
std::vector<std::thread> producers;
for (int i = 0; i < 5; ++i) {
producers.push_back(
std::thread(producer, &downloaded_pages, &m, i+1, &cv));
}
int num_processed = 0;
std::vector<std::thread> consumers;
for (int i = 0; i < 3; ++i) {
consumers.push_back(
std::thread(consumer, &downloaded_pages, &m, &num_processed, &cv));
}
for (int i = 0; i < 5; ++i) {
producers[i].join();
}
// wake up all threads sleeping
cv.notify_all();
for (int i = 0; i < 3; ++i) {
consumers[i].join();
}
}
먼저 producer 함수부터 보자.
std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
위의 코드는 인자로 전달된 만큼 (여기서는 100ms) 만큼 함수를 sleep 시킨다. 즉, 여기서는 쓰레드가 5개이므로, 5개의쓰레드가 각각 0, 100, 200, 300, 400 (단위는 ms)만큼 잠들게 된다.
각각의 쓰레드는 content 를 읽은 후, 이를 downloaded_pages 라는 queue 에 넣는데, 이 때 mutex 를 lock 하고 unlock 하는 과정이 필요하다.
content 를 쓴 다음에는, cv->notify_one( ) 함수를 이용해 consumer 의 잠자고 있는 쓰레드들 중 하나를 깨우게 된다.
이제 consumer 함수를 보자.
std::unique_lock<std::mutex> lk(*m);
cv->wait(lk, [&]{ return !downloaded_pages->empty() || *num_processed == 25; });
if (*num_processed == 25) {
lk.unlock();
return;
}
먼저, unique_lock 을 이용해 뮤텍스를 스마트 포인터로 생성한다.
그 후, cv->wati 함수를 사용하는데, 첫 번째 인자는 unique_lock 객체가, 두번째는 wait 의 탈출조건이 들어가게 된다. 두번째의 조건이 true이면, cv->wait( )은 리턴을 하고 다음 줄이 실행이 된다. 만약 false 라면, 해당 쓰레드는 unlock 을 하며 sleep 하게 된다.
만약 sleep 한 상태에서 notify_one 함수를 통해 깨어나라는 이야기를 들으면, 다시 한번 cv->wait 의 두번째 조건을 확인하고, true 이면 lock 을 가진 상태로 쓰레드가 깨어나게 된다!
그 이후에는 작업 큐의 첫 요소를 제거하고, content 를 출력한 후, 80 milliseconds 를 잠든다. 80 밀리초는 content 를 읽는데 걸리는 시간을 대략적으로 추정하여 넣은 값이다.
마지막으로 main 함수를 보자. 사실 로직은 이전까지의 쓰레드 사용과 비슷하다.
cv.notify_all();
다만 위 문장이 추가가 되었는데, 이는 잠자고 있는 쓰레드의 경우 join 이 되지 않기 때문이다. 이를 방지하기 위해 자고 있는 쓰레드들을 전부 깨워 join 까지 해 주면, 모든 작업이 깔끔하게 마무리된다.
'Tutorials > C++ : Beginner' 카테고리의 다른 글
C++ 기초 개념 15-4 : future, promise, packaged_task, async (0) | 2022.04.20 |
---|---|
C++ 기초 개념 15-3 : atomic 객체와 memory order (0) | 2022.04.19 |
C++ 기초 개념 15-1 : 쓰레드(thread)의 기초와 실습 (2) | 2022.04.19 |
C++ 기초 개념 14 : 함수를 객체로 사용하기 (std::function, std::mem_fn, std::bind) (0) | 2022.04.19 |
C++ 기초 개념 13-2 : shared_ptr 와 weak_ptr (0) | 2022.04.19 |