콘텐츠로 이동

C++ Thread Pool 구현

스레드 생성 비용은 생각보다 큽니다. std::thread 하나를 생성하면 OS 커널 객체 할당, 스택 메모리 예약(기본 1~8 MB), 스케줄러 등록이 일어납니다. 짧은 작업을 수천 번 수행할 때마다 스레드를 생성·소멸하면 오버헤드가 실제 작업보다 커집니다.

Thread Pool은 스레드를 미리 생성해두고 작업을 큐에 넣으면 유휴 스레드가 꺼내 실행하는 패턴입니다.


#pragma once
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <stdexcept>
class ThreadPool
{
public:
explicit ThreadPool(size_t numThreads)
: _stop(false)
{
for (size_t i = 0; i < numThreads; ++i)
{
_workers.emplace_back([this]
{
for (;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(_mutex);
// 작업이 있거나 종료 요청이 올 때까지 대기
_cv.wait(lock, [this]
{
return _stop || !_tasks.empty();
});
// 종료 요청 + 큐가 비었으면 스레드 종료
if (_stop && _tasks.empty())
return;
task = std::move(_tasks.front());
_tasks.pop();
}
task(); // 잠금 해제 후 실행
}
});
}
}
// 작업 제출: future<T> 반환
template<typename F, typename... Args>
auto submit(F&& f, Args&&... args)
-> std::future<std::invoke_result_t<F, Args...>>
{
using ReturnType = std::invoke_result_t<F, Args...>;
// packaged_task로 감싸서 future 연결
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<ReturnType> result = task->get_future();
{
std::unique_lock<std::mutex> lock(_mutex);
if (_stop)
throw std::runtime_error("submit on stopped ThreadPool");
_tasks.emplace([task]{ (*task)(); });
}
_cv.notify_one(); // 대기 중인 워커 하나 깨우기
return result;
}
~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(_mutex);
_stop = true;
}
_cv.notify_all(); // 모든 워커 깨워서 종료 처리
for (auto& worker : _workers)
worker.join();
}
// 복사·이동 금지
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
private:
std::vector<std::thread> _workers;
std::queue<std::function<void()>> _tasks;
std::mutex _mutex;
std::condition_variable _cv;
bool _stop;
};

#include "ThreadPool.h"
#include <iostream>
#include <chrono>
int main()
{
ThreadPool pool(4); // 워커 스레드 4개
// 반환값 없는 작업
pool.submit([]
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::cout << "Task 1 done\n";
});
// 반환값 있는 작업: future로 결과 수신
auto future1 = pool.submit([](int a, int b) { return a + b; }, 3, 4);
auto future2 = pool.submit([](double x) { return x * x; }, 3.14);
std::cout << "Sum: " << future1.get() << "\n"; // 7
std::cout << "Square: " << future2.get() << "\n"; // 9.8596
}

packaged_task는 작업 내부에서 발생한 예외를 캡처해 future::get() 호출 시 재전파합니다.

auto future = pool.submit([]() -> int
{
throw std::runtime_error("작업 중 오류 발생");
return 42;
});
try
{
int result = future.get(); // 여기서 예외 재전파
}
catch (const std::runtime_error& e)
{
std::cerr << "오류: " << e.what() << "\n";
}

작업에 우선순위를 부여하려면 std::priority_queue로 큐를 교체합니다.

struct PriorityTask
{
int priority;
std::function<void()> task;
bool operator<(const PriorityTask& other) const
{
return priority < other.priority; // 높은 priority가 먼저
}
};
// 기존 _tasks 교체
std::priority_queue<PriorityTask> _tasks;
// 제출 시 priority 지정
template<typename F>
void submitPriority(F&& f, int priority)
{
std::unique_lock lock(_mutex);
_tasks.push({ priority, std::forward<F>(f) });
_cv.notify_one();
}

std::stop_token(C++20)을 활용하면 진행 중인 작업에 취소 신호를 보낼 수 있습니다.

#include <stop_token>
// 취소 가능한 작업 제출
std::stop_source stopSrc;
pool.submit([stopToken = stopSrc.get_token()]
{
for (int i = 0; i < 1000; ++i)
{
if (stopToken.stop_requested())
{
std::cout << "작업 취소됨\n";
return;
}
// 실제 작업 ...
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
});
// 500ms 후 취소 요청
std::this_thread::sleep_for(std::chrono::milliseconds(500));
stopSrc.request_stop();

모든 제출한 작업이 완료될 때까지 기다리려면 std::latch(C++20)를 사용합니다.

#include <latch>
const int taskCount = 10;
std::latch done(taskCount);
for (int i = 0; i < taskCount; ++i)
{
pool.submit([i, &done]
{
std::cout << "Task " << i << " running\n";
done.count_down();
});
}
done.wait(); // 모든 작업 완료 대기
std::cout << "모든 작업 완료\n";

// CPU 바운드 작업: 논리 코어 수
size_t cpuBound = std::thread::hardware_concurrency();
// I/O 바운드 작업: 코어 수 × 2~4 (블로킹 대기 시간 활용)
size_t ioBound = std::thread::hardware_concurrency() * 4;

워커 스레드가 공유하는 카운터를 캐시 라인 단위로 정렬합니다.

struct alignas(64) WorkerStats // 캐시 라인 크기에 맞춤
{
std::atomic<uint64_t> processed{0};
char padding[64 - sizeof(std::atomic<uint64_t>)];
};
std::vector<WorkerStats> _stats; // 워커별 통계

구성 요소역할
std::thread 벡터워커 스레드 풀
std::queue<function<void()>>작업 큐
std::mutex + condition_variable큐 동기화
std::packaged_task + future결과 반환 및 예외 전파
stop_token취소 신호 전달

Thread Pool은 고성능 서버, 게임 엔진 작업 스케줄러, 파일 I/O 병렬화 등 다양한 곳에 쓰입니다. 표준 라이브러리만으로도 충분히 견고한 구현이 가능합니다.