콘텐츠로 이동

System.Threading.Channels 고성능 데이터 파이프라인

System.Threading.Channels는 .NET Core 3.0부터 제공되는 스레드 안전한 비동기 데이터 채널 라이브러리입니다. BlockingCollection<T>의 비동기 버전으로, async/await와 완벽하게 통합되어 고성능 생산자-소비자 파이프라인을 구축할 수 있습니다.


생산자 (Producer)
↓ WriteAsync
Channel<T>
↓ ReadAsync
소비자 (Consumer)

채널에는 두 가지 끝점이 있습니다.

  • ChannelWriter<T>: 데이터를 채널에 씁니다.
  • ChannelReader<T>: 채널에서 데이터를 읽습니다.

using System.Threading.Channels;
// Unbounded 채널 (크기 제한 없음)
var channel = Channel.CreateUnbounded<int>();
ChannelWriter<int> writer = channel.Writer;
ChannelReader<int> reader = channel.Reader;
// 생산자
async Task ProduceAsync()
{
for (int i = 0; i < 10; i++)
{
await writer.WriteAsync(i);
Console.WriteLine($"Produced: {i}");
}
writer.Complete(); // 완료 신호
}
// 소비자
async Task ConsumeAsync()
{
await foreach (int item in reader.ReadAllAsync())
{
Console.WriteLine($"Consumed: {item}");
}
}
await Task.WhenAll(ProduceAsync(), ConsumeAsync());

// 최대 100개 항목만 버퍼링
var options = new BoundedChannelOptions(capacity: 100)
{
FullMode = BoundedChannelFullMode.Wait, // 꽉 차면 생산자 대기
// FullMode = BoundedChannelFullMode.DropOldest, // 오래된 항목 삭제
// FullMode = BoundedChannelFullMode.DropNewest, // 새 항목 삭제
// FullMode = BoundedChannelFullMode.DropWrite, // 쓰기 실패 (TryWrite 반환 false)
SingleReader = false, // 복수 소비자
SingleWriter = false, // 복수 생산자
};
var channel = Channel.CreateBounded<WorkItem>(options);

4. 실전: 이미지 처리 파이프라인

섹션 제목: “4. 실전: 이미지 처리 파이프라인”
using System.Threading.Channels;
record ImageTask(string Path, int Width, int Height);
record ResizedImage(string Path, byte[] Data);
record SavedResult(string OutputPath, bool Success);
class ImagePipeline
{
private readonly Channel<ImageTask> _loadChannel;
private readonly Channel<ResizedImage> _resizeChannel;
private readonly Channel<SavedResult> _saveChannel;
public ImagePipeline()
{
var opts = new BoundedChannelOptions(50) { FullMode = BoundedChannelFullMode.Wait };
_loadChannel = Channel.CreateBounded<ImageTask>(opts);
_resizeChannel = Channel.CreateBounded<ResizedImage>(opts);
_saveChannel = Channel.CreateBounded<SavedResult>(opts);
}
public async Task RunAsync(IEnumerable<string> imagePaths, CancellationToken ct)
{
// 3단계 파이프라인을 병렬로 실행
var stageLoad = LoadStageAsync(imagePaths, ct);
var stageResize = ResizeStageAsync(ct);
var stageSave = SaveStageAsync(ct);
await Task.WhenAll(stageLoad, stageResize, stageSave);
}
private async Task LoadStageAsync(IEnumerable<string> paths, CancellationToken ct)
{
try
{
foreach (var path in paths)
{
ct.ThrowIfCancellationRequested();
// 실제 로드 로직 (예시)
var task = new ImageTask(path, 1920, 1080);
await _loadChannel.Writer.WriteAsync(task, ct);
}
}
finally
{
_loadChannel.Writer.Complete();
}
}
private async Task ResizeStageAsync(CancellationToken ct)
{
try
{
await foreach (var task in _loadChannel.Reader.ReadAllAsync(ct))
{
// 실제 리사이즈 로직 (예시)
var resized = new ResizedImage(task.Path, new byte[task.Width * task.Height * 3]);
await _resizeChannel.Writer.WriteAsync(resized, ct);
}
}
finally
{
_resizeChannel.Writer.Complete();
}
}
private async Task SaveStageAsync(CancellationToken ct)
{
try
{
await foreach (var img in _resizeChannel.Reader.ReadAllAsync(ct))
{
string output = img.Path.Replace(".jpg", "_thumb.jpg");
// 실제 저장 로직 (예시)
await File.WriteAllBytesAsync(output, img.Data, ct);
await _saveChannel.Writer.WriteAsync(new SavedResult(output, true), ct);
Console.WriteLine($"Saved: {output}");
}
}
finally
{
_saveChannel.Writer.Complete();
}
}
}

// 복수 생산자
var channel = Channel.CreateBounded<int>(100);
var producers = Enumerable.Range(0, 4)
.Select(i => Task.Run(async () =>
{
for (int j = 0; j < 25; j++)
{
await channel.Writer.WriteAsync(i * 25 + j);
}
}))
.ToArray();
// 모든 생산자 완료 후 채널 닫기
_ = Task.WhenAll(producers).ContinueWith(_ => channel.Writer.Complete());
// 복수 소비자 (병렬 처리)
var consumers = Enumerable.Range(0, 2)
.Select(_ => Task.Run(async () =>
{
await foreach (int item in channel.Reader.ReadAllAsync())
{
await ProcessAsync(item);
}
}))
.ToArray();
await Task.WhenAll(consumers);

var channel = Channel.CreateBounded<string>(10);
// 논블로킹 쓰기
bool written = channel.Writer.TryWrite("item");
if (!written)
{
Console.WriteLine("Channel full, dropping item");
}
// 논블로킹 읽기
if (channel.Reader.TryRead(out string? item))
{
Console.WriteLine($"Got: {item}");
}
// 항목 개수 확인 (BoundedChannel)
int count = channel.Reader.Count;

async Task RobustPipelineAsync(CancellationToken ct)
{
var channel = Channel.CreateUnbounded<WorkItem>();
var producer = Task.Run(async () =>
{
try
{
while (!ct.IsCancellationRequested)
{
var item = await FetchNextItemAsync(ct);
await channel.Writer.WriteAsync(item, ct);
}
}
catch (OperationCanceledException)
{
// 정상 취소
}
catch (Exception ex)
{
// 오류 시 채널을 오류 상태로 완료
channel.Writer.Complete(ex);
return;
}
channel.Writer.Complete();
});
var consumer = Task.Run(async () =>
{
try
{
await foreach (var item in channel.Reader.ReadAllAsync(ct))
{
await ProcessAsync(item);
}
}
catch (ChannelClosedException ex) when (ex.InnerException is not null)
{
Console.WriteLine($"Channel closed with error: {ex.InnerException.Message}");
}
});
await Task.WhenAll(producer, consumer);
}

항목BlockingCollection<T>Channel<T>
API 스타일동기 (블로킹)비동기 (await)
async/await 지원없음완전 지원
백프레셔있음있음 (Bounded)
성능보통높음
.NET 버전.NET 4.0+.NET Core 2.1+

System.Threading.Channels는 비동기 파이프라인 구축에 최적화된 고성능 라이브러리입니다. BoundedChannel의 백프레셔 메커니즘으로 메모리 사용을 제어하고, ReadAllAsyncawait foreach로 소비자를 우아하게 구현할 수 있습니다. CPU 집약적 변환 단계는 Task.Run으로 스레드 풀에 오프로드하고, I/O 단계는 네이티브 비동기 API를 사용하면 최고의 처리량을 달성할 수 있습니다.