本文共 10857 字,大约阅读时间需要 36 分钟。
关于System.IO.Pipelines的一篇说明
System.IO.Pipelines 是对IO的统一抽象,文件、com口、网络等等,重点在于让调用者注意力集中在读、写缓冲区上,典型的就是 IDuplexPipe中的Input Output。
可以理解为将IO类抽象为读、写两个缓冲区。
目前官方实现还处于preview状态,作者使用Socket和NetworkStream 实现了一个
作者在前两篇中提到使用System.IO.Pipelines 改造StackExchange.Redis,在本篇中作者采用了改造现有的SimplSockets库来说明System.IO.Pipelines的使用。
( )
public interface IMemoryOwner: IDisposable { Memory Memory { get; } }
private sealed class ArrayPoolOwner:IMemoryOwner { private readonly int _length; private T[] _oversized; internal ArrayPoolOwner(T[] oversized,int length){ _length=length; _oversized=oversized; } public Memory Memory=>new Memory (GetArray(),0,_length); private T[] GetArray()=>Interlocked.CompareExchange(ref _oversized,null,null) ?? throw new ObjectDisposedException(ToString()); public void Dispose(){ var arr=Interlocked.Exchange(ref _oversized,null); if(arr!=null) ArrayPool .Shared.Return(arr); }}
void DoSomething(IMemoryOwnerdata){ using(data){ // ... other things here ... DoTheThing(data.Memory); } // ... more things here ... }
通过ArrayPool的借、还机制避免频繁分配。
public static IMemoryOwnerLease (this ReadOnlySequence source) { if (source.IsEmpty) return Empty (); int len = checked((int)source.Length); var arr = ArrayPool .Shared.Rent(len);//借出 source.CopyTo(arr); return new ArrayPoolOwner (arr, len);//dispose时归还 }
public abstract class SimplPipeline : IDisposable { private IDuplexPipe _pipe; protected SimplPipeline(IDuplexPipe pipe) => _pipe = pipe; public void Dispose() => Close(); public void Close() {/* burn the pipe*/} }
protected async ValueTask WriteAsync(IMemoryOwnerpayload, int messageId)//调用方不再使用payload,需要我们清理 { using (payload) { await WriteAsync(payload.Memory, messageId); } }protected ValueTask WriteAsync(ReadOnlyMemory payload, int messageId);//调用方自己清理
messageId标识一条消息,写入消息头部, 用于之后处理响应回复信息。 返回值使用ValueTask因为写入管道通常是同步的,只有管道执行Flush时才可能是异步的(大多数情况下也是同步的,除非在管道被备份时)。
private readonly SemaphoreSlim _singleWriter= new SemaphoreSlim(1);protected async ValueTask WriteAsync(ReadOnlyMemorypayload, int messageId){ await _singleWriter.WaitAsync(); try { WriteFrameHeader(writer, payload.Length, messageId); await writer.WriteAsync(payload); } finally { _singleWriter.Release(); }}
protected ValueTask WriteAsync(ReadOnlyMemorypayload, int messageId){ // try to get the conch; if not, switch to async//writer已经被占用,异步 if (!_singleWriter.Wait(0)) return WriteAsyncSlowPath(payload, messageId); bool release = true; try { WriteFrameHeader(writer, payload.Length, messageId); var write = writer.WriteAsync(payload); if (write.IsCompletedSuccessfully) return default; release = false; return AwaitFlushAndRelease(write); } finally { if (release) _singleWriter.Release(); }}async ValueTask AwaitFlushAndRelease(ValueTask flush){ try { await flush; } finally { _singleWriter.Release(); }}
三个地方 1. _singleWriter.Wait(0) 意味着writer处于空闲状态,没有其他人在调用 2. write.IsCompletedSuccessfully 意味着writer同步flush 3. 辅助方法 AwaitFlushAndRelease 处理异步flush情况 -------------------------------------------------------------------------------------
void WriteFrameHeader(PipeWriter writer, int length, int messageId){ var span = writer.GetSpan(8); BinaryPrimitives.WriteInt32LittleEndian( span, length); BinaryPrimitives.WriteInt32LittleEndian( span.Slice(4), messageId); writer.Advance(8);}
public class SimplPipelineClient : SimplPipeline{ public async Task> SendReceiveAsync(ReadOnlyMemory message) { var tcs = new TaskCompletionSource >(); int messageId; lock (_awaitingResponses) { messageId = ++_nextMessageId; if (messageId == 0) messageId = 1; _awaitingResponses.Add(messageId, tcs); } await WriteAsync(message, messageId); return await tcs.Task; } public async Task > SendReceiveAsync(IMemoryOwner message) { using (message) { return await SendReceiveAsync(message.Memory); } }}
- _awaitingResponses 是个字典,保存已经发送的消息,用于将来处理对某条(messageId)消息的回复。
protected async Task StartReceiveLoopAsync(CancellationToken cancellationToken = default){ try { while (!cancellationToken.IsCancellationRequested) { var readResult = await reader.ReadAsync(cancellationToken); if (readResult.IsCanceled) break; var buffer = readResult.Buffer; var makingProgress = false; while (TryParseFrame(ref buffer, out var payload, out var messageId)) { makingProgress = true; await OnReceiveAsync(payload, messageId); } reader.AdvanceTo(buffer.Start, buffer.End); if (!makingProgress && readResult.IsCompleted) break; } try { reader.Complete(); } catch { } } catch (Exception ex) { try { reader.Complete(ex); } catch { } }}protected abstract ValueTask OnReceiveAsync(ReadOnlySequencepayload, int messageId);
private bool TryParseFrame( ref ReadOnlySequenceinput, out ReadOnlySequence payload, out int messageId){ if (input.Length < 8) { // not enough data for the header payload = default; messageId = default; return false; } int length; if (input.First.Length >= 8) { // already 8 bytes in the first segment length = ParseFrameHeader( input.First.Span, out messageId); } else { // copy 8 bytes into a local span Span local = stackalloc byte[8]; input.Slice(0, 8).CopyTo(local); length = ParseFrameHeader( local, out messageId); } // do we have the "length" bytes? if (input.Length < length + 8) { payload = default; return false; } // success! payload = input.Slice(8, length); input = input.Slice(payload.End); return true;}
static int ParseFrameHeader( ReadOnlySpaninput, out int messageId){ var length = BinaryPrimitives .ReadInt32LittleEndian(input); messageId = BinaryPrimitives .ReadInt32LittleEndian(input.Slice(4)); return length;}
OnReceiveAsync
protected override ValueTask OnReceiveAsync( ReadOnlySequencepayload, int messageId){ if (messageId != 0) { // request/response TaskCompletionSource > tcs; lock (_awaitingResponses) { if (_awaitingResponses.TryGetValue(messageId, out tcs)) { _awaitingResponses.Remove(messageId); } } tcs?.TrySetResult(payload.Lease()); } else { // unsolicited MessageReceived?.Invoke(payload.Lease()); } return default;}
转载地址:http://fakki.baihongyu.com/