KoreanFoodie's Study

[C++ 함수형 프로그래밍] 퍼포먼스 최적화(메모이제이션과 꼬리 재귀, 병렬 연산) 본문

R & D/Software Engineering

[C++ 함수형 프로그래밍] 퍼포먼스 최적화(메모이제이션과 꼬리 재귀, 병렬 연산)

GoldGiver 2023. 9. 28. 19:41

함수형 프로그래밍 패러다임에 대해 알아보며 이를 C++ 를 이용한 소프트웨어 개발에 어떻게 적용하면 좋을지 알아보겠습니다.

핵심 :

1. 메모이제이션을 활용하면, 재귀적인 연산에서 이전 연산의 캐싱값을 이용해 불필요한 연산을 줄일 수 있다. 

2. 꼬리 재귀 최적화는 컴파일러 최적화 플래그를 활성화하면 된다. 이를 통해 메모리 사용을 낮출 수 있다.

3. 순수 함수의 불변성을 이용해 병렬 연산을 쉽게 적용할 수 있다. 하지만 future 등을 사용하여 병렬 연산을 하는 것보다, sequential 한 방식의 성능이 더 나을 때도 있다.

퍼포먼스 최적화

사실 모든 경우와 상황에 알맞은 최적화 방법이란 없다. 어떤 경우는 메모리를 희생해서라도 속도를 높여야 할 수도 있고, 다른 경우는 메모리를 아끼기 위해 빠른 속도를 어느 정도 포기해야 할 수도 있다.

그럼에도 불구하고, 대표적인 기법인 꼬리 재귀와 메모이제이션을 알아두면 추후 함수형 프로그램의 최적화에 큰 도움이 될 것이다.

다만 책에서 소개한 병렬론 및 멀티 쓰레드 기법은 사실 함수형 프로그래밍에만 적용되는 내용은 아니기에 그리 깊이 다루지는 않겠다.

특정 함수의 처리 시간을 계산할 때, 아래의 함수를 이용하겠다(chrono 네임스페이스 참고).

auto measureExecutionTimeForF = [](auto f)
{
	auto t1 = high_resolution_clock::now();
	f();
	auto t2 = high_resolution_clock::now();
	chrono::nanoseconds duration = t2 - t1;
	cout << "Duration : " << duration.count() << "ns" << endl;
	return duration;
};

참고로, Variadic Template 을 사용하면 여러 인자를 받는 함수도 시간 측정이 가능하다.

더보기
template<typename ReturnType, typename... Args>
auto ElapsedTimeOfFunction(function<ReturnType(Args...)> f)
{
	return[=](Args... args)
	{
		auto t1 = high_resolution_clock::now();
		f(args...);
		auto t2 = high_resolution_clock::now();
		chrono::nanoseconds duration = t2 - t1;
		cout << "\n Duration : " << duration.count() << "ns" << endl;
		return duration;
	};
};


/***/

// 일반
ElapsedTimeOfFunction(fact)(100);

// 메모이제이션
auto factMemoized = memoize(fact);
ElapsedTimeOfFunction(factMemoized)(100);

 

 

메모이제이션

사실 메모이제이션이라고 하면, 흔히 DP 에서 많이 사용했던 기억이 있을지도 모르겠다. 여기서는 팩토리얼을 예제로 하여, 캐싱된 값을 맵에 저장함으로써 메모이제이션을 테스트해 볼 것이다.

먼저 팩토리얼 함수는 다음과 같이 쓸 수 있다.

// 전역에서 선언하면 컴파일 에러 발생에 유의
function<int(int)> fact = [&fact](auto N)
{
    if (N == 0) return 1;
    else return N * fact(N - 1);
};

 

그리고 Variadic Template 의 힘을 빌려, 특정 함수를 메모이제이션 가능한 버전으로 만드는 memoize 함수를 만들 것이다.

template<typename ReturnType, typename... Args>
function<ReturnType(Args...)> memoize(function<ReturnType(Args...)> f)
{
	map<tuple<Args...>, ReturnType> cache;
	return ([=](Args...args) mutable
		{
			tuple<Args...> theArguments(args...);
			auto cached = cache.find(theArguments);
			if (cached != cache.end())
			{
				return cached->second;
			}
			else
			{
				auto result = f(args...);
				cache[theArguments] = result;
				return result;
			}
		});
}

