콘텐츠로 이동

System.Threading.Channels로 비동기 생산자-소비자 구현

System.Threading.Channels는 .NET Core 3.0에서 도입된 고성능 비동기 생산자-소비자 채널입니다. BlockingCollection<T>의 비동기 버전으로, async/await와 완벽하게 통합되며 백프레셔(backpressure) 처리를 내장합니다.


using System.Threading.Channels;
// 무한 크기 채널
Channel<int> unbounded = Channel.CreateUnbounded<int>();
// 크기 제한 채널 (백프레셔 지원)
Channel<int> bounded = Channel.CreateBounded<int>(capacity: 100);
// 채널은 두 종단으로 분리
ChannelWriter<int> writer = bounded.Writer;
ChannelReader<int> reader = bounded.Reader;

var channel = Channel.CreateUnbounded<string>();
// 생산자
async Task ProduceAsync()
{
for (int i = 0; i < 10; i++)
{
await channel.Writer.WriteAsync($"item-{i}");
await Task.Delay(100);
}
channel.Writer.Complete(); // 쓰기 완료 신호
}
// 소비자
async Task ConsumeAsync()
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"처리: {item}");
}
// Writer.Complete() 호출 후 루프 종료
}
await Task.WhenAll(ProduceAsync(), ConsumeAsync());

var options = new BoundedChannelOptions(capacity: 10)
{
FullMode = BoundedChannelFullMode.Wait, // 꽉 차면 생산자 대기
// FullMode = BoundedChannelFullMode.DropOldest, // 오래된 항목 제거
// FullMode = BoundedChannelFullMode.DropNewest, // 새 항목 버림
// FullMode = BoundedChannelFullMode.DropWrite, // 현재 쓰기 실패
SingleReader = false,
SingleWriter = false
};
var channel = Channel.CreateBounded<WorkItem>(options);
// TryWrite: 즉시 반환, 채널이 꽉 차면 false
if (channel.Writer.TryWrite(item))
{
Console.WriteLine("쓰기 성공");
}
else
{
Console.WriteLine("채널 포화 — 나중에 재시도");
}
// WriteAsync: 공간이 생길 때까지 비동기 대기
await channel.Writer.WriteAsync(item, cancellationToken);

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(50)
{
SingleReader = false,
SingleWriter = false
});
// 3개 생산자
var producers = Enumerable.Range(0, 3).Select(async id =>
{
for (int i = 0; i < 100; i++)
{
await channel.Writer.WriteAsync(id * 100 + i);
}
});
// 2개 소비자
var consumers = Enumerable.Range(0, 2).Select(async id =>
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"소비자 {id}: {item}");
}
});
// 생산자 완료 후 채널 닫기
await Task.WhenAll(producers);
channel.Writer.Complete();
await Task.WhenAll(consumers);

채널을 연결하여 처리 단계를 파이프라인으로 구성합니다.

async Task RunPipeline(CancellationToken ct)
{
// 단계 1: 데이터 수집
var rawChannel = Channel.CreateBounded<string>(100);
// 단계 2: 파싱
var parsedChannel = Channel.CreateBounded<ParsedData>(50);
// 단계 3: 저장
var saveChannel = Channel.CreateBounded<ProcessedData>(50);
// 각 단계를 병렬로 실행
var stage1 = FetchData(rawChannel.Writer, ct);
var stage2 = ParseData(rawChannel.Reader, parsedChannel.Writer, ct);
var stage3 = ProcessData(parsedChannel.Reader, saveChannel.Writer, ct);
var stage4 = SaveData(saveChannel.Reader, ct);
await Task.WhenAll(stage1, stage2, stage3, stage4);
}
async Task FetchData(ChannelWriter<string> writer, CancellationToken ct)
{
try
{
for (int i = 0; i < 1000; i++)
{
await writer.WriteAsync($"raw-{i}", ct);
}
}
finally
{
writer.Complete();
}
}
async Task ParseData(
ChannelReader<string> reader,
ChannelWriter<ParsedData> writer,
CancellationToken ct)
{
try
{
await foreach (var raw in reader.ReadAllAsync(ct))
{
var parsed = Parse(raw); // 동기 파싱
await writer.WriteAsync(parsed, ct);
}
}
finally
{
writer.Complete();
}
}

var channel = Channel.CreateUnbounded<int>();
async Task ProduceWithError()
{
try
{
for (int i = 0; i < 100; i++)
{
if (i == 50) throw new InvalidOperationException("생산 오류");
await channel.Writer.WriteAsync(i);
}
channel.Writer.Complete();
}
catch (Exception ex)
{
// 오류로 채널 완료 → 소비자가 예외를 받음
channel.Writer.Complete(ex);
}
}
async Task ConsumeWithError()
{
try
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
// 처리
}
}
catch (ChannelClosedException ex)
{
Console.WriteLine($"채널 오류로 종료: {ex.InnerException?.Message}");
}
}

항목BlockingCollection<T>Channel<T>
비동기XO
async/awaitXO
백프레셔OO
성능보통높음
.NET 버전.NET Framework 4.0+.NET Core 3.0+

  • Channel.CreateUnbounded<T>(): 제한 없는 채널
  • Channel.CreateBounded<T>(N): 백프레셔 내장 채널
  • ReadAllAsync(): IAsyncEnumerable로 소비
  • Writer.Complete(ex): 오류와 함께 채널 종료
  • 다중 단계 파이프라인 구성에 최적화