System.Threading.Channels 고성능 데이터 파이프라인
System.Threading.Channels는 .NET Core 3.0부터 제공되는 스레드 안전한 비동기 데이터 채널 라이브러리입니다. BlockingCollection<T>의 비동기 버전으로, async/await와 완벽하게 통합되어 고성능 생산자-소비자 파이프라인을 구축할 수 있습니다.
1. 기본 개념
섹션 제목: “1. 기본 개념”생산자 (Producer) ↓ WriteAsync Channel<T> ↓ ReadAsync소비자 (Consumer)채널에는 두 가지 끝점이 있습니다.
ChannelWriter<T>: 데이터를 채널에 씁니다.ChannelReader<T>: 채널에서 데이터를 읽습니다.
2. 기본 사용법
섹션 제목: “2. 기본 사용법”using System.Threading.Channels;
// Unbounded 채널 (크기 제한 없음)var channel = Channel.CreateUnbounded<int>();
ChannelWriter<int> writer = channel.Writer;ChannelReader<int> reader = channel.Reader;
// 생산자async Task ProduceAsync(){ for (int i = 0; i < 10; i++) { await writer.WriteAsync(i); Console.WriteLine($"Produced: {i}"); } writer.Complete(); // 완료 신호}
// 소비자async Task ConsumeAsync(){ await foreach (int item in reader.ReadAllAsync()) { Console.WriteLine($"Consumed: {item}"); }}
await Task.WhenAll(ProduceAsync(), ConsumeAsync());3. BoundedChannel — 백프레셔
섹션 제목: “3. BoundedChannel — 백프레셔”// 최대 100개 항목만 버퍼링var options = new BoundedChannelOptions(capacity: 100){ FullMode = BoundedChannelFullMode.Wait, // 꽉 차면 생산자 대기 // FullMode = BoundedChannelFullMode.DropOldest, // 오래된 항목 삭제 // FullMode = BoundedChannelFullMode.DropNewest, // 새 항목 삭제 // FullMode = BoundedChannelFullMode.DropWrite, // 쓰기 실패 (TryWrite 반환 false) SingleReader = false, // 복수 소비자 SingleWriter = false, // 복수 생산자};
var channel = Channel.CreateBounded<WorkItem>(options);4. 실전: 이미지 처리 파이프라인
섹션 제목: “4. 실전: 이미지 처리 파이프라인”using System.Threading.Channels;
record ImageTask(string Path, int Width, int Height);record ResizedImage(string Path, byte[] Data);record SavedResult(string OutputPath, bool Success);
class ImagePipeline{ private readonly Channel<ImageTask> _loadChannel; private readonly Channel<ResizedImage> _resizeChannel; private readonly Channel<SavedResult> _saveChannel;
public ImagePipeline() { var opts = new BoundedChannelOptions(50) { FullMode = BoundedChannelFullMode.Wait }; _loadChannel = Channel.CreateBounded<ImageTask>(opts); _resizeChannel = Channel.CreateBounded<ResizedImage>(opts); _saveChannel = Channel.CreateBounded<SavedResult>(opts); }
public async Task RunAsync(IEnumerable<string> imagePaths, CancellationToken ct) { // 3단계 파이프라인을 병렬로 실행 var stageLoad = LoadStageAsync(imagePaths, ct); var stageResize = ResizeStageAsync(ct); var stageSave = SaveStageAsync(ct);
await Task.WhenAll(stageLoad, stageResize, stageSave); }
private async Task LoadStageAsync(IEnumerable<string> paths, CancellationToken ct) { try { foreach (var path in paths) { ct.ThrowIfCancellationRequested(); // 실제 로드 로직 (예시) var task = new ImageTask(path, 1920, 1080); await _loadChannel.Writer.WriteAsync(task, ct); } } finally { _loadChannel.Writer.Complete(); } }
private async Task ResizeStageAsync(CancellationToken ct) { try { await foreach (var task in _loadChannel.Reader.ReadAllAsync(ct)) { // 실제 리사이즈 로직 (예시) var resized = new ResizedImage(task.Path, new byte[task.Width * task.Height * 3]); await _resizeChannel.Writer.WriteAsync(resized, ct); } } finally { _resizeChannel.Writer.Complete(); } }
private async Task SaveStageAsync(CancellationToken ct) { try { await foreach (var img in _resizeChannel.Reader.ReadAllAsync(ct)) { string output = img.Path.Replace(".jpg", "_thumb.jpg"); // 실제 저장 로직 (예시) await File.WriteAllBytesAsync(output, img.Data, ct); await _saveChannel.Writer.WriteAsync(new SavedResult(output, true), ct); Console.WriteLine($"Saved: {output}"); } } finally { _saveChannel.Writer.Complete(); } }}5. 복수 생산자 / 복수 소비자
섹션 제목: “5. 복수 생산자 / 복수 소비자”// 복수 생산자var channel = Channel.CreateBounded<int>(100);
var producers = Enumerable.Range(0, 4) .Select(i => Task.Run(async () => { for (int j = 0; j < 25; j++) { await channel.Writer.WriteAsync(i * 25 + j); } })) .ToArray();
// 모든 생산자 완료 후 채널 닫기_ = Task.WhenAll(producers).ContinueWith(_ => channel.Writer.Complete());
// 복수 소비자 (병렬 처리)var consumers = Enumerable.Range(0, 2) .Select(_ => Task.Run(async () => { await foreach (int item in channel.Reader.ReadAllAsync()) { await ProcessAsync(item); } })) .ToArray();
await Task.WhenAll(consumers);6. TryRead / TryWrite (논블로킹)
섹션 제목: “6. TryRead / TryWrite (논블로킹)”var channel = Channel.CreateBounded<string>(10);
// 논블로킹 쓰기bool written = channel.Writer.TryWrite("item");if (!written){ Console.WriteLine("Channel full, dropping item");}
// 논블로킹 읽기if (channel.Reader.TryRead(out string? item)){ Console.WriteLine($"Got: {item}");}
// 항목 개수 확인 (BoundedChannel)int count = channel.Reader.Count;7. 취소와 오류 처리
섹션 제목: “7. 취소와 오류 처리”async Task RobustPipelineAsync(CancellationToken ct){ var channel = Channel.CreateUnbounded<WorkItem>();
var producer = Task.Run(async () => { try { while (!ct.IsCancellationRequested) { var item = await FetchNextItemAsync(ct); await channel.Writer.WriteAsync(item, ct); } } catch (OperationCanceledException) { // 정상 취소 } catch (Exception ex) { // 오류 시 채널을 오류 상태로 완료 channel.Writer.Complete(ex); return; } channel.Writer.Complete(); });
var consumer = Task.Run(async () => { try { await foreach (var item in channel.Reader.ReadAllAsync(ct)) { await ProcessAsync(item); } } catch (ChannelClosedException ex) when (ex.InnerException is not null) { Console.WriteLine($"Channel closed with error: {ex.InnerException.Message}"); } });
await Task.WhenAll(producer, consumer);}8. BlockingCollection vs Channel 비교
섹션 제목: “8. BlockingCollection vs Channel 비교”| 항목 | BlockingCollection<T> | Channel<T> |
|---|---|---|
| API 스타일 | 동기 (블로킹) | 비동기 (await) |
| async/await 지원 | 없음 | 완전 지원 |
| 백프레셔 | 있음 | 있음 (Bounded) |
| 성능 | 보통 | 높음 |
| .NET 버전 | .NET 4.0+ | .NET Core 2.1+ |
System.Threading.Channels는 비동기 파이프라인 구축에 최적화된 고성능 라이브러리입니다. BoundedChannel의 백프레셔 메커니즘으로 메모리 사용을 제어하고, ReadAllAsync와 await foreach로 소비자를 우아하게 구현할 수 있습니다. CPU 집약적 변환 단계는 Task.Run으로 스레드 풀에 오프로드하고, I/O 단계는 네이티브 비동기 API를 사용하면 최고의 처리량을 달성할 수 있습니다.