그런데 위의 함수를 잘 보면... 최종 결과값만 캐싱이 되는 것을 확인할 수 있다. 사실 그런데 사실 우리가 원하는 건 중간 단계의 결과가 캐싱되는 것이다 🤨

실제로 위의 코드를 돌려 일반 팩토리얼과 메모이제이션이 적용된 팩토리얼의 성능을 비교하면...

// 일반
auto fact100 = bind(fact, 100);
measureExecutionTimeForF(fact100);

// 메모이제이션
auto factMemoized = memoize(fact);
auto fact100Memoized = bind(factMemoized, 100);
measureExecutionTimeForF(fact100Memoized);

라고 결과가 나오지만, 사실 위 수치는 메모이제이션 때문에 시간이 적게 걸린 것이 아니라, CPU 자체 최적화 때문에 그런 것이다. 실제로 메모이제이션을 먼저 호출해 보면 다음과 같은 결과가 나온다 :

이제는 메모이제이션된 버전이 더 끔찍하게 느리게 나왔다. 😕 다만, 같은 함수를 여러번 호출하면 더 나은 결과를 보일 순 있을 것이다.

// 메모이제이션
auto factMemoized = memoize(fact);
ElapsedTimeOfFunction(factMemoized)(100);
ElapsedTimeOfFunction(factMemoized)(100);
ElapsedTimeOfFunction(factMemoized)(100);
ElapsedTimeOfFunction(factMemoized)(100);
ElapsedTimeOfFunction(factMemoized)(100);

// 일반
ElapsedTimeOfFunction(fact)(143);

실제로 아래와 같은 결과가 나온다.

이제 각 호출에 캐시가 적용되도록 팩토리얼 함수를 한 번 변경해 보자.

map<int, int> cache;
function<int(int)> recursiveMemoizedFactorial =
    [&recursiveMemoizedFactorial, &cache](int n) mutable
{
    auto value = cache.find(n);
    if (value != cache.end()) return value->second;

    int result;
    if (n == 0)
        result = 1;
    else
        result = n * recursiveMemoizedFactorial(n - 1);

    cache[n] = result;
    return result;
};

/***/

// 재귀 메모이제이션 팩토리얼
ElapsedTimeOfFunction(recursiveMemoizedFactorial)(105);
ElapsedTimeOfFunction(recursiveMemoizedFactorial)(106);
ElapsedTimeOfFunction(recursiveMemoizedFactorial)(107);
ElapsedTimeOfFunction(recursiveMemoizedFactorial)(108);
ElapsedTimeOfFunction(recursiveMemoizedFactorial)(109);

이 결과를 테스트해보면...

오! 적어도 맨 마지막에는 유의미한 결과가 나왔음을 확인할 수 있다. 😇

 

 

꼬리 재귀 최적화

사실 명령형 프로그램에서 재귀적으로 코드를 짜는 경우는 생각보다 많지 않다. 대부분 for-loop 을 선호하기도 하고, 과도한 재귀 호출은 스택 오버플로우를 일으킬 수 있다.

위에서 예로 들었던 팩토리얼 함수도, 마찬가지로 호출될 때마다 스택을 차지하는데, 올바른 최적화 플래그를 활성화하면 컴파일러는 그것을 미리 예측하고 필요한 메모리 양을 줄여준다.

// 최적화 버전에서, fact 는 다음과 같이 해석된다
function<int(int)> fact = [](auto N)
{
	if (N == 0) return 1;
	else return N * (N - 1) * (N - 2) * ... * fact(0);
};

gcc 에서, 이 옵션은 -foptimize-sibling-calls 이다. 참고로, -03 옵션을 주면 모든 최적화 플래그를 활성화할 수 있다!

실제로 테스트를 해 보면, 이 옵션을 주었는지 안 주었는지가 매우 큰 차이를 보일 때도 있다는 것을 알 수 있다 😊

 

 

기타 최적화 묶음

병렬론 - 불변성의 장점 활용하기

우리는 여러 쓰레드에게 일을 맡길 때, 보통 Lock 을 만드는 방식으로 공유 자원을 사용했을 때 일어날 수 있는 문제를 방지한다.

