콘텐츠로 이동

System.IO.Pipelines 고성능 I/O

System.IO.Pipelines는 .NET Core 2.1에서 도입된 고성능 I/O 파이프라인 프리미티브입니다. Kestrel 웹 서버가 내부적으로 사용하며, 불필요한 메모리 복사와 할당을 제거해 고처리량 네트워크 코드를 작성할 수 있습니다.


Stream 방식:
소켓 → 버퍼 복사 → 파싱 버퍼 복사 → 처리
Pipelines 방식:
소켓 → Pipe(내부 메모리) → 파싱(제로 복사) → 처리

핵심 타입:

  • Pipe — PipeReader + PipeWriter를 연결하는 채널
  • PipeReader — 데이터를 소비하는 쪽
  • PipeWriter — 데이터를 생산하는 쪽

using System.IO.Pipelines;
using System.Text;
var pipe = new Pipe();
// 생산자: PipeWriter에 데이터 쓰기
async Task ProduceAsync(PipeWriter writer)
{
for (int i = 0; i < 5; i++)
{
var message = $"메시지 {i}\n";
var bytes = Encoding.UTF8.GetBytes(message);
await writer.WriteAsync(bytes);
}
await writer.CompleteAsync();
}
// 소비자: PipeReader에서 데이터 읽기
async Task ConsumeAsync(PipeReader reader)
{
while (true)
{
var result = await reader.ReadAsync();
var buffer = result.Buffer;
foreach (var segment in buffer)
Console.Write(Encoding.UTF8.GetString(segment.Span));
reader.AdvanceTo(buffer.End);
if (result.IsCompleted) break;
}
await reader.CompleteAsync();
}
await Task.WhenAll(ProduceAsync(pipe.Writer), ConsumeAsync(pipe.Reader));

using System.IO.Pipelines;
using System.Buffers;
using System.Text;
async Task ParseLinesAsync(PipeReader reader)
{
while (true)
{
var result = await reader.ReadAsync();
var buffer = result.Buffer;
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
consumed = line.End;
ProcessLine(line); // 복사 없이 직접 처리
}
reader.AdvanceTo(consumed, examined);
if (result.IsCompleted) break;
}
}
bool TryReadLine(ref ReadOnlySequence<byte> buffer,
out ReadOnlySequence<byte> line)
{
var reader = new SequenceReader<byte>(buffer);
if (reader.TryReadTo(out line, (byte)'\n'))
{
buffer = buffer.Slice(reader.Position);
return true;
}
line = default;
return false;
}
void ProcessLine(ReadOnlySequence<byte> line)
{
// 복사 없이 Span으로 직접 접근
if (line.IsSingleSegment)
{
var span = line.FirstSpan;
// span 직접 처리
}
else
{
// 멀티 세그먼트: 스택 버퍼로 합치기
Span<byte> buf = stackalloc byte[(int)line.Length];
line.CopyTo(buf);
}
}

using System.Net.Sockets;
using System.IO.Pipelines;
async Task HandleConnectionAsync(Socket socket)
{
var pipe = new Pipe();
// 소켓 → Pipe 채우기
async Task FillPipeAsync()
{
while (true)
{
var memory = pipe.Writer.GetMemory(4096);
int bytes = await socket.ReceiveAsync(memory, SocketFlags.None);
if (bytes == 0) break;
pipe.Writer.Advance(bytes);
var result = await pipe.Writer.FlushAsync();
if (result.IsCompleted) break;
}
await pipe.Writer.CompleteAsync();
}
await Task.WhenAll(FillPipeAsync(), ParseLinesAsync(pipe.Reader));
}

var options = new PipeOptions(
pool: MemoryPool<byte>.Shared,
readerScheduler: PipeScheduler.ThreadPool,
writerScheduler: PipeScheduler.Inline,
pauseWriterThreshold: 64 * 1024, // 쓰기 중단 임계값 (64KB)
resumeWriterThreshold: 32 * 1024, // 쓰기 재개 임계값 (32KB)
minimumSegmentSize: 4096);
var pipe = new Pipe(options);

6. StreamPipeReader/Writer — 기존 Stream 연동

섹션 제목: “6. StreamPipeReader/Writer — 기존 Stream 연동”
using System.IO.Pipelines;
using System.IO;
// 기존 Stream을 Pipelines로 감싸기
async Task ReadFileWithPipelines(string path)
{
await using var stream = File.OpenRead(path);
var reader = PipeReader.Create(stream, new StreamPipeReaderOptions(
bufferSize: 64 * 1024,
leaveOpen: false));
await ParseLinesAsync(reader);
}

System.IO.Pipelines는 고처리량 네트워크 서버나 대용량 파일 파싱처럼 Stream 기반 복사가 병목이 되는 상황에서 핵심 도구입니다. PipeReader.AdvanceTo로 소비된 데이터만 정확히 방출하고, ReadOnlySequence<byte>로 복사 없이 데이터를 처리하는 패턴이 핵심입니다.