KoreanFoodie's Study

C++ 기초 개념 15-2 : 뮤텍스(Mutex)와 조건 변수(Condition Variable) 본문

Tutorials/C++ : Beginner

C++ 기초 개념 15-2 : 뮤텍스(Mutex)와 조건 변수(Condition Variable)

GoldGiver 2022. 4. 19. 10:23

모두의 코드를 참고하여 핵심 내용을 간추리고 있습니다. 자세한 내용은 모두의 코드의 씹어먹는 C++ 강좌를 참고해 주세요!

Race Condition

서로 다른 쓰레드에서 같은 메모리를 공유할 때 발생하는 문제를 경쟁 상태(race condition) 이라고 부른다. 다음 코드를 보자.

#include <iostream>
#include <thread>
#include <vector>

void worker(int& counter) {
  for (int i = 0; i < 10000; i++) {
    counter += 1;
  }
}

int main() {
  int counter = 0;

  std::vector<std::thread> workers;
  for (int i = 0; i < 4; i++) {
    // 레퍼런스로 전달하려면 ref 함수로 감싸야 한다 (지난 강좌 bind 함수 참조)
    workers.push_back(std::thread(worker, std::ref(counter)));
  }

  for (int i = 0; i < 4; i++) {
    workers[i].join();
  }

  std::cout << "Counter 최종 값 : " << counter << std::endl;
}

사실 우리가 짠 코드는 어셈블리어로 번역되어 순차적으로 실행되는데, 멀티 쓰레드일 경우 래지스터에 값을 저장하거나 연산을 수행할때 이 순서가 뒤죽박죽이 될 수 있다. 따라서, 위의 코드의 counter 는 40000 이 아닌 이상한 값이 나오게 된다.

 

 

뮤텍스(Mutex)

위와 같은 문제를 해결하기 위해, 뮤텍스(Mutex)를 활용할 수 있다.

#include <iostream>
#include <mutex>  // mutex 를 사용하기 위해 필요
#include <thread>
#include <vector>

void worker(int& result, std::mutex& m) {
  for (int i = 0; i < 10000; i++) {
    m.lock();
    result += 1;
    m.unlock();
  }
}

int main() {
  int counter = 0;
  std::mutex m;  // 우리의 mutex 객체

  std::vector<std::thread> workers;
  for (int i = 0; i < 4; i++) {
    workers.push_back(std::thread(worker, std::ref(counter), std::ref(m)));
  }

  for (int i = 0; i < 4; i++) {
    workers[i].join();
  }

  std::cout << "Counter 최종 값 : " << counter << std::endl;
}

뮤텍스를 사용하면 특정 쓰레드가 뮤텍스를 가지고 있을 때 다른 쓰레드가 뮤텍스를 얻기 전까지는 해당 영역의 코드를 실행할 수 없게 된다. 그 특정 영역을 임계 영역(Critical Section) 이라고 한다.

// 임계 영역(Critical Section)
m.lock();
result += 1;
m.unlock();

 

만약 mutex 를 unlock 하지 않으면 어떻게 될까?

만일 mutex 의 해제가 제대로 이루어지지 않으면, 데드락(DeadLock) 문제가 발생한다. 이를 방지하기 위해, 이전에 배운 unique_ptr 를 이용해 소멸자에서 메모리를 해제하도록 만들 수 있다.

#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

void worker(int& result, std::mutex& m) {
	for (int i = 0; i < 10000; ++i) {
		// When lock is created, m.lock() is called
		std::lock_guard<std::mutex> lock(m);
		result += 1;

		// lock vanishes as escaping the scope
		// unlock m by itself;
	}
}

int main() {

	int counter = 0;
	std::mutex m;

	std::vector<std::thread> workers;
	for (int i = 0; i < 4; ++i) {
		workers.push_back(std::thread(worker, std::ref(counter), std::ref(m)));
	}

	for (int i = 0; i < 4; ++i) {
		workers[i].join();
	}

	std::cout << "Counter Value : " << counter << std::endl;
}

lock_guard 객체는 뮤텍스를 인자로 받아 생성자에서 뮤텍스를 lock 한다. 그 후 lock_guard 가 소멸될 때 알아서 lock 했던 뮤텍스를 unlock 해주게 된다.

 

 