그런데 사실 함수형 프로그래밍의 빌딩 블록은 순수 함수이며, 순수 함수는 입력 파라미터를 변형시키지 않는다는 것을 알고 있다. 이를 통해, 우리는 순수함수를 안전하게 병렬적으로 실행할 수 있음을 깨달을 수 있다.

#include <execution>
#include <algorithm>

#define PARALLEL_ENABLED

// vec = { 1, 2, 3... 100 }
vector<int> vec = toRange(100);

auto all_of_sequential = [&vec]()
{
    return all_of(execution::seq, vec.begin(), vec.end(), [](auto value)
        {
            return value > 5;
        });
};

measureExecutionTimeForF(all_of_sequential);

auto all_of_parallel = [&vec]()
{
    return all_of(execution::par, vec.begin(), vec.end(), [](auto value)
        {
            return value > 5;
        });
};

measureExecutionTimeForF(all_of_parallel);

위를 보면, execution::par 를 붙인 녀석이 병렬 실행하는 녀석이다. 아쉽게도, 이 경우에서는 병렬실행이 더 느린 결과를 보여준다 😅

이는 사실 CPU 와 컴파일러가 자체적으로 최적화를 하기 때문인데, 때로는 코드상에서 최적화를 하는 것보다 CPU 와 컴파일러에게 최적화를 맡기는 것이 더 나을 때도 있다! 😵

 

비동기 코드를 통한 실행 시간 최적화

책에서는 사실 future 를 사용해서 간단한 병렬 처리 예제를 보여준다. 이는 오히려 서버 관련 포스팅에서 자세히 알아보는 것이 더 좋아 보이긴 한다 😂

다만 책에서 future 를 벡터에 담아 한번에 병렬적으로 결과값을 처리하는 코드 스니펫 정도는 참고하고 넘어가도록 하겠다 😊

// 소수 판별을 병렬적으로 수행할 것이다
auto is_prime = [](int x)
{
	auto xIsDivisibleBy = bind(isDivisibleBy, x, _1);
	return none_of_collection(rangeFrom2To(x - 1), xIsDivisibleBy);
};

// async 하게 만들어야 별도의 쓰레드에서 수행된다
auto makeFuture = [](auto value)
{
	return async(is_prime, value);
};

vector<int> values = { 123, 1412, 944, 9209 };
vector<future<bool>> futures = transformAll<vector<future<bool>>>(values, makeFuture);

// future 객체는 복사되어서는 안된다는 것에 유의
vector<bool> results(values.size());
transform(futures.begin(), futures.end(), results.begin(), []
(future<bool>& future) { return future.get(); });

 

리액티브 프로그래밍 맛보기

리액티브 프로그래밍은 데이터 스트림을 처리하는데 중점을 두는 코드를 작성하는 패러다임이다. 예를 들어, 자율주행의 경우 계속 새로운 데이터가 들어오므로, 지금까지 처리한 데이터와 현재 들어오는 데이터가 모두 정상적으로 처리되어야 한다.

간단히 생각하면, 기존에 데이터는 메인 스트림에서 처리하고, 새로 들어오는 데이터는 보조 스트림에게 맞기는 방식으로 처리하면 된다. 예시를 보면 한번에 이해가 될 것이다.

// 새로운 입력이 계속 들어온다
int number;
while (true)
{
	cin >> number;
	async(is_prime, number);
}

메인 스레드의 응답성은 계속 유지하되, 실제 연산은 자식 스레드가 수행할 것이다 😎

 

메모리 사용 최적화

우리는 이전까지 transform 을 사용하면서 컬렉션을 변형해 왔다. 마지막으로 move semantics 를 이용해 in-place 한 방식으로 transform 을 구현한 스니펫을 보고 이 글을 마무리하도록 하자!

// 기존 컬렉션 변형
auto transformAllInPlace = [](const auto& source, auto fn)
{
	transform(source.begin(), source.end(), fn);
};

// 이런 식으로도 표현할 수 있다
template<typename SourceType>
auto transformAllInPlace = [](const auto& source, fn) -> SourceType&&
{
	transform(source.begin(), source.end(), source.begin(), fn);
	return move(source);
}
Comments