KoreanFoodie's Study

[C++ 게임 서버] 6-5. JobQueue #4 본문

Game Dev/Game Server

[C++ 게임 서버] 6-5. JobQueue #4

GoldGiver 2023. 12. 20. 18:32

Rookiss 님의 '[C++과 언리얼로 만드는 MMORPG 게임 개발 시리즈] Part4: 게임 서버' 를 들으며 배운 내용을 정리하고 있습니다. 관심이 있으신 분은 꼭 한 번 들어보시기를 추천합니다!

[C++ 게임 서버] 6-5. JobQueue #4

핵심 :

1. JobQueue 를 템플릿 버전으로 만들어 LockQueue 라는 이름으로 바꿔보자.

2. Job 에 대한 처리를 Push 를 하는 쓰레드 중 일부가 실제로 처리하도록 만들 수 있다. 그럼 메인 서버에서 FlushJob 을 하던 부분을 특정 쓰레드가 담당할 것이다.

3. 2번에 대한 처리를 할 때는, 동기화에 대한 부분과 병목 현상에 대한 부분을 모두 고려해야 한다.

이번 시간에는, Job 을 처리하는 부분을 조금 더 고도화시켜볼 것이다.

또한, 이전에는 GameServer 에서 한 녀석이 FlushJob 을 호출하면서 Job 을 꺼내 하나하나 처리했는데, 이 부분을 그냥 Job 을 추가하는 쓰레드 중 하나가 알아서 처리하도록 만들 것이다! 😉

 

일단, JobQueue 대신 좀 더 범용적인 동작을 처리한다는 것을 표현하기 위해 이름을 Job 에서 Item 을 처리하도록 조금 바꿔줄 것이며, JobQueue 를 LockQueue 로 바꿔주자.

LockQueue.h (구 JobQueue)

template<typename T>
class LockQueue
{
public:
	void Push(JobRef item)
	{
		WRITE_LOCK;
		_items.push(item);
	}

	T Pop()
	{
		WRITE_LOCK;
		if (_items.empty())
			return T();

		T ret = _items.front();
		_items.pop();
		return ret;
	}

	void PopAll(OUT Vector<T>& items)
	{
		WRITE_LOCK;
		while (T item = Pop())
			items.push_back(item);
	}

	void Clear()
	{
		WRITE_LOCK;
		_items = Queue<T>();
	}

private:
	USE_LOCK;
	Queue<T> _items;
};

 

그리고 이전에 만들었던 JobSerializer 를 JobQueue 로 바꿔주자. 이름이 기니까 불편했다 😅

JobQueue.h (구 JobSerializer.h)

/*---------------
	JobQueue
----------------*/

class JobQueue : public enable_shared_from_this<JobQueue>
{
public:
	void DoAsync(CallbackType&& callback)
	{
		Push(ObjectPool<Job>::MakeShared(std::move(callback)));
	}

	template<typename T, typename Ret, typename... Args>
	void DoAsync(Ret(T::*memFunc)(Args...), Args... args)
	{
		shared_ptr<T> owner = static_pointer_cast<T>(shared_from_this());
		Push(ObjectPool<Job>::MakeShared(owner, memFunc, std::forward<Args>(args)...));
	}

	void				ClearJobs() { _jobs.Clear(); }

private:
	void				Push(JobRef&& job);
	void				Execute();

protected:
	LockQueue<JobRef>	_jobs;
	Atomic<int32>		_jobCount = 0;
};

사용 방식은 거의 비슷해 보인다. 다만, _jobCount 라는 것이 생겼고, 이를 통해서 JobQueue 에 쌓인 Job 을 처리하는 로직을 다음과 같이 수정해 줄 것이다.

그리고 기존에는 LockQueue 에 Job 을 밀어주기만 했는데, 이제 실제로 Execute 를 할 수도 있게 바뀔 것이므로, DoAsync 라는 네이밍이 추가된 것을 숙지해 두자.

 

JobQueue.cpp

/*---------------
	JobQueue
----------------*/

void JobQueue::Push(JobRef&& job)
{
	const int32 prevCount = _jobCount.fetch_add(1);
	_jobs.Push(job); // WRITE_LOCK

	// 첫번째 Job을 넣은 쓰레드가 실행까지 담당
	if (prevCount == 0)
	{
		Execute();
	}
}

// 1) 일감이 너~무 몰리면?
// 2) DoAsync 타고 타고 가서~ 절대 끝나지 않는 상황 (일감이 한 쓰레드한테 몰림)
void JobQueue::Execute()
{
	while (true)
	{
		Vector<JobRef> jobs;
		_jobs.PopAll(OUT jobs);

		const int32 jobCount = static_cast<int32>(jobs.size());
		for (int32 i = 0; i < jobCount; i++)
			jobs[i]->Execute();

		// 남은 일감이 0개라면 종료
		if (_jobCount.fetch_sub(jobCount) == jobCount)
		{
			return;
		}
	}
}

일단 Push 부터 살펴보자. 

prevCount 는 일단 LockQueue 에 Job 을 넣기 전의 값을 받는다. 그 후, 만약 prevCount 가 0 이면(즉, Job 을 넣는 시점에서 추가되었던 Job 이 없다면), 해당 쓰레드가 LockQueue 에 쌓인 일감을 처리해 준다.

 

이제 Execute 를 보면... 대략적인 로직은 기존의 FlushJob 이 하던 것과 거의 동일하다.

Job 들을 꺼내고, JobCount 의 갯수를 세서 해당 갯수만큼을 Execute 해 준다.

다만, _jobCount 를 jobCount 와 비교하는 구문이 있는데... fetch_sub 는, 뺄셈을 하되 뺄셈을 하기 이전의 값을 리턴한다. 즉, 만약 두 값이 같다는 것은 빼 줄 jobCount 가 0 이라는 뜻이므로, 더 이상 수행할 Job 이 남아 있지 않음을 알 수 있다. 그래서 바로 return 을 때려주는 것이다! 😁

 

다만 위 코드는 주의할 점이 두 가지 있다.

먼저, Push 에서의 아래 코드를 보자.

const int32 prevCount = _jobCount.fetch_add(1);
_jobs.Push(job); // WRITE_LOCK

만약 이 두 줄의 순서가 바뀐다면 무슨 일이 일어날까...?

만약 job 이 Push 가 되었고, _jobCount 가 증가되기 이전에, Execute 에서 _jobCount 가 0 인지 체크하는 로직이 불린다면...

jobCount 는 _jobCount 보다 1 이 더 클 것이므로, _jobCount 가 -1 이 되는, 기묘한 상황이 발생한다! 따라서 위 코드의 순서를 잘 지켜주거나, 문제가 없도록 묶어 주어야 할 것이다. 😊

 

두 번째 문제는 성능 상의 이슈이다.

우리는 첫 번째 Job 을 넣은 쓰레드가 실제 실행을 담당하도록 만들었는데... 만약 Job 이 끊임없이 추가된다면 어떻게 될까? 그렇게 되면 첫 번째로 Job 을 넣은 쓰레드는 잡일만 계속 처리하게 되어 특정 유저만 렉이 걸리게 될 수도 있다.

또한, DoAsync 를 통해 특정 동작을 하는 도중, 내부에서 또 Execute 가 호출되는 상황이 발생하면, 정말로 일감에 대한 처리가 영원히 끝나지 않을 수도 있다. 😂

이 두번째 문제는 다음 글에서 처리해 보도록 하겠다. 😎

 

참고로, 이제 GameServer 에서 FlushJob 을 해주는 로직은 필요가 없을 것이다. 그리고 ClientPacketHandler 에서 패킷을 핸들링하는 함수는 다음과 같이 거의 네이밍만 PushJob 에서 DoAsync 로 바뀔 것이다.

bool Handle_C_CHAT(PacketSessionRef& session, Protocol::C_CHAT& pkt)
{
	std::cout << pkt.msg() << endl;

	Protocol::S_CHAT chatPkt;
	chatPkt.set_msg(pkt.msg());
	auto sendBuffer = ClientPacketHandler::MakeSendBuffer(chatPkt);

	GRoom->DoAsync(&Room::Broadcast, sendBuffer);

	return true;
}
 
Comments