KoreanFoodie's Study

[C++ 게임 서버] 4-8. SendBuffer 본문

Game Dev/Game Server

[C++ 게임 서버] 4-8. SendBuffer

GoldGiver 2023. 12. 13. 02:43

Rookiss 님의 '[C++과 언리얼로 만드는 MMORPG 게임 개발 시리즈] Part4: 게임 서버' 를 들으며 배운 내용을 정리하고 있습니다. 관심이 있으신 분은 꼭 한 번 들어보시기를 추천합니다!

[C++ 게임 서버] 4-8. SendBuffer

핵심 :

1. Send 는 Recv 보다 살짝 복잡할 수 있다. Lock 등으로 동기화 이슈를 잘 해결해 주어야 한다.

2. Session 의 갯수가 많아질 경우, SessionManager 를 이용해 Session 을 관리해 보자. 

이번 글에서는 SendBuffer 를 만들어 보자. 이전에 Send 는 Recv 보다 조금 더 복잡할 수도 있다고 얘기한 바 있는데... 그래서 그런지 Send 는 Recv 보다 아주 살짝(?) 복잡할 것이다 😅

먼저, 이전에 Session 에서 만들었던 Send 의 구현을 한 번 보자.

void Session::Send(BYTE* buffer, int32 len)
{
	// 생각할 문제
	// 1) 버퍼 관리?
	// 2) sendEvent 관리? 단일? 여러개? WSASend 중첩?

	// TEMP
	SendEvent* sendEvent = xnew<SendEvent>();
	sendEvent->owner = shared_from_this(); // ADD_REF
	sendEvent->buffer.resize(len);
	::memcpy(sendEvent->buffer.data(), buffer, len);

	WRITE_LOCK;
	RegisterSend(sendEvent);
}

잘 보면, Send 가 불릴 때마다 SendEvent 를 만들고, memcpy 를 해 주는데... 사실 이는 Send 가 여러 번 호출될 수 있다는 것을 생각해보면 낭비가 아닐 수 없다. 그래서 우리는 SendBuffer 라는 클래스를 만들어 RecvBuffer 에서처럼 단순화 및 효율화 작업을 진행할 것이다!

 

먼저, Session 의 헤더에 아래 송신 관련 변수를 선언해 주자.

                        /* 송신 관련 */
Queue<SendBufferRef>	_sendQueue;
Atomic<bool>		_sendRegistered = false;

SendBufferRef 는 SendBuffer 클래스의 Reference Counting 버전일 뿐이다. 😂 이제 SendBuffer 클래스를 살펴보자.

 

SendBuffer.h

class SendBuffer : enable_shared_from_this<SendBuffer>
{
public:
	SendBuffer(int32 bufferSize);
	~SendBuffer();

	BYTE* Buffer() { return _buffer.data(); }
	int32 WriteSize() { return _writeSize; }
	int32 Capacity() { return static_cast<int32>(_buffer.size()); }

	void CopyData(void* data, int32 len);

private:
	Vector<BYTE>	_buffer;
	int32			_writeSize = 0;
};

흠. 사실 너무 간단해서 딱히 설명할 부분도 없어 보이긴 하는데... 

 

SendBuffer.cpp

SendBuffer::SendBuffer(int32 bufferSize)
{
	_buffer.resize(bufferSize);
}

SendBuffer::~SendBuffer()
{
}

void SendBuffer::CopyData(void* data, int32 len)
{
	ASSERT_CRASH(Capacity() >= len);
	::memcpy(_buffer.data(), data, len);
	_writeSize = len;
}

CopyData 에서는 기존의 Send 함수에서 했던 memcpy 를 불러주고 있다. 결국 memcpy 를 한 번은 무조건 쓰긴 해야 할 것이다. 😀

 

그럼 Session 클래스에서 Send 를 다음과 같이 바꿔 줄 수 있다 :

void Session::Send(SendBufferRef sendBuffer)
{
	// 현재 RegisterSend가 걸리지 않은 상태라면, 걸어준다
	WRITE_LOCK;

	_sendQueue.push(sendBuffer);

	/*if (_sendRegistered == false)
	{
		_sendRegistered = true;
		RegisterSend();
	}*/
	
	if (_sendRegistered.exchange(true) == false)
		RegisterSend();
}

