IAsyncEnumerable\<T\>와 비동기 스트림 완전 가이드
IAsyncEnumerable<T>는 C# 8.0에서 도입된 비동기 스트림 인터페이스입니다. 일반 IEnumerable<T>와 달리 각 항목을 비동기적으로 반환하며, await foreach로 소비합니다. 데이터베이스 스트리밍, HTTP 청크 응답, 실시간 이벤트 처리에 적합합니다.
1. 기본 구조
섹션 제목: “1. 기본 구조”// 비동기 이터레이터async IAsyncEnumerable<int> GenerateNumbers(){ for (int i = 0; i < 10; i++) { await Task.Delay(100); // 비동기 작업 yield return i; // 값 반환 }}
// await foreach로 소비await foreach (int number in GenerateNumbers()){ Console.WriteLine(number);}2. IEnumerable vs IAsyncEnumerable
섹션 제목: “2. IEnumerable vs IAsyncEnumerable”// 동기 — 모든 데이터를 메모리에 로드IEnumerable<Order> GetOrders(){ return dbContext.Orders.ToList(); // 전체 로드}
// 비동기 스트림 — 한 번에 하나씩async IAsyncEnumerable<Order> GetOrdersAsync(){ await foreach (var order in dbContext.Orders.AsAsyncEnumerable()) { yield return order; // 스트리밍 }}
// 소비await foreach (var order in GetOrdersAsync()){ await ProcessOrderAsync(order); // 배치 없이 즉시 처리}3. 취소 토큰 지원
섹션 제목: “3. 취소 토큰 지원”async IAsyncEnumerable<LogEntry> StreamLogs( [EnumeratorCancellation] CancellationToken ct = default){ while (!ct.IsCancellationRequested) { var entry = await FetchNextLogAsync(ct); yield return entry; }}
// 소비 시 취소 토큰 전달using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await foreach (var log in StreamLogs().WithCancellation(cts.Token)){ Console.WriteLine(log.Message);}3.1 ConfigureAwait 적용
섹션 제목: “3.1 ConfigureAwait 적용”await foreach (var item in GetItemsAsync() .WithCancellation(ct) .ConfigureAwait(false)) // 컨텍스트 캡처 비활성화 (라이브러리 코드){ ProcessItem(item);}4. 데이터베이스 스트리밍
섹션 제목: “4. 데이터베이스 스트리밍”// Entity Framework Coreasync IAsyncEnumerable<Product> GetExpensiveProducts( decimal threshold, [EnumeratorCancellation] CancellationToken ct = default){ await foreach (var product in dbContext.Products .Where(p => p.Price > threshold) .AsAsyncEnumerable() .WithCancellation(ct)) { yield return product; }}
// 대용량 데이터 처리 — 메모리 효율적await foreach (var product in GetExpensiveProducts(1000m, ct)){ await UpdateProductCacheAsync(product);}5. HTTP 스트리밍
섹션 제목: “5. HTTP 스트리밍”// ASP.NET Core Minimal APIapp.MapGet("/stream", (CancellationToken ct) => StreamResults(ct));
static async IAsyncEnumerable<DataPoint> StreamResults( [EnumeratorCancellation] CancellationToken ct){ for (int i = 0; i < 100; i++) { ct.ThrowIfCancellationRequested(); await Task.Delay(100, ct); yield return new DataPoint(i, DateTime.UtcNow); }}
// 클라이언트에서 소비using var client = new HttpClient();var stream = client.GetFromJsonAsAsyncEnumerable<DataPoint>("/stream");await foreach (var point in stream){ Console.WriteLine($"{point.Value} @ {point.Timestamp}");}6. LINQ 확장 (System.Linq.Async)
섹션 제목: “6. LINQ 확장 (System.Linq.Async)”using System.Linq;
var result = await GetItemsAsync() .Where(x => x.Value > 10) .Select(x => x.Value * 2) .Take(5) .ToListAsync();
// 집계int sum = await GetNumbersAsync().SumAsync();int count = await GetNumbersAsync().CountAsync(x => x > 5);bool any = await GetNumbersAsync().AnyAsync(x => x > 100);7. 채널과 통합
섹션 제목: “7. 채널과 통합”// Channel을 IAsyncEnumerable로 변환Channel<int> channel = Channel.CreateUnbounded<int>();
// 채널 리더를 IAsyncEnumerable로 사용await foreach (int item in channel.Reader.ReadAllAsync(ct)){ Console.WriteLine(item);}8. 배치 처리 확장
섹션 제목: “8. 배치 처리 확장”// IAsyncEnumerable을 배치로 처리하는 확장 메서드static async IAsyncEnumerable<IReadOnlyList<T>> Batch<T>( this IAsyncEnumerable<T> source, int size, [EnumeratorCancellation] CancellationToken ct = default){ var batch = new List<T>(size);
await foreach (var item in source.WithCancellation(ct)) { batch.Add(item); if (batch.Count == size) { yield return batch; batch = new List<T>(size); } }
if (batch.Count > 0) yield return batch;}
// 사용await foreach (var batch in GetItemsAsync().Batch(100)){ await BulkInsertAsync(batch);}9. 실전 패턴 — 실시간 이벤트 스트림
섹션 제목: “9. 실전 패턴 — 실시간 이벤트 스트림”async IAsyncEnumerable<GameEvent> ListenForEvents( string roomId, [EnumeratorCancellation] CancellationToken ct = default){ var queue = eventBus.Subscribe(roomId);
try { while (!ct.IsCancellationRequested) { if (queue.TryDequeue(out var evt)) { yield return evt; } else { await Task.Delay(10, ct); } } } finally { eventBus.Unsubscribe(roomId, queue); }}
// SignalR과 결합await foreach (var evt in ListenForEvents("room-1", ct)){ await hubContext.Clients.Group("room-1") .SendAsync("Event", evt, ct);}| 시나리오 | 권장 방식 |
|---|---|
| 대용량 DB 조회 | AsAsyncEnumerable() |
| HTTP 청크 응답 | IAsyncEnumerable 반환 |
| 실시간 이벤트 | yield return 루프 |
| 취소 지원 | [EnumeratorCancellation] |
| 배치 처리 | Batch() 확장 메서드 |