C++ Thread Pool 구현
Thread Pool이 필요한 이유
섹션 제목: “Thread Pool이 필요한 이유”스레드 생성 비용은 생각보다 큽니다. std::thread 하나를 생성하면 OS 커널 객체 할당, 스택 메모리 예약(기본 1~8 MB), 스케줄러 등록이 일어납니다. 짧은 작업을 수천 번 수행할 때마다 스레드를 생성·소멸하면 오버헤드가 실제 작업보다 커집니다.
Thread Pool은 스레드를 미리 생성해두고 작업을 큐에 넣으면 유휴 스레드가 꺼내 실행하는 패턴입니다.
기본 Thread Pool 구현
섹션 제목: “기본 Thread Pool 구현”#pragma once#include <vector>#include <queue>#include <thread>#include <mutex>#include <condition_variable>#include <functional>#include <future>#include <stdexcept>
class ThreadPool{public: explicit ThreadPool(size_t numThreads) : _stop(false) { for (size_t i = 0; i < numThreads; ++i) { _workers.emplace_back([this] { for (;;) { std::function<void()> task; { std::unique_lock<std::mutex> lock(_mutex);
// 작업이 있거나 종료 요청이 올 때까지 대기 _cv.wait(lock, [this] { return _stop || !_tasks.empty(); });
// 종료 요청 + 큐가 비었으면 스레드 종료 if (_stop && _tasks.empty()) return;
task = std::move(_tasks.front()); _tasks.pop(); }
task(); // 잠금 해제 후 실행 } }); } }
// 작업 제출: future<T> 반환 template<typename F, typename... Args> auto submit(F&& f, Args&&... args) -> std::future<std::invoke_result_t<F, Args...>> { using ReturnType = std::invoke_result_t<F, Args...>;
// packaged_task로 감싸서 future 연결 auto task = std::make_shared<std::packaged_task<ReturnType()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...) );
std::future<ReturnType> result = task->get_future();
{ std::unique_lock<std::mutex> lock(_mutex);
if (_stop) throw std::runtime_error("submit on stopped ThreadPool");
_tasks.emplace([task]{ (*task)(); }); }
_cv.notify_one(); // 대기 중인 워커 하나 깨우기 return result; }
~ThreadPool() { { std::unique_lock<std::mutex> lock(_mutex); _stop = true; }
_cv.notify_all(); // 모든 워커 깨워서 종료 처리
for (auto& worker : _workers) worker.join(); }
// 복사·이동 금지 ThreadPool(const ThreadPool&) = delete; ThreadPool& operator=(const ThreadPool&) = delete;
private: std::vector<std::thread> _workers; std::queue<std::function<void()>> _tasks; std::mutex _mutex; std::condition_variable _cv; bool _stop;};사용 예시
섹션 제목: “사용 예시”#include "ThreadPool.h"#include <iostream>#include <chrono>
int main(){ ThreadPool pool(4); // 워커 스레드 4개
// 반환값 없는 작업 pool.submit([] { std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::cout << "Task 1 done\n"; });
// 반환값 있는 작업: future로 결과 수신 auto future1 = pool.submit([](int a, int b) { return a + b; }, 3, 4); auto future2 = pool.submit([](double x) { return x * x; }, 3.14);
std::cout << "Sum: " << future1.get() << "\n"; // 7 std::cout << "Square: " << future2.get() << "\n"; // 9.8596}예외 처리
섹션 제목: “예외 처리”packaged_task는 작업 내부에서 발생한 예외를 캡처해 future::get() 호출 시 재전파합니다.
auto future = pool.submit([]() -> int{ throw std::runtime_error("작업 중 오류 발생"); return 42;});
try{ int result = future.get(); // 여기서 예외 재전파}catch (const std::runtime_error& e){ std::cerr << "오류: " << e.what() << "\n";}우선순위 큐 기반 확장
섹션 제목: “우선순위 큐 기반 확장”작업에 우선순위를 부여하려면 std::priority_queue로 큐를 교체합니다.
struct PriorityTask{ int priority; std::function<void()> task;
bool operator<(const PriorityTask& other) const { return priority < other.priority; // 높은 priority가 먼저 }};
// 기존 _tasks 교체std::priority_queue<PriorityTask> _tasks;
// 제출 시 priority 지정template<typename F>void submitPriority(F&& f, int priority){ std::unique_lock lock(_mutex); _tasks.push({ priority, std::forward<F>(f) }); _cv.notify_one();}작업 취소 지원
섹션 제목: “작업 취소 지원”std::stop_token(C++20)을 활용하면 진행 중인 작업에 취소 신호를 보낼 수 있습니다.
#include <stop_token>
// 취소 가능한 작업 제출std::stop_source stopSrc;
pool.submit([stopToken = stopSrc.get_token()]{ for (int i = 0; i < 1000; ++i) { if (stopToken.stop_requested()) { std::cout << "작업 취소됨\n"; return; }
// 실제 작업 ... std::this_thread::sleep_for(std::chrono::milliseconds(1)); }});
// 500ms 후 취소 요청std::this_thread::sleep_for(std::chrono::milliseconds(500));stopSrc.request_stop();작업 완료 대기 (barrier 패턴)
섹션 제목: “작업 완료 대기 (barrier 패턴)”모든 제출한 작업이 완료될 때까지 기다리려면 std::latch(C++20)를 사용합니다.
#include <latch>
const int taskCount = 10;std::latch done(taskCount);
for (int i = 0; i < taskCount; ++i){ pool.submit([i, &done] { std::cout << "Task " << i << " running\n"; done.count_down(); });}
done.wait(); // 모든 작업 완료 대기std::cout << "모든 작업 완료\n";성능 튜닝 포인트
섹션 제목: “성능 튜닝 포인트”스레드 수 선택
섹션 제목: “스레드 수 선택”// CPU 바운드 작업: 논리 코어 수size_t cpuBound = std::thread::hardware_concurrency();
// I/O 바운드 작업: 코어 수 × 2~4 (블로킹 대기 시간 활용)size_t ioBound = std::thread::hardware_concurrency() * 4;거짓 공유 (False Sharing) 방지
섹션 제목: “거짓 공유 (False Sharing) 방지”워커 스레드가 공유하는 카운터를 캐시 라인 단위로 정렬합니다.
struct alignas(64) WorkerStats // 캐시 라인 크기에 맞춤{ std::atomic<uint64_t> processed{0}; char padding[64 - sizeof(std::atomic<uint64_t>)];};
std::vector<WorkerStats> _stats; // 워커별 통계| 구성 요소 | 역할 |
|---|---|
std::thread 벡터 | 워커 스레드 풀 |
std::queue<function<void()>> | 작업 큐 |
std::mutex + condition_variable | 큐 동기화 |
std::packaged_task + future | 결과 반환 및 예외 전파 |
stop_token | 취소 신호 전달 |
Thread Pool은 고성능 서버, 게임 엔진 작업 스케줄러, 파일 I/O 병렬화 등 다양한 곳에 쓰입니다. 표준 라이브러리만으로도 충분히 견고한 구현이 가능합니다.