데드락 (deadlock)

하지만 unique_ptr 를 활용해도 데드락 상황은 얼마든지 생길 수 있다. 다음과 같은 코드는 절대 스스로 종료하지 않는다.

#include <iostream>
#include <mutex>  // mutex 를 사용하기 위해 필요
#include <thread>

void worker1(std::mutex& m1, std::mutex& m2) {
  for (int i = 0; i < 10000; i++) {
    std::lock_guard<std::mutex> lock1(m1);
    std::lock_guard<std::mutex> lock2(m2);
    // Do something
  }
}

void worker2(std::mutex& m1, std::mutex& m2) {
  for (int i = 0; i < 10000; i++) {
    std::lock_guard<std::mutex> lock2(m2);
    std::lock_guard<std::mutex> lock1(m1);
    // Do something
  }
}

int main() {
  int counter = 0;
  std::mutex m1, m2;  // 우리의 mutex 객체

  std::thread t1(worker1, std::ref(m1), std::ref(m2));
  std::thread t2(worker2, std::ref(m1), std::ref(m2));

  t1.join();
  t2.join();

  std::cout << "끝!" << std::endl;
}

왜냐하면 worker1이 lock1을, worker2 가 lock2를 먼저 가져가므로, 후순위의 lock을 가져갈 수 없는 상황이 발생하는 것이다.

 

worker 부분을 다음과 같이 바꿔주면 어떨까?

void worker1(std::mutex& m1, std::mutex& m2) {
  for (int i = 0; i < 10; i++) {
    m1.lock();
    m2.lock();
    std::cout << "Worker1 Hi! " << i << std::endl;

    m2.unlock();
    m1.unlock();
  }
}

void worker2(std::mutex& m1, std::mutex& m2) {
  for (int i = 0; i < 10; i++) {
    while (true) {
      m2.lock();

      // m1 이 이미 lock 되어 있다면 "야 차 빼" 를 수행하게 된다.
      if (!m1.try_lock()) {
        m2.unlock();
        continue;
      }

      std::cout << "Worker2 Hi! " << i << std::endl;
      m1.unlock();
      m2.unlock();
      break;
    }
  }
}

위의 코드는 worker2 가 worker1에게 작업을 양보하는 형태로 데드락을 해결했다. 하지만 이렇듯 한 쓰레드가 다른 쓰레드에 대해 우위를 갖게 되면, 우선순위가 낮은 쓰레드가 할 일이 없는 기아 상태(starvation)가 발생할 수 있다.

 

데드락 상황을 피하기 위한 가이드라인 (C++ Concurrency In Action)

  1. 중첩된 Lock을 사용하는 것을 피하라 (일반적으로는 1개의 Lock 만으로 충분하다)
  2. Lock 을 소유하고 있을 때 유저 코드를 호출하는 것을 피해라
  3. Lock 을 언제나 정해진 순서로 획득하라 (1->2, 1->2 는 좋지만 1->2, 2->1 는 곤란)

 

 

생산자(Producer) 와 소비자(Consumer) 패턴

웹 페이지를 긁어 오는 producer 함수와 읽어오는 consumer 함수가 있다고 하자. 긁어온 웹 데이터를 저장하거나 읽을 떄는 lock 을 소유해야 하는데, 상대적으로 읽는 작업이 웹 서버를 조회하는 것보다 빠르다. 따라서, 웹 페이지 내용을 저장할 때 lock 을 소유했다가 작업이 끝나면 consumer 함수를 깨우는 방식으로 코드를 짜면 된다.

잠들어 있는 쓰레드를 깨울 때는 조건 변수(condition_variable)을 이용한다.

#include <chrono>	// std::chrono::miliseconds
#include <condition_variable> // std::condition_variable
#include <iostream>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>

void producer(std::queue<std::string>* downloaded_pages, std::mutex* m, int index, std::condition_variable* cv) {
	for (int i = 0; i < 5; ++i) {
		// Time required to download websites
		std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));
		std::string content = "Website : " + std::to_string(i) + " from thread(" + std::to_string(index) + ")\n";

		// data should be inside the critical section
		m->lock();
		downloaded_pages->push(content);
		m->unlock();

		// notify consumer that the content is ready
		cv->notify_one();
	}
}

