콘텐츠로 이동

IAsyncEnumerable\<T\>와 비동기 스트림 완전 가이드

IAsyncEnumerable<T>는 C# 8.0에서 도입된 비동기 스트림 인터페이스입니다. 일반 IEnumerable<T>와 달리 각 항목을 비동기적으로 반환하며, await foreach로 소비합니다. 데이터베이스 스트리밍, HTTP 청크 응답, 실시간 이벤트 처리에 적합합니다.


// 비동기 이터레이터
async IAsyncEnumerable<int> GenerateNumbers()
{
for (int i = 0; i < 10; i++)
{
await Task.Delay(100); // 비동기 작업
yield return i; // 값 반환
}
}
// await foreach로 소비
await foreach (int number in GenerateNumbers())
{
Console.WriteLine(number);
}

// 동기 — 모든 데이터를 메모리에 로드
IEnumerable<Order> GetOrders()
{
return dbContext.Orders.ToList(); // 전체 로드
}
// 비동기 스트림 — 한 번에 하나씩
async IAsyncEnumerable<Order> GetOrdersAsync()
{
await foreach (var order in dbContext.Orders.AsAsyncEnumerable())
{
yield return order; // 스트리밍
}
}
// 소비
await foreach (var order in GetOrdersAsync())
{
await ProcessOrderAsync(order); // 배치 없이 즉시 처리
}

async IAsyncEnumerable<LogEntry> StreamLogs(
[EnumeratorCancellation] CancellationToken ct = default)
{
while (!ct.IsCancellationRequested)
{
var entry = await FetchNextLogAsync(ct);
yield return entry;
}
}
// 소비 시 취소 토큰 전달
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await foreach (var log in StreamLogs().WithCancellation(cts.Token))
{
Console.WriteLine(log.Message);
}
await foreach (var item in GetItemsAsync()
.WithCancellation(ct)
.ConfigureAwait(false)) // 컨텍스트 캡처 비활성화 (라이브러리 코드)
{
ProcessItem(item);
}

// Entity Framework Core
async IAsyncEnumerable<Product> GetExpensiveProducts(
decimal threshold,
[EnumeratorCancellation] CancellationToken ct = default)
{
await foreach (var product in dbContext.Products
.Where(p => p.Price > threshold)
.AsAsyncEnumerable()
.WithCancellation(ct))
{
yield return product;
}
}
// 대용량 데이터 처리 — 메모리 효율적
await foreach (var product in GetExpensiveProducts(1000m, ct))
{
await UpdateProductCacheAsync(product);
}

// ASP.NET Core Minimal API
app.MapGet("/stream", (CancellationToken ct) =>
StreamResults(ct));
static async IAsyncEnumerable<DataPoint> StreamResults(
[EnumeratorCancellation] CancellationToken ct)
{
for (int i = 0; i < 100; i++)
{
ct.ThrowIfCancellationRequested();
await Task.Delay(100, ct);
yield return new DataPoint(i, DateTime.UtcNow);
}
}
// 클라이언트에서 소비
using var client = new HttpClient();
var stream = client.GetFromJsonAsAsyncEnumerable<DataPoint>("/stream");
await foreach (var point in stream)
{
Console.WriteLine($"{point.Value} @ {point.Timestamp}");
}

System.Linq.Async
using System.Linq;
var result = await GetItemsAsync()
.Where(x => x.Value > 10)
.Select(x => x.Value * 2)
.Take(5)
.ToListAsync();
// 집계
int sum = await GetNumbersAsync().SumAsync();
int count = await GetNumbersAsync().CountAsync(x => x > 5);
bool any = await GetNumbersAsync().AnyAsync(x => x > 100);

// Channel을 IAsyncEnumerable로 변환
Channel<int> channel = Channel.CreateUnbounded<int>();
// 채널 리더를 IAsyncEnumerable로 사용
await foreach (int item in channel.Reader.ReadAllAsync(ct))
{
Console.WriteLine(item);
}

// IAsyncEnumerable을 배치로 처리하는 확장 메서드
static async IAsyncEnumerable<IReadOnlyList<T>> Batch<T>(
this IAsyncEnumerable<T> source,
int size,
[EnumeratorCancellation] CancellationToken ct = default)
{
var batch = new List<T>(size);
await foreach (var item in source.WithCancellation(ct))
{
batch.Add(item);
if (batch.Count == size)
{
yield return batch;
batch = new List<T>(size);
}
}
if (batch.Count > 0)
yield return batch;
}
// 사용
await foreach (var batch in GetItemsAsync().Batch(100))
{
await BulkInsertAsync(batch);
}

9. 실전 패턴 — 실시간 이벤트 스트림

섹션 제목: “9. 실전 패턴 — 실시간 이벤트 스트림”
async IAsyncEnumerable<GameEvent> ListenForEvents(
string roomId,
[EnumeratorCancellation] CancellationToken ct = default)
{
var queue = eventBus.Subscribe(roomId);
try
{
while (!ct.IsCancellationRequested)
{
if (queue.TryDequeue(out var evt))
{
yield return evt;
}
else
{
await Task.Delay(10, ct);
}
}
}
finally
{
eventBus.Unsubscribe(roomId, queue);
}
}
// SignalR과 결합
await foreach (var evt in ListenForEvents("room-1", ct))
{
await hubContext.Clients.Group("room-1")
.SendAsync("Event", evt, ct);
}

시나리오권장 방식
대용량 DB 조회AsAsyncEnumerable()
HTTP 청크 응답IAsyncEnumerable 반환
실시간 이벤트yield return 루프
취소 지원[EnumeratorCancellation]
배치 처리Batch() 확장 메서드