System.Threading.Channels로 비동기 생산자-소비자 구현
System.Threading.Channels는 .NET Core 3.0에서 도입된 고성능 비동기 생산자-소비자 채널입니다. BlockingCollection<T>의 비동기 버전으로, async/await와 완벽하게 통합되며 백프레셔(backpressure) 처리를 내장합니다.
1. 기본 개념
섹션 제목: “1. 기본 개념”using System.Threading.Channels;
// 무한 크기 채널Channel<int> unbounded = Channel.CreateUnbounded<int>();
// 크기 제한 채널 (백프레셔 지원)Channel<int> bounded = Channel.CreateBounded<int>(capacity: 100);
// 채널은 두 종단으로 분리ChannelWriter<int> writer = bounded.Writer;ChannelReader<int> reader = bounded.Reader;2. 기본 사용 패턴
섹션 제목: “2. 기본 사용 패턴”var channel = Channel.CreateUnbounded<string>();
// 생산자async Task ProduceAsync(){ for (int i = 0; i < 10; i++) { await channel.Writer.WriteAsync($"item-{i}"); await Task.Delay(100); } channel.Writer.Complete(); // 쓰기 완료 신호}
// 소비자async Task ConsumeAsync(){ await foreach (var item in channel.Reader.ReadAllAsync()) { Console.WriteLine($"처리: {item}"); } // Writer.Complete() 호출 후 루프 종료}
await Task.WhenAll(ProduceAsync(), ConsumeAsync());3. BoundedChannel — 백프레셔
섹션 제목: “3. BoundedChannel — 백프레셔”var options = new BoundedChannelOptions(capacity: 10){ FullMode = BoundedChannelFullMode.Wait, // 꽉 차면 생산자 대기 // FullMode = BoundedChannelFullMode.DropOldest, // 오래된 항목 제거 // FullMode = BoundedChannelFullMode.DropNewest, // 새 항목 버림 // FullMode = BoundedChannelFullMode.DropWrite, // 현재 쓰기 실패 SingleReader = false, SingleWriter = false};
var channel = Channel.CreateBounded<WorkItem>(options);3.1 TryWrite vs WriteAsync
섹션 제목: “3.1 TryWrite vs WriteAsync”// TryWrite: 즉시 반환, 채널이 꽉 차면 falseif (channel.Writer.TryWrite(item)){ Console.WriteLine("쓰기 성공");}else{ Console.WriteLine("채널 포화 — 나중에 재시도");}
// WriteAsync: 공간이 생길 때까지 비동기 대기await channel.Writer.WriteAsync(item, cancellationToken);4. 다중 생산자/소비자
섹션 제목: “4. 다중 생산자/소비자”var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(50){ SingleReader = false, SingleWriter = false});
// 3개 생산자var producers = Enumerable.Range(0, 3).Select(async id =>{ for (int i = 0; i < 100; i++) { await channel.Writer.WriteAsync(id * 100 + i); }});
// 2개 소비자var consumers = Enumerable.Range(0, 2).Select(async id =>{ await foreach (var item in channel.Reader.ReadAllAsync()) { Console.WriteLine($"소비자 {id}: {item}"); }});
// 생산자 완료 후 채널 닫기await Task.WhenAll(producers);channel.Writer.Complete();
await Task.WhenAll(consumers);5. 파이프라인 구성
섹션 제목: “5. 파이프라인 구성”채널을 연결하여 처리 단계를 파이프라인으로 구성합니다.
async Task RunPipeline(CancellationToken ct){ // 단계 1: 데이터 수집 var rawChannel = Channel.CreateBounded<string>(100); // 단계 2: 파싱 var parsedChannel = Channel.CreateBounded<ParsedData>(50); // 단계 3: 저장 var saveChannel = Channel.CreateBounded<ProcessedData>(50);
// 각 단계를 병렬로 실행 var stage1 = FetchData(rawChannel.Writer, ct); var stage2 = ParseData(rawChannel.Reader, parsedChannel.Writer, ct); var stage3 = ProcessData(parsedChannel.Reader, saveChannel.Writer, ct); var stage4 = SaveData(saveChannel.Reader, ct);
await Task.WhenAll(stage1, stage2, stage3, stage4);}
async Task FetchData(ChannelWriter<string> writer, CancellationToken ct){ try { for (int i = 0; i < 1000; i++) { await writer.WriteAsync($"raw-{i}", ct); } } finally { writer.Complete(); }}
async Task ParseData( ChannelReader<string> reader, ChannelWriter<ParsedData> writer, CancellationToken ct){ try { await foreach (var raw in reader.ReadAllAsync(ct)) { var parsed = Parse(raw); // 동기 파싱 await writer.WriteAsync(parsed, ct); } } finally { writer.Complete(); }}6. 에러 처리
섹션 제목: “6. 에러 처리”var channel = Channel.CreateUnbounded<int>();
async Task ProduceWithError(){ try { for (int i = 0; i < 100; i++) { if (i == 50) throw new InvalidOperationException("생산 오류"); await channel.Writer.WriteAsync(i); } channel.Writer.Complete(); } catch (Exception ex) { // 오류로 채널 완료 → 소비자가 예외를 받음 channel.Writer.Complete(ex); }}
async Task ConsumeWithError(){ try { await foreach (var item in channel.Reader.ReadAllAsync()) { // 처리 } } catch (ChannelClosedException ex) { Console.WriteLine($"채널 오류로 종료: {ex.InnerException?.Message}"); }}7. BlockingCollection과 비교
섹션 제목: “7. BlockingCollection과 비교”| 항목 | BlockingCollection<T> | Channel<T> |
|---|---|---|
| 비동기 | X | O |
| async/await | X | O |
| 백프레셔 | O | O |
| 성능 | 보통 | 높음 |
| .NET 버전 | .NET Framework 4.0+ | .NET Core 3.0+ |
Channel.CreateUnbounded<T>(): 제한 없는 채널Channel.CreateBounded<T>(N): 백프레셔 내장 채널ReadAllAsync():IAsyncEnumerable로 소비Writer.Complete(ex): 오류와 함께 채널 종료- 다중 단계 파이프라인 구성에 최적화