콘텐츠로 이동

C++20 std::latch와 std::barrier로 스레드 동기화

C++20은 스레드 간 동기화를 위한 두 가지 새로운 동기화 원시 타입을 추가했습니다. std::latch는 일회성 카운트다운 장벽이고, std::barrier는 재사용 가능한 단계별 동기화 장벽입니다. 두 타입 모두 <latch><barrier> 헤더에 정의됩니다.


std::latch는 내부 카운터를 가진 일회성 장벽입니다. 카운터가 0이 되면 대기 중인 모든 스레드가 해제됩니다. 한 번 0이 되면 재사용할 수 없습니다.

#include <latch>
#include <thread>
#include <vector>
#include <iostream>
int main() {
const int numThreads = 4;
std::latch ready(numThreads); // 카운터 = 4
std::vector<std::thread> workers;
for (int i = 0; i < numThreads; ++i) {
workers.emplace_back([&, i] {
// 초기화 작업
std::cout << "스레드 " << i << " 준비 완료\n";
ready.count_down(); // 카운터 -1
// 모든 스레드가 준비될 때까지 대기
ready.wait(); // 카운터가 0이면 즉시 반환
std::cout << "스레드 " << i << " 작업 시작\n";
});
}
for (auto& t : workers) t.join();
}
std::latch latch(N); // 카운터를 N으로 초기화
latch.count_down(n); // 카운터를 n만큼 감소 (기본 1)
latch.wait(); // 카운터가 0이 될 때까지 블로킹
latch.try_wait(); // 즉시 반환: 카운터가 0이면 true
latch.arrive_and_wait(); // count_down + wait 결합
std::latch startLine(1); // 시작 신호를 위한 카운터
void worker(std::latch& start) {
start.wait(); // 시작 신호 대기
// 모든 스레드가 동시에 시작
}
std::thread t1(worker, std::ref(startLine));
std::thread t2(worker, std::ref(startLine));
std::thread t3(worker, std::ref(startLine));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
startLine.count_down(); // 모든 스레드 동시 해제

std::barrier는 재사용 가능한 장벽입니다. 모든 참가 스레드가 장벽에 도달하면 **단계(phase)**가 완료되고, 다음 단계로 진입합니다.

#include <barrier>
#include <thread>
#include <vector>
const int numThreads = 3;
std::barrier sync(numThreads); // 3개 스레드 참가
void worker(int id) {
for (int phase = 0; phase < 3; ++phase) {
// 단계별 작업
std::cout << "스레드 " << id << " 단계 " << phase << " 시작\n";
std::this_thread::sleep_for(std::chrono::milliseconds(10 * id));
// 모든 스레드가 이 지점에 도달할 때까지 대기
sync.arrive_and_wait();
std::cout << "스레드 " << id << " 단계 " << phase << " 완료\n";
}
}

모든 스레드가 장벽에 도달했을 때 단 한 번 호출되는 함수를 지정할 수 있습니다.

std::vector<int> results(numThreads);
auto onPhaseComplete = [&]() noexcept {
// 단계 완료 시 집계 처리
int total = 0;
for (int r : results) total += r;
std::cout << "단계 결과 합계: " << total << "\n";
};
std::barrier sync(numThreads, onPhaseComplete);
void worker(int id) {
for (int phase = 0; phase < 3; ++phase) {
results[id] = id * (phase + 1);
sync.arrive_and_wait(); // 완료 함수는 마지막 도착 스레드가 호출
}
}
std::barrier b(numThreads);
// arrive(): 도착 신호만 보내고 계속 진행 (블로킹 없음)
// 반환값: arrival_token (나중에 wait에 사용)
auto token = b.arrive();
// 다른 작업 수행 가능...
// wait(token): 해당 단계 완료 대기
b.wait(std::move(token));
// arrive_and_wait(): arrive + wait 결합 (가장 일반적)
b.arrive_and_wait();
// arrive_and_drop(): 참가자 수 감소 (이후 단계에서 이 스레드 제외)
b.arrive_and_drop();

특성std::latchstd::barrier
재사용X (일회성)O (단계별 반복)
카운터 감소여러 번 가능단계마다 자동 리셋
완료 함수XO
비대칭 count_downOX

4. 실전 활용 — 병렬 데이터 처리

섹션 제목: “4. 실전 활용 — 병렬 데이터 처리”
#include <barrier>
#include <vector>
#include <numeric>
#include <thread>
void parallelSum(const std::vector<int>& data) {
const int N = 4;
std::vector<int> partialSums(N, 0);
int totalSum = 0;
auto onReduce = [&]() noexcept {
totalSum = 0;
for (int s : partialSums) totalSum += s;
std::cout << "부분합 집계: " << totalSum << "\n";
};
std::barrier barrier(N, onReduce);
std::vector<std::thread> threads;
for (int i = 0; i < N; ++i) {
threads.emplace_back([&, i] {
// 각 스레드가 데이터 구간 처리
int start = i * (data.size() / N);
int end = (i + 1) * (data.size() / N);
partialSums[i] = std::accumulate(
data.begin() + start,
data.begin() + end, 0);
barrier.arrive_and_wait(); // 모든 스레드 완료 대기
// 집계 결과 사용
if (i == 0) {
std::cout << "최종 합계: " << totalSum << "\n";
}
});
}
for (auto& t : threads) t.join();
}

const int WORKERS = 3;
const int PHASES = 4;
std::barrier pipeline(WORKERS, []() noexcept {
// 단계 완료 시 호출
});
struct WorkItem {
std::vector<float> data;
float result = 0.f;
};
std::vector<WorkItem> items(WORKERS);
void pipelineWorker(int id) {
// 단계 1: 데이터 로드
items[id].data = {1.f, 2.f, 3.f};
pipeline.arrive_and_wait();
// 단계 2: 처리
float sum = 0.f;
for (float v : items[id].data) sum += v;
items[id].result = sum;
pipeline.arrive_and_wait();
// 단계 3: 검증
if (items[id].result <= 0.f) {
std::cerr << "워커 " << id << " 결과 이상\n";
}
pipeline.arrive_and_wait();
// 단계 4: 출력
std::cout << "워커 " << id << " 결과: " << items[id].result << "\n";
pipeline.arrive_and_wait();
}

class AsyncInitializer {
std::latch ready_{1};
std::thread worker_;
public:
AsyncInitializer() {
worker_ = std::thread([this] {
// 비용이 큰 초기화 작업
std::this_thread::sleep_for(std::chrono::milliseconds(100));
ready_.count_down(); // 완료 신호
});
}
~AsyncInitializer() {
if (worker_.joinable()) worker_.join();
}
void waitReady() {
ready_.wait(); // 초기화 완료까지 블로킹
}
};
AsyncInitializer init;
// ... 다른 초기화 작업 ...
init.waitReady(); // 비동기 초기화 완료 대기

  • std::latch: 일회성 카운트다운 장벽. “N개 작업 모두 완료 후 진행”에 적합
  • std::barrier: 재사용 가능한 단계 장벽. “N개 스레드 동기화를 반복적으로 수행”에 적합
  • 완료 함수를 통해 단계 전환 시점의 집계·상태 갱신을 안전하게 처리
  • arrive_and_drop()으로 참가 스레드 수를 동적으로 줄일 수 있음