System.IO.Pipelines 고성능 I/O
System.IO.Pipelines는 .NET Core 2.1에서 도입된 고성능 I/O 파이프라인 프리미티브입니다. Kestrel 웹 서버가 내부적으로 사용하며, 불필요한 메모리 복사와 할당을 제거해 고처리량 네트워크 코드를 작성할 수 있습니다.
1. Stream vs Pipelines
섹션 제목: “1. Stream vs Pipelines”Stream 방식: 소켓 → 버퍼 복사 → 파싱 버퍼 복사 → 처리
Pipelines 방식: 소켓 → Pipe(내부 메모리) → 파싱(제로 복사) → 처리핵심 타입:
Pipe— PipeReader + PipeWriter를 연결하는 채널PipeReader— 데이터를 소비하는 쪽PipeWriter— 데이터를 생산하는 쪽
2. 기본 Pipe 사용
섹션 제목: “2. 기본 Pipe 사용”using System.IO.Pipelines;using System.Text;
var pipe = new Pipe();
// 생산자: PipeWriter에 데이터 쓰기async Task ProduceAsync(PipeWriter writer){ for (int i = 0; i < 5; i++) { var message = $"메시지 {i}\n"; var bytes = Encoding.UTF8.GetBytes(message);
await writer.WriteAsync(bytes); } await writer.CompleteAsync();}
// 소비자: PipeReader에서 데이터 읽기async Task ConsumeAsync(PipeReader reader){ while (true) { var result = await reader.ReadAsync(); var buffer = result.Buffer;
foreach (var segment in buffer) Console.Write(Encoding.UTF8.GetString(segment.Span));
reader.AdvanceTo(buffer.End);
if (result.IsCompleted) break; } await reader.CompleteAsync();}
await Task.WhenAll(ProduceAsync(pipe.Writer), ConsumeAsync(pipe.Reader));3. 라인 파싱 패턴 (Zero-copy)
섹션 제목: “3. 라인 파싱 패턴 (Zero-copy)”using System.IO.Pipelines;using System.Buffers;using System.Text;
async Task ParseLinesAsync(PipeReader reader){ while (true) { var result = await reader.ReadAsync(); var buffer = result.Buffer; SequencePosition consumed = buffer.Start; SequencePosition examined = buffer.End;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line)) { consumed = line.End; ProcessLine(line); // 복사 없이 직접 처리 }
reader.AdvanceTo(consumed, examined);
if (result.IsCompleted) break; }}
bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line){ var reader = new SequenceReader<byte>(buffer);
if (reader.TryReadTo(out line, (byte)'\n')) { buffer = buffer.Slice(reader.Position); return true; }
line = default; return false;}
void ProcessLine(ReadOnlySequence<byte> line){ // 복사 없이 Span으로 직접 접근 if (line.IsSingleSegment) { var span = line.FirstSpan; // span 직접 처리 } else { // 멀티 세그먼트: 스택 버퍼로 합치기 Span<byte> buf = stackalloc byte[(int)line.Length]; line.CopyTo(buf); }}4. 소켓 연동
섹션 제목: “4. 소켓 연동”using System.Net.Sockets;using System.IO.Pipelines;
async Task HandleConnectionAsync(Socket socket){ var pipe = new Pipe();
// 소켓 → Pipe 채우기 async Task FillPipeAsync() { while (true) { var memory = pipe.Writer.GetMemory(4096); int bytes = await socket.ReceiveAsync(memory, SocketFlags.None); if (bytes == 0) break;
pipe.Writer.Advance(bytes); var result = await pipe.Writer.FlushAsync(); if (result.IsCompleted) break; } await pipe.Writer.CompleteAsync(); }
await Task.WhenAll(FillPipeAsync(), ParseLinesAsync(pipe.Reader));}5. PipeOptions 설정
섹션 제목: “5. PipeOptions 설정”var options = new PipeOptions( pool: MemoryPool<byte>.Shared, readerScheduler: PipeScheduler.ThreadPool, writerScheduler: PipeScheduler.Inline, pauseWriterThreshold: 64 * 1024, // 쓰기 중단 임계값 (64KB) resumeWriterThreshold: 32 * 1024, // 쓰기 재개 임계값 (32KB) minimumSegmentSize: 4096);
var pipe = new Pipe(options);6. StreamPipeReader/Writer — 기존 Stream 연동
섹션 제목: “6. StreamPipeReader/Writer — 기존 Stream 연동”using System.IO.Pipelines;using System.IO;
// 기존 Stream을 Pipelines로 감싸기async Task ReadFileWithPipelines(string path){ await using var stream = File.OpenRead(path); var reader = PipeReader.Create(stream, new StreamPipeReaderOptions( bufferSize: 64 * 1024, leaveOpen: false));
await ParseLinesAsync(reader);}System.IO.Pipelines는 고처리량 네트워크 서버나 대용량 파일 파싱처럼 Stream 기반 복사가 병목이 되는 상황에서 핵심 도구입니다. PipeReader.AdvanceTo로 소비된 데이터만 정확히 방출하고, ReadOnlySequence<byte>로 복사 없이 데이터를 처리하는 패턴이 핵심입니다.