基于 .NET 7 的 QUIC 实现 Echo 服务( 二 )

阻塞线程,直到接收到一个 Quic 连接,一个 QuicListener 可以接收多个 连接 。
var connection = await listener.AcceptConnectionAsync();Console.WriteLine($"Client [{connection.RemoteEndPoint}]: connected");接收一个入站的 Quic 流,一个 QuicConnection可以支持多个流 。
var stream = await connection.AcceptInboundStreamAsync();Console.WriteLine($"Stream [{stream.Id}]: created");接下来,使用 System.IO.Pipeline 处理流数据,读取行数据,并回复一个 ack 消息 。
Console.WriteLine();await ProcessLinesAsync(stream);Console.ReadKey();// 处理流数据async Task ProcessLinesAsync(QuicStream stream){var reader = PipeReader.Create(stream);var writer = PipeWriter.Create(stream);while (true){ReadResult result = await reader.ReadAsync();ReadOnlySequence<byte> buffer = result.Buffer;while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line)){// 读取行数据ProcessLine(line);// 写入 ACK 消息await writer.WriteAsync(Encoding.UTF8.GetBytes($"Ack: {DateTime.Now.ToString("HH:mm:ss")} \n"));}reader.AdvanceTo(buffer.Start, buffer.End);if (result.IsCompleted){break;}}Console.WriteLine($"Stream [{stream.Id}]: completed");await reader.CompleteAsync();await writer.CompleteAsync();} bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line){SequencePosition? position = buffer.PositionOf((byte)'\n');if (position == null){line = default;return false;}line = buffer.Slice(0, position.Value);buffer = buffer.Slice(buffer.GetPosition(1, position.Value));return true;} void ProcessLine(in ReadOnlySequence<byte> buffer){foreach (var segment in buffer){Console.WriteLine("Recevied -> " + System.Text.Encoding.UTF8.GetString(segment.Span));}Console.WriteLine();}以上就是服务端的完整代码了 。
接下来我们看一下客户端 QuicClient 的代码 。
直接使用 QuicConnection.ConnectAsync 连接到服务端 。
Console.WriteLine("Quic Client Running...");await Task.Delay(3000);// 连接到服务端var connection = await QuicConnection.ConnectAsync(new QuicClientConnectionOptions{DefaultCloseErrorCode = 0,DefaultStreamErrorCode = 0,RemoteEndPoint = new IPEndPoint(IPAddress.Loopback, 9999),ClientAuthenticationOptions = new SslClientAuthenticationOptions{ApplicationProtocols = new List<SslApplicationProtocol> { SslApplicationProtocol.Http3 },RemoteCertificateValidationCallback = (sender, certificate, chain, errors) =>{return true;}}});创建一个出站的双向流 。
// 打开一个出站的双向流var stream = await connection.OpenOutboundStreamAsync(QuicStreamType.Bidirectional); var reader = PipeReader.Create(stream);var writer = PipeWriter.Create(stream);后台读取流数据,然后循环写入数据 。
// 后台读取流数据_ = ProcessLinesAsync(stream);Console.WriteLine(); // 写入数据for (int i = 0; i < 7; i++){await Task.Delay(2000);var message = $"Hello Quic {i} \n";Console.Write("Send -> " + message);await writer.WriteAsync(Encoding.UTF8.GetBytes(message));}await writer.CompleteAsync(); Console.ReadKey();ProcessLinesAsync 和服务端一样,使用System.IO.Pipeline读取流数据 。
async Task ProcessLinesAsync(QuicStream stream){while (true){ReadResult result = await reader.ReadAsync();ReadOnlySequence<byte> buffer = result.Buffer;while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line)){// 处理行数据ProcessLine(line);}reader.AdvanceTo(buffer.Start, buffer.End);if (result.IsCompleted){break;}}await reader.CompleteAsync();await writer.CompleteAsync();} bool TryReadLine(ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line){SequencePosition? position = buffer.PositionOf((byte)'\n');if (position == null){line = default;return false;}line = buffer.Slice(0, position.Value);buffer = buffer.Slice(buffer.GetPosition(1, position.Value));return true;}void ProcessLine(in ReadOnlySequence<byte> buffer){foreach (var segment in buffer){Console.Write("Recevied -> " + System.Text.Encoding.UTF8.GetString(segment.Span));Console.WriteLine();}Console.WriteLine();}

经验总结扩展阅读