void consumer(std::queue<std::string>* downloaded_pages, std::mutex* m, int* num_processed, std::condition_variable* cv) {
	while (*num_processed < 25) {
		std::unique_lock<std::mutex> lk(*m);

		cv->wait(lk, [&] { return !downloaded_pages->empty() || *num_processed == 25; });

		if (*num_processed ==25) {
			lk.unlock();
			return;
		}

		// read front page and delete it from the queue
		std::string content = downloaded_pages->front();
		downloaded_pages->pop();

		(*num_processed)++;
		lk.unlock();

		std::cout << content;
		std::this_thread::sleep_for(std::chrono::milliseconds(80));
	}
}

int main() {

	std::queue<std::string> downloaded_pages;
	std::mutex m;
	std::condition_variable cv;

	std::vector<std::thread> producers;
	for (int i = 0; i < 5; ++i) {
		producers.push_back(
			std::thread(producer, &downloaded_pages, &m, i+1, &cv));
	}

	int num_processed = 0;
	std::vector<std::thread> consumers;
	for (int i = 0; i < 3; ++i) {
		consumers.push_back(
			std::thread(consumer, &downloaded_pages, &m, &num_processed, &cv));
	}

	for (int i = 0; i < 5; ++i) {
		producers[i].join();
	}

	// wake up all threads sleeping
	cv.notify_all();

	for (int i = 0; i < 3; ++i) {
		consumers[i].join();
	}

}

먼저 producer 함수부터 보자.

 

std::this_thread::sleep_for(std::chrono::milliseconds(100 * index));

위의 코드는 인자로 전달된 만큼 (여기서는 100ms) 만큼 함수를 sleep 시킨다. 즉, 여기서는 쓰레드가 5개이므로, 5개의쓰레드가 각각 0, 100, 200, 300, 400 (단위는 ms)만큼 잠들게 된다.

각각의 쓰레드는 content 를 읽은 후, 이를 downloaded_pages 라는 queue 에 넣는데, 이 때 mutex 를 lock 하고 unlock 하는 과정이 필요하다.

content 를 쓴 다음에는, cv->notify_one( ) 함수를 이용해 consumer 의 잠자고 있는 쓰레드들 중 하나를 깨우게 된다.

 

이제 consumer 함수를 보자.

std::unique_lock<std::mutex> lk(*m);

cv->wait(lk, [&]{ return !downloaded_pages->empty() || *num_processed == 25; });
if (*num_processed == 25) {
    lk.unlock();
    return;
}

먼저, unique_lock 을 이용해 뮤텍스를 스마트 포인터로 생성한다.

그 후, cv->wati 함수를 사용하는데, 첫 번째 인자는 unique_lock 객체가, 두번째는 wait 의 탈출조건이 들어가게 된다. 두번째의 조건이 true이면, cv->wait( )은 리턴을 하고 다음 줄이 실행이 된다. 만약 false 라면, 해당 쓰레드는 unlock 을 하며 sleep 하게 된다.

만약 sleep 한 상태에서 notify_one 함수를 통해 깨어나라는 이야기를 들으면, 다시 한번 cv->wait 의 두번째 조건을 확인하고, true 이면 lock 을 가진 상태로 쓰레드가 깨어나게 된다!

그 이후에는 작업 큐의 첫 요소를 제거하고, content 를 출력한 후, 80 milliseconds 를 잠든다. 80 밀리초는 content 를 읽는데 걸리는 시간을 대략적으로 추정하여 넣은 값이다.

 

마지막으로 main 함수를 보자. 사실 로직은 이전까지의 쓰레드 사용과 비슷하다.

cv.notify_all();

다만 위 문장이 추가가 되었는데, 이는 잠자고 있는 쓰레드의 경우 join 이 되지 않기 때문이다. 이를 방지하기 위해 자고 있는 쓰레드들을 전부 깨워 join 까지 해 주면, 모든 작업이 깔끔하게 마무리된다.

 

참고 : condition_variable의 다른 유용한 함수들 : wiat_for , wait_until

Comments