즉, 기존에 일일히 이벤트를 만들고 memcpy 를 호출하는 것이 아니라, _sendQueue 에 sendBuffer 를 넣어주고, 만약 send 이벤트가 등록되지 않은 상태라면(_sendRegistered.exchange(true)... 부분), RegisterSend 를 호출해 Send 이벤트를 등록해 준다.

 

그럼 RegisterSend 는 어떻게 구현되어 있을까? 사실 이 부분이 핵심이다! 😎

void Session::RegisterSend()
{
	if (IsConnected() == false)
		return;

	_sendEvent.Init();
	_sendEvent.owner = shared_from_this(); // ADD_REF

	// 1부 : 보낼 데이터를 sendEvent에 등록
	{
		WRITE_LOCK;

		int32 writeSize = 0;
		while (_sendQueue.empty() == false)
		{
			SendBufferRef sendBuffer = _sendQueue.front();

			writeSize += sendBuffer->WriteSize();
			// TODO : 예외 체크

			_sendQueue.pop();
			_sendEvent.sendBuffers.push_back(sendBuffer);
		}
	}

	// 2부 : Scatter-Gather (흩어져 있는 데이터들을 모아서 한 방에 보낸다)
	Vector<WSABUF> wsaBufs;
	wsaBufs.reserve(_sendEvent.sendBuffers.size());
	for (SendBufferRef sendBuffer : _sendEvent.sendBuffers)
	{
		WSABUF wsaBuf;
		wsaBuf.buf = reinterpret_cast<char*>(sendBuffer->Buffer());
		wsaBuf.len = static_cast<LONG>(sendBuffer->WriteSize());
		wsaBufs.push_back(wsaBuf);
	}

	DWORD numOfBytes = 0;
	if (SOCKET_ERROR == ::WSASend(_socket, wsaBufs.data(), static_cast<DWORD>(wsaBufs.size()), OUT &numOfBytes, 0, &_sendEvent, nullptr))
	{
		int32 errorCode = ::WSAGetLastError();
		if (errorCode != WSA_IO_PENDING)
		{
			HandleError(errorCode);
			_sendEvent.owner = nullptr; // RELEASE_REF
			_sendEvent.sendBuffers.clear(); // RELEASE_REF
			_sendRegistered.store(false);
		}
	}
}

내용이 기니, 먼저 1부 : 보낼 데이터를 SendEvent 에 등록하는 파트부터 보자.

먼저, sendQueue 로부터 SendBuffer 를 꺼내 sendEvent 에 있는 sendBuffers 에 넣는다. 아, 참고로 이 sendBuffers 는 SendEvent 에 정의된 벡터이다.

class SendEvent : public IocpEvent
{
public:
	SendEvent() : IocpEvent(EventType::Send) { }
	 
	Vector<SendBufferRef> sendBuffers;
};

그리고 나서, 2부에서는 scatter-gather 방식으로 WSABUF 들을 한 번에 wsaBufs 벡터에 넣어서, WSASend 함수를 통해 실제로 전송을 해 준다.

 

사실 여기까지하면 거의 대부분의 작업이 완료되었다! Session 에서 ProcessSend 함수는 다음과 같이 바뀌게 되는데...

void Session::ProcessSend(int32 numOfBytes)
{
	_sendEvent.owner = nullptr; // RELEASE_REF
	_sendEvent.sendBuffers.clear(); // RELEASE_REF

	if (numOfBytes == 0)
	{
		Disconnect(L"Send 0");
		return;
	}

	// 컨텐츠 코드에서 재정의
	OnSend(numOfBytes);

	WRITE_LOCK;
	if (_sendQueue.empty())
		_sendRegistered.store(false);
	else
		RegisterSend();
}

달라진 것은 _sendQueue 상태를 체크하는 것 정도가 아닐까 싶다 😅

 

참, 세션이 많아지면 기존처럼 일일히 세션을 만들지 않고, SessionManager 를 통해 관리하는 것이 좋다. 아래와 같이 기본 GameSession 과 이를 관리하는 SessionManager 를 추가해 보자.

GameSessionManager.h

class GameSession;

using GameSessionRef = shared_ptr<GameSession>;

class GameSessionManager
{
public:
	void Add(GameSessionRef session);
	void Remove(GameSessionRef session);
	void Broadcast(SendBufferRef sendBuffer);

private:
	USE_LOCK;
	Set<GameSessionRef> _sessions;
};

extern GameSessionManager GSessionManager;

 

GameSession.cpp

#include "pch.h"
#include "GameSessionManager.h"
#include "GameSession.h"

GameSessionManager GSessionManager;

void GameSessionManager::Add(GameSessionRef session)
{
	WRITE_LOCK;
	_sessions.insert(session);
}

void GameSessionManager::Remove(GameSessionRef session)
{
	WRITE_LOCK;
	_sessions.erase(session);
}

void GameSessionManager::Broadcast(SendBufferRef sendBuffer)
{
	WRITE_LOCK;
	for (GameSessionRef session : _sessions)
	{
		session->Send(sendBuffer);
	}
}

Broadcast 를 하면, 델리게이트에 등록된 함수들이 호출되듯, 각 세션에 대해 Send 동작이 진행될 것이다.

 

GameSession.h

class GameSession : public Session
{
public:
	~GameSession()
	{
		cout << "~GameSession" << endl;
	}

	virtual void OnConnected() override;
	virtual void OnDisconnected() override;
	virtual int32 OnRecv(BYTE* buffer, int32 len) override;
	virtual void OnSend(int32 len) override;
};

 

GameSession.cpp

#include "pch.h"
#include "GameSession.h"
#include "GameSessionManager.h"

void GameSession::OnConnected()
{
	GSessionManager.Add(static_pointer_cast<GameSession>(shared_from_this()));
}

void GameSession::OnDisconnected()
{
	GSessionManager.Remove(static_pointer_cast<GameSession>(shared_from_this()));
}

int32 GameSession::OnRecv(BYTE* buffer, int32 len)
{
	// Echo
	cout << "OnRecv Len = " << len << endl;

	SendBufferRef sendBuffer = MakeShared<SendBuffer>(4096);
	sendBuffer->CopyData(buffer, len);
	
	GSessionManager.Broadcast(sendBuffer);

	return len;
}

void GameSession::OnSend(int32 len)
{
	cout << "OnSend Len = " << len << endl;
}

이제 GameSession 은 GameSessionManager 를 통해 관리될 것이며, OnRecv 함수가 호출될 때, sendBuffer 에 가 넘어가 각 Session 별로 Send 가 진행될 것이다. 뭐, 그냥 Echo 서버처럼 계속 주고 받고 있는 느낌으로 동작한다고 보면 된다.

참고로 아래가 그 결과물인데...

왼쪽이 클라, 오른쪽이 서버다. 자세히 보면, 서버의 경우 OnRecv 를 한 번 받을 때 OnSend 가 5 번정도 불리는 것을 알 수 있다. 이는 위 구현처럼, OnRecv 를 한 번 받을 때, GSessionManager 를 통해 Broadcast 가 일어나서 5개의 서비스가 돌아가기 때문임을 유추할 수 있다. 5 개의 쓰레드가, 1개의 서비스를 물고 있으면서, 각각 Dispatch(각 IocpCore 의 구체적인 구현에 맞는 행위)를 하기 때문이다.

(서비스가 5개인 줄 알고 적었는데, 실제 구현은 그렇지 않다는 것을 보고 수정함)

for (int32 i = 0; i < 5; i++)
{
    GThreadManager->Launch([=]()
        {
            while (true)
            {
                service->GetIocpCore()->Dispatch();
            }				
        });
}

위가 바로 GameServer 내의 코드이다. 여기서 5 를 10 으로 바꾸면... OnSend 가 OnRecv 당 10개씩 등장하게 된다 🤣

Comments