Make some methods async

pull/847/head
Bond_009 6 years ago
parent f1ef0b0b4c
commit 2fc97212a7

@ -4,6 +4,7 @@ using System.IO;
using System.IO.Compression; using System.IO.Compression;
using System.Net; using System.Net;
using System.Text; using System.Text;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MediaBrowser.Model.Services; using MediaBrowser.Model.Services;
using HttpStatusCode = SocketHttpListener.Net.HttpStatusCode; using HttpStatusCode = SocketHttpListener.Net.HttpStatusCode;
@ -95,8 +96,30 @@ namespace SocketHttpListener
: buffer; : buffer;
} }
private static bool readBytes( private static async Task<byte[]> ReadBytesAsync(this Stream stream, byte[] buffer, int offset, int length)
this Stream stream, byte[] buffer, int offset, int length, Stream dest) {
var len = await stream.ReadAsync(buffer, offset, length).ConfigureAwait(false);
if (len < 1)
return buffer.SubArray(0, offset);
var tmp = 0;
while (len < length)
{
tmp = await stream.ReadAsync(buffer, offset + len, length - len).ConfigureAwait(false);
if (tmp < 1)
{
break;
}
len += tmp;
}
return len < length
? buffer.SubArray(0, offset + len)
: buffer;
}
private static bool readBytes(this Stream stream, byte[] buffer, int offset, int length, Stream dest)
{ {
var bytes = stream.readBytes(buffer, offset, length); var bytes = stream.readBytes(buffer, offset, length);
var len = bytes.Length; var len = bytes.Length;
@ -105,6 +128,15 @@ namespace SocketHttpListener
return len == offset + length; return len == offset + length;
} }
private static async Task<bool> ReadBytesAsync(this Stream stream, byte[] buffer, int offset, int length, Stream dest)
{
var bytes = await stream.ReadBytesAsync(buffer, offset, length).ConfigureAwait(false);
var len = bytes.Length;
dest.Write(bytes, 0, len);
return len == offset + length;
}
#endregion #endregion
#region Internal Methods #region Internal Methods
@ -331,12 +363,10 @@ namespace SocketHttpListener
: string.Format("\"{0}\"", value.Replace("\"", "\\\"")); : string.Format("\"{0}\"", value.Replace("\"", "\\\""));
} }
internal static byte[] ReadBytes(this Stream stream, int length) internal static Task<byte[]> ReadBytesAsync(this Stream stream, int length)
{ => stream.ReadBytesAsync(new byte[length], 0, length);
return stream.readBytes(new byte[length], 0, length);
}
internal static byte[] ReadBytes(this Stream stream, long length, int bufferLength) internal static async Task<byte[]> ReadBytesAsync(this Stream stream, long length, int bufferLength)
{ {
using (var result = new MemoryStream()) using (var result = new MemoryStream())
{ {
@ -347,7 +377,7 @@ namespace SocketHttpListener
var end = false; var end = false;
for (long i = 0; i < count; i++) for (long i = 0; i < count; i++)
{ {
if (!stream.readBytes(buffer, 0, bufferLength, result)) if (!await stream.ReadBytesAsync(buffer, 0, bufferLength, result).ConfigureAwait(false))
{ {
end = true; end = true;
break; break;
@ -355,26 +385,14 @@ namespace SocketHttpListener
} }
if (!end && rem > 0) if (!end && rem > 0)
stream.readBytes(new byte[rem], 0, rem, result); {
await stream.ReadBytesAsync(new byte[rem], 0, rem, result).ConfigureAwait(false);
}
return result.ToArray(); return result.ToArray();
} }
} }
internal static async Task<byte[]> ReadBytesAsync(this Stream stream, int length)
{
var buffer = new byte[length];
var len = await stream.ReadAsync(buffer, 0, length).ConfigureAwait(false);
var bytes = len < 1
? new byte[0]
: len < length
? stream.readBytes(buffer, len, length - len)
: buffer;
return bytes;
}
internal static string RemovePrefix(this string value, params string[] prefixes) internal static string RemovePrefix(this string value, params string[] prefixes)
{ {
var i = 0; var i = 0;
@ -493,19 +511,16 @@ namespace SocketHttpListener
return string.Format("{0}; {1}", m, parameters.ToString("; ")); return string.Format("{0}; {1}", m, parameters.ToString("; "));
} }
internal static List<TSource> ToList<TSource>(this IEnumerable<TSource> source)
{
return new List<TSource>(source);
}
internal static ushort ToUInt16(this byte[] src, ByteOrder srcOrder) internal static ushort ToUInt16(this byte[] src, ByteOrder srcOrder)
{ {
return BitConverter.ToUInt16(src.ToHostOrder(srcOrder), 0); src.ToHostOrder(srcOrder);
return BitConverter.ToUInt16(src, 0);
} }
internal static ulong ToUInt64(this byte[] src, ByteOrder srcOrder) internal static ulong ToUInt64(this byte[] src, ByteOrder srcOrder)
{ {
return BitConverter.ToUInt64(src.ToHostOrder(srcOrder), 0); src.ToHostOrder(srcOrder);
return BitConverter.ToUInt64(src, 0);
} }
internal static string TrimEndSlash(this string value) internal static string TrimEndSlash(this string value)
@ -852,14 +867,17 @@ namespace SocketHttpListener
/// <exception cref="ArgumentNullException"> /// <exception cref="ArgumentNullException">
/// <paramref name="src"/> is <see langword="null"/>. /// <paramref name="src"/> is <see langword="null"/>.
/// </exception> /// </exception>
public static byte[] ToHostOrder(this byte[] src, ByteOrder srcOrder) public static void ToHostOrder(this byte[] src, ByteOrder srcOrder)
{ {
if (src == null) if (src == null)
{
throw new ArgumentNullException(nameof(src)); throw new ArgumentNullException(nameof(src));
}
return src.Length > 1 && !srcOrder.IsHostOrder() if (src.Length > 1 && !srcOrder.IsHostOrder())
? src.Reverse() {
: src; Array.Reverse(src);
}
} }
/// <summary> /// <summary>

@ -189,11 +189,11 @@ namespace SocketHttpListener
_context = null; _context = null;
} }
private bool concatenateFragmentsInto(Stream dest) private async Task<bool> ConcatenateFragmentsIntoAsync(Stream dest)
{ {
while (true) while (true)
{ {
var frame = WebSocketFrame.Read(_stream, true); var frame = await WebSocketFrame.ReadAsync(_stream, true).ConfigureAwait(false);
if (frame.IsFinal) if (frame.IsFinal)
{ {
/* FINAL */ /* FINAL */
@ -370,20 +370,22 @@ namespace SocketHttpListener
close(code, reason ?? code.GetMessage(), false); close(code, reason ?? code.GetMessage(), false);
} }
private bool processFragmentedFrame(WebSocketFrame frame) private Task<bool> ProcessFragmentedFrameAsync(WebSocketFrame frame)
{ {
return frame.IsContinuation // Not first fragment return frame.IsContinuation // Not first fragment
? true ? Task.FromResult(true)
: processFragments(frame); : ProcessFragmentsAsync(frame);
} }
private bool processFragments(WebSocketFrame first) private async Task<bool> ProcessFragmentsAsync(WebSocketFrame first)
{ {
using (var buff = new MemoryStream()) using (var buff = new MemoryStream())
{ {
buff.WriteBytes(first.PayloadData.ApplicationData); buff.WriteBytes(first.PayloadData.ApplicationData);
if (!concatenateFragmentsInto(buff)) if (!await ConcatenateFragmentsIntoAsync(buff).ConfigureAwait(false))
{
return false; return false;
}
byte[] data; byte[] data;
if (_compression != CompressionMethod.None) if (_compression != CompressionMethod.None)
@ -419,7 +421,7 @@ namespace SocketHttpListener
return false; return false;
} }
private bool processWebSocketFrame(WebSocketFrame frame) private async Task<bool> ProcessWebSocketFrameAsync(WebSocketFrame frame)
{ {
return frame.IsCompressed && _compression == CompressionMethod.None return frame.IsCompressed && _compression == CompressionMethod.None
? processUnsupportedFrame( ? processUnsupportedFrame(
@ -427,7 +429,7 @@ namespace SocketHttpListener
CloseStatusCode.IncorrectData, CloseStatusCode.IncorrectData,
"A compressed data has been received without available decompression method.") "A compressed data has been received without available decompression method.")
: frame.IsFragmented : frame.IsFragmented
? processFragmentedFrame(frame) ? await ProcessFragmentedFrameAsync(frame).ConfigureAwait(false)
: frame.IsData : frame.IsData
? processDataFrame(frame) ? processDataFrame(frame)
: frame.IsPing : frame.IsPing
@ -563,44 +565,46 @@ namespace SocketHttpListener
private void startReceiving() private void startReceiving()
{ {
if (_messageEventQueue.Count > 0) if (_messageEventQueue.Count > 0)
{
_messageEventQueue.Clear(); _messageEventQueue.Clear();
}
_exitReceiving = new AutoResetEvent(false); _exitReceiving = new AutoResetEvent(false);
_receivePong = new AutoResetEvent(false); _receivePong = new AutoResetEvent(false);
Action receive = null; Action receive = null;
receive = () => WebSocketFrame.ReadAsync( receive = async () => await WebSocketFrame.ReadAsync(
_stream, _stream,
true, true,
frame => async frame =>
{ {
if (processWebSocketFrame(frame) && _readyState != WebSocketState.Closed) if (await ProcessWebSocketFrameAsync(frame).ConfigureAwait(false) && _readyState != WebSocketState.Closed)
{ {
receive(); receive();
if (!frame.IsData) if (!frame.IsData)
return; return;
lock (_forEvent) lock (_forEvent)
{ {
try try
{ {
var e = dequeueFromMessageEventQueue(); var e = dequeueFromMessageEventQueue();
if (e != null && _readyState == WebSocketState.Open) if (e != null && _readyState == WebSocketState.Open)
OnMessage.Emit(this, e); OnMessage.Emit(this, e);
} }
catch (Exception ex) catch (Exception ex)
{ {
processException(ex, "An exception has occurred while OnMessage."); processException(ex, "An exception has occurred while OnMessage.");
} }
} }
} }
else if (_exitReceiving != null) else if (_exitReceiving != null)
{ {
_exitReceiving.Set(); _exitReceiving.Set();
} }
}, },
ex => processException(ex, "An exception has occurred while receiving a message.")); ex => processException(ex, "An exception has occurred while receiving a message."));
receive(); receive();
} }

@ -2,6 +2,7 @@ using System;
using System.Collections; using System.Collections;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO; using System.IO;
using System.Threading.Tasks;
namespace SocketHttpListener namespace SocketHttpListener
{ {
@ -177,7 +178,7 @@ namespace SocketHttpListener
return opcode == Opcode.Text || opcode == Opcode.Binary; return opcode == Opcode.Text || opcode == Opcode.Binary;
} }
private static WebSocketFrame read(byte[] header, Stream stream, bool unmask) private static async Task<WebSocketFrame> ReadAsync(byte[] header, Stream stream, bool unmask)
{ {
/* Header */ /* Header */
@ -229,7 +230,7 @@ namespace SocketHttpListener
? 2 ? 2
: 8; : 8;
var extPayloadLen = size > 0 ? stream.ReadBytes(size) : new byte[0]; var extPayloadLen = size > 0 ? await stream.ReadBytesAsync(size).ConfigureAwait(false) : Array.Empty<byte>();
if (size > 0 && extPayloadLen.Length != size) if (size > 0 && extPayloadLen.Length != size)
throw new WebSocketException( throw new WebSocketException(
"The 'Extended Payload Length' of a frame cannot be read from the data source."); "The 'Extended Payload Length' of a frame cannot be read from the data source.");
@ -239,7 +240,7 @@ namespace SocketHttpListener
/* Masking Key */ /* Masking Key */
var masked = mask == Mask.Mask; var masked = mask == Mask.Mask;
var maskingKey = masked ? stream.ReadBytes(4) : new byte[0]; var maskingKey = masked ? await stream.ReadBytesAsync(4).ConfigureAwait(false) : Array.Empty<byte>();
if (masked && maskingKey.Length != 4) if (masked && maskingKey.Length != 4)
throw new WebSocketException( throw new WebSocketException(
"The 'Masking Key' of a frame cannot be read from the data source."); "The 'Masking Key' of a frame cannot be read from the data source.");
@ -264,8 +265,8 @@ namespace SocketHttpListener
"The length of 'Payload Data' of a frame is greater than the allowable length."); "The length of 'Payload Data' of a frame is greater than the allowable length.");
data = payloadLen > 126 data = payloadLen > 126
? stream.ReadBytes((long)len, 1024) ? await stream.ReadBytesAsync((long)len, 1024).ConfigureAwait(false)
: stream.ReadBytes((int)len); : await stream.ReadBytesAsync((int)len).ConfigureAwait(false);
//if (data.LongLength != (long)len) //if (data.LongLength != (long)len)
// throw new WebSocketException( // throw new WebSocketException(
@ -273,7 +274,7 @@ namespace SocketHttpListener
} }
else else
{ {
data = new byte[0]; data = Array.Empty<byte>();
} }
var payload = new PayloadData(data, masked); var payload = new PayloadData(data, masked);
@ -281,7 +282,7 @@ namespace SocketHttpListener
{ {
payload.Mask(maskingKey); payload.Mask(maskingKey);
frame._mask = Mask.Unmask; frame._mask = Mask.Unmask;
frame._maskingKey = new byte[0]; frame._maskingKey = Array.Empty<byte>();
} }
frame._payloadData = payload; frame._payloadData = payload;
@ -329,41 +330,39 @@ namespace SocketHttpListener
return new WebSocketFrame(fin, opcode, mask, new PayloadData(data), compressed); return new WebSocketFrame(fin, opcode, mask, new PayloadData(data), compressed);
} }
internal static WebSocketFrame Read(Stream stream) internal static Task<WebSocketFrame> ReadAsync(Stream stream)
{ => ReadAsync(stream, true);
return Read(stream, true);
}
internal static WebSocketFrame Read(Stream stream, bool unmask) internal static async Task<WebSocketFrame> ReadAsync(Stream stream, bool unmask)
{ {
var header = stream.ReadBytes(2); var header = await stream.ReadBytesAsync(2).ConfigureAwait(false);
if (header.Length != 2) if (header.Length != 2)
{
throw new WebSocketException( throw new WebSocketException(
"The header part of a frame cannot be read from the data source."); "The header part of a frame cannot be read from the data source.");
}
return read(header, stream, unmask); return await ReadAsync(header, stream, unmask).ConfigureAwait(false);
} }
internal static async void ReadAsync( internal static async Task ReadAsync(
Stream stream, bool unmask, Action<WebSocketFrame> completed, Action<Exception> error) Stream stream, bool unmask, Action<WebSocketFrame> completed, Action<Exception> error)
{ {
try try
{ {
var header = await stream.ReadBytesAsync(2).ConfigureAwait(false); var header = await stream.ReadBytesAsync(2).ConfigureAwait(false);
if (header.Length != 2) if (header.Length != 2)
{
throw new WebSocketException( throw new WebSocketException(
"The header part of a frame cannot be read from the data source."); "The header part of a frame cannot be read from the data source.");
}
var frame = read(header, stream, unmask); var frame = await ReadAsync(header, stream, unmask).ConfigureAwait(false);
if (completed != null) completed?.Invoke(frame);
completed(frame);
} }
catch (Exception ex) catch (Exception ex)
{ {
if (error != null) error.Invoke(ex);
{
error(ex);
}
} }
} }

Loading…
Cancel
Save