From 1cad93c276fef512d6d4731d32704e05433aed01 Mon Sep 17 00:00:00 2001 From: Bond_009 Date: Mon, 29 Jul 2019 15:26:17 +0200 Subject: [PATCH] Use System.Net abstractions instead of raw socket --- .../TunerHosts/HdHomerun/HdHomerunHost.cs | 2 +- .../TunerHosts/HdHomerun/HdHomerunManager.cs | 207 ++++++++++-------- .../HdHomerun/HdHomerunUdpStream.cs | 139 +----------- .../Net/SocketFactory.cs | 42 ---- MediaBrowser.Model/Net/ISocketFactory.cs | 12 - 5 files changed, 127 insertions(+), 275 deletions(-) diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs index ed524cae30..42acfbc5cb 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHost.cs @@ -255,7 +255,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun var uri = new Uri(GetApiUrl(info)); - using (var manager = new HdHomerunManager(_socketFactory, Logger)) + using (var manager = new HdHomerunManager(Logger)) { // Legacy HdHomeruns are IPv4 only var ipInfo = IPAddress.Parse(uri.Host); diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunManager.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunManager.cs index 6e79441dab..a6538be782 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunManager.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunManager.cs @@ -1,12 +1,13 @@ using System; +using System.Buffers; using System.Collections.Generic; using System.Net; +using System.Net.Sockets; using System.Text; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using MediaBrowser.Controller.LiveTv; -using MediaBrowser.Model.Net; using Microsoft.Extensions.Logging; namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun @@ -78,37 +79,36 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun public class HdHomerunManager : IDisposable { - public static int HdHomeRunPort = 65001; + public const int HdHomeRunPort = 65001; // Message constants - private static byte GetSetName = 3; - private static byte GetSetValue = 4; - private static byte GetSetLockkey = 21; - private static ushort GetSetRequest = 4; - private static ushort GetSetReply = 5; + private const byte GetSetName = 3; + private const byte GetSetValue = 4; + private const byte GetSetLockkey = 21; + private const ushort GetSetRequest = 4; + private const ushort GetSetReply = 5; + + private readonly ILogger _logger; private uint? _lockkey = null; private int _activeTuner = -1; - private readonly ISocketFactory _socketFactory; - private IPAddress _remoteIp; + private IPEndPoint _remoteEndPoint; - private ILogger _logger; - private ISocket _currentTcpSocket; + private TcpClient _tcpClient; - public HdHomerunManager(ISocketFactory socketFactory, ILogger logger) + public HdHomerunManager(ILogger logger) { - _socketFactory = socketFactory; _logger = logger; } public void Dispose() { - using (var socket = _currentTcpSocket) + using (var socket = _tcpClient) { if (socket != null) { - _currentTcpSocket = null; + _tcpClient = null; - var task = StopStreaming(socket); + var task = StopStreaming(_tcpClient); Task.WaitAll(task); } } @@ -116,35 +116,38 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun public async Task CheckTunerAvailability(IPAddress remoteIp, int tuner, CancellationToken cancellationToken) { - using (var socket = _socketFactory.CreateTcpSocket(remoteIp, HdHomeRunPort)) + using (var client = new TcpClient(new IPEndPoint(remoteIp, HdHomeRunPort))) + using (var stream = client.GetStream()) { - return await CheckTunerAvailability(socket, remoteIp, tuner, cancellationToken).ConfigureAwait(false); + return await CheckTunerAvailability(stream, tuner, cancellationToken).ConfigureAwait(false); } } - private static async Task CheckTunerAvailability(ISocket socket, IPAddress remoteIp, int tuner, CancellationToken cancellationToken) + private static async Task CheckTunerAvailability(NetworkStream stream, int tuner, CancellationToken cancellationToken) { - var ipEndPoint = new IPEndPoint(remoteIp, HdHomeRunPort); - var lockkeyMsg = CreateGetMessage(tuner, "lockkey"); - await socket.SendToAsync(lockkeyMsg, 0, lockkeyMsg.Length, ipEndPoint, cancellationToken); + await stream.WriteAsync(lockkeyMsg, 0, lockkeyMsg.Length, cancellationToken).ConfigureAwait(false); - var receiveBuffer = new byte[8192]; - var response = await socket.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false); + byte[] buffer = ArrayPool.Shared.Rent(8192); + try + { + int receivedBytes = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); - ParseReturnMessage(response.Buffer, response.ReceivedBytes, out string returnVal); + ParseReturnMessage(buffer, receivedBytes, out string returnVal); - return string.Equals(returnVal, "none", StringComparison.OrdinalIgnoreCase); + return string.Equals(returnVal, "none", StringComparison.OrdinalIgnoreCase); + } + finally + { + ArrayPool.Shared.Return(buffer); + } } public async Task StartStreaming(IPAddress remoteIp, IPAddress localIp, int localPort, IHdHomerunChannelCommands commands, int numTuners, CancellationToken cancellationToken) { - _remoteIp = remoteIp; + _remoteEndPoint = new IPEndPoint(remoteIp, HdHomeRunPort); - var tcpClient = _socketFactory.CreateTcpSocket(_remoteIp, HdHomeRunPort); - _currentTcpSocket = tcpClient; - - var receiveBuffer = new byte[8192]; + _tcpClient = new TcpClient(_remoteEndPoint); if (!_lockkey.HasValue) { @@ -153,51 +156,61 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun } var lockKeyValue = _lockkey.Value; + var stream = _tcpClient.GetStream(); - var ipEndPoint = new IPEndPoint(_remoteIp, HdHomeRunPort); - - for (int i = 0; i < numTuners; ++i) + byte[] buffer = ArrayPool.Shared.Rent(8192); + try { - if (!await CheckTunerAvailability(tcpClient, _remoteIp, i, cancellationToken).ConfigureAwait(false)) - continue; - - _activeTuner = i; - var lockKeyString = string.Format("{0:d}", lockKeyValue); - var lockkeyMsg = CreateSetMessage(i, "lockkey", lockKeyString, null); - await tcpClient.SendToAsync(lockkeyMsg, 0, lockkeyMsg.Length, ipEndPoint, cancellationToken).ConfigureAwait(false); - var response = await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false); - // parse response to make sure it worked - if (!ParseReturnMessage(response.Buffer, response.ReceivedBytes, out var returnVal)) - continue; - - var commandList = commands.GetCommands(); - foreach (Tuple command in commandList) + for (int i = 0; i < numTuners; ++i) { - var channelMsg = CreateSetMessage(i, command.Item1, command.Item2, lockKeyValue); - await tcpClient.SendToAsync(channelMsg, 0, channelMsg.Length, ipEndPoint, cancellationToken).ConfigureAwait(false); - response = await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false); + if (!await CheckTunerAvailability(stream, i, cancellationToken).ConfigureAwait(false)) + { + continue; + } + + _activeTuner = i; + var lockKeyString = string.Format("{0:d}", lockKeyValue); + var lockkeyMsg = CreateSetMessage(i, "lockkey", lockKeyString, null); + await stream.WriteAsync(lockkeyMsg, 0, lockkeyMsg.Length, cancellationToken).ConfigureAwait(false); + int receivedBytes = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); // parse response to make sure it worked - if (!ParseReturnMessage(response.Buffer, response.ReceivedBytes, out returnVal)) + if (!ParseReturnMessage(buffer, receivedBytes, out var returnVal)) { - await ReleaseLockkey(tcpClient, lockKeyValue).ConfigureAwait(false); continue; } - } + var commandList = commands.GetCommands(); + foreach (Tuple command in commandList) + { + var channelMsg = CreateSetMessage(i, command.Item1, command.Item2, lockKeyValue); + await stream.WriteAsync(channelMsg, 0, channelMsg.Length, cancellationToken).ConfigureAwait(false); + receivedBytes = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); + // parse response to make sure it worked + if (!ParseReturnMessage(buffer, receivedBytes, out returnVal)) + { + await ReleaseLockkey(_tcpClient, lockKeyValue).ConfigureAwait(false); + continue; + } + } - var targetValue = string.Format("rtp://{0}:{1}", localIp, localPort); - var targetMsg = CreateSetMessage(i, "target", targetValue, lockKeyValue); + var targetValue = string.Format("rtp://{0}:{1}", localIp, localPort); + var targetMsg = CreateSetMessage(i, "target", targetValue, lockKeyValue); - await tcpClient.SendToAsync(targetMsg, 0, targetMsg.Length, ipEndPoint, cancellationToken).ConfigureAwait(false); - response = await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false); - // parse response to make sure it worked - if (!ParseReturnMessage(response.Buffer, response.ReceivedBytes, out returnVal)) - { - await ReleaseLockkey(tcpClient, lockKeyValue).ConfigureAwait(false); - continue; - } + await stream.WriteAsync(targetMsg, 0, targetMsg.Length, cancellationToken).ConfigureAwait(false); + receivedBytes = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); + // parse response to make sure it worked + if (!ParseReturnMessage(buffer, receivedBytes, out returnVal)) + { + await ReleaseLockkey(_tcpClient, lockKeyValue).ConfigureAwait(false); + continue; + } - return; + return; + } + } + finally + { + ArrayPool.Shared.Return(buffer); } _activeTuner = -1; @@ -207,53 +220,70 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun public async Task ChangeChannel(IHdHomerunChannelCommands commands, CancellationToken cancellationToken) { if (!_lockkey.HasValue) + { return; + } - using (var tcpClient = _socketFactory.CreateTcpSocket(_remoteIp, HdHomeRunPort)) + using (var tcpClient = new TcpClient(_remoteEndPoint)) + using (var stream = tcpClient.GetStream()) { var commandList = commands.GetCommands(); - var receiveBuffer = new byte[8192]; - - foreach (Tuple command in commandList) + byte[] buffer = ArrayPool.Shared.Rent(8192); + try { - var channelMsg = CreateSetMessage(_activeTuner, command.Item1, command.Item2, _lockkey); - await tcpClient.SendToAsync(channelMsg, 0, channelMsg.Length, new IPEndPoint(_remoteIp, HdHomeRunPort), cancellationToken).ConfigureAwait(false); - var response = await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false); - // parse response to make sure it worked - if (!ParseReturnMessage(response.Buffer, response.ReceivedBytes, out string returnVal)) + foreach (Tuple command in commandList) { - return; + var channelMsg = CreateSetMessage(_activeTuner, command.Item1, command.Item2, _lockkey); + await stream.WriteAsync(channelMsg, 0, channelMsg.Length, cancellationToken).ConfigureAwait(false); + int receivedBytes = await stream.ReadAsync(buffer, 0, buffer.Length, cancellationToken).ConfigureAwait(false); + // parse response to make sure it worked + if (!ParseReturnMessage(buffer, receivedBytes, out string returnVal)) + { + return; + } } } + finally + { + ArrayPool.Shared.Return(buffer); + } } } - public Task StopStreaming(ISocket socket) + public Task StopStreaming(TcpClient client) { var lockKey = _lockkey; if (!lockKey.HasValue) + { return Task.CompletedTask; + } - return ReleaseLockkey(socket, lockKey.Value); + return ReleaseLockkey(client, lockKey.Value); } - private async Task ReleaseLockkey(ISocket tcpClient, uint lockKeyValue) + private async Task ReleaseLockkey(TcpClient client, uint lockKeyValue) { _logger.LogInformation("HdHomerunManager.ReleaseLockkey {0}", lockKeyValue); - var ipEndPoint = new IPEndPoint(_remoteIp, HdHomeRunPort); + var stream = client.GetStream(); var releaseTarget = CreateSetMessage(_activeTuner, "target", "none", lockKeyValue); - await tcpClient.SendToAsync(releaseTarget, 0, releaseTarget.Length, ipEndPoint, CancellationToken.None).ConfigureAwait(false); - - var receiveBuffer = new byte[8192]; + await stream.WriteAsync(releaseTarget, 0, releaseTarget.Length, CancellationToken.None).ConfigureAwait(false); - await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, CancellationToken.None).ConfigureAwait(false); - var releaseKeyMsg = CreateSetMessage(_activeTuner, "lockkey", "none", lockKeyValue); - _lockkey = null; - await tcpClient.SendToAsync(releaseKeyMsg, 0, releaseKeyMsg.Length, ipEndPoint, CancellationToken.None).ConfigureAwait(false); - await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, CancellationToken.None).ConfigureAwait(false); + var buffer = ArrayPool.Shared.Rent(8192); + try + { + await stream.ReadAsync(buffer, 0, buffer.Length, CancellationToken.None).ConfigureAwait(false); + var releaseKeyMsg = CreateSetMessage(_activeTuner, "lockkey", "none", lockKeyValue); + _lockkey = null; + await stream.WriteAsync(releaseKeyMsg, 0, releaseKeyMsg.Length, CancellationToken.None).ConfigureAwait(false); + await stream.ReadAsync(buffer, 0, buffer.Length, CancellationToken.None).ConfigureAwait(false); + } + finally + { + ArrayPool.Shared.Return(buffer); + } } private static byte[] CreateGetMessage(int tuner, string name) @@ -270,7 +300,10 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun // calculate crc and insert at the end of the message var crcBytes = BitConverter.GetBytes(HdHomerunCrc.GetCrc32(message, messageLength - 4)); if (flipEndian) + { Array.Reverse(crcBytes); + } + Buffer.BlockCopy(crcBytes, 0, message, offset, 4); return message; diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs index ec708cf20f..1d79a5f96e 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs @@ -49,13 +49,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun EnableStreamSharing = true; } - private static Socket CreateSocket(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType) - { - var socket = new Socket(addressFamily, SocketType.Stream, ProtocolType.Tcp); - - return socket; - } - public override async Task Open(CancellationToken openCancellationToken) { LiveStreamCancellationTokenSource.Token.ThrowIfCancellationRequested(); @@ -71,13 +64,13 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun var remoteAddress = IPAddress.Parse(uri.Host); IPAddress localAddress = null; - using (var tcpSocket = CreateSocket(remoteAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp)) + using (var tcpClient = new TcpClient()) { try { - tcpSocket.Connect(new IPEndPoint(remoteAddress, HdHomerunManager.HdHomeRunPort)); - localAddress = ((IPEndPoint)tcpSocket.LocalEndPoint).Address; - tcpSocket.Close(); + await tcpClient.ConnectAsync(remoteAddress, HdHomerunManager.HdHomeRunPort).ConfigureAwait(false); + localAddress = ((IPEndPoint)tcpClient.Client.RemoteEndPoint).Address; + tcpClient.Close(); } catch (Exception ex) { @@ -87,7 +80,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun } var udpClient = _socketFactory.CreateUdpSocket(localPort); - var hdHomerunManager = new HdHomerunManager(_socketFactory, Logger); + var hdHomerunManager = new HdHomerunManager(Logger); try { @@ -103,6 +96,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun { Logger.LogError(ex, "Error opening live stream:"); } + throw; } } @@ -199,126 +193,5 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun } } } - - public class UdpClientStream : Stream - { - private static int RtpHeaderBytes = 12; - private static int PacketSize = 1316; - private readonly MediaBrowser.Model.Net.ISocket _udpClient; - bool disposed; - - public UdpClientStream(MediaBrowser.Model.Net.ISocket udpClient) : base() - { - _udpClient = udpClient; - } - - public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - if (buffer == null) - throw new ArgumentNullException(nameof(buffer)); - - if (offset + count < 0) - throw new ArgumentOutOfRangeException(nameof(offset), "offset + count must not be negative"); - - if (offset + count > buffer.Length) - throw new ArgumentException("offset + count must not be greater than the length of buffer"); - - if (disposed) - throw new ObjectDisposedException(nameof(UdpClientStream)); - - // This will always receive a 1328 packet size (PacketSize + RtpHeaderSize) - // The RTP header will be stripped so see how many reads we need to make to fill the buffer. - int numReads = count / PacketSize; - int totalBytesRead = 0; - byte[] receiveBuffer = new byte[81920]; - - for (int i = 0; i < numReads; ++i) - { - var data = await _udpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false); - - var bytesRead = data.ReceivedBytes - RtpHeaderBytes; - - // remove rtp header - Buffer.BlockCopy(data.Buffer, RtpHeaderBytes, buffer, offset, bytesRead); - offset += bytesRead; - totalBytesRead += bytesRead; - } - return totalBytesRead; - } - - public override int Read(byte[] buffer, int offset, int count) - { - if (buffer == null) - throw new ArgumentNullException(nameof(buffer)); - - if (offset + count < 0) - throw new ArgumentOutOfRangeException("offset + count must not be negative", "offset+count"); - - if (offset + count > buffer.Length) - throw new ArgumentException("offset + count must not be greater than the length of buffer"); - - if (disposed) - throw new ObjectDisposedException(nameof(UdpClientStream)); - - // This will always receive a 1328 packet size (PacketSize + RtpHeaderSize) - // The RTP header will be stripped so see how many reads we need to make to fill the buffer. - int numReads = count / PacketSize; - int totalBytesRead = 0; - byte[] receiveBuffer = new byte[81920]; - - for (int i = 0; i < numReads; ++i) - { - var receivedBytes = _udpClient.Receive(receiveBuffer, 0, receiveBuffer.Length); - - var bytesRead = receivedBytes - RtpHeaderBytes; - - // remove rtp header - Buffer.BlockCopy(receiveBuffer, RtpHeaderBytes, buffer, offset, bytesRead); - offset += bytesRead; - totalBytesRead += bytesRead; - } - return totalBytesRead; - } - - protected override void Dispose(bool disposing) - { - disposed = true; - } - - public override bool CanRead => throw new NotImplementedException(); - - public override bool CanSeek => throw new NotImplementedException(); - - public override bool CanWrite => throw new NotImplementedException(); - - public override long Length => throw new NotImplementedException(); - - public override long Position - { - get => throw new NotImplementedException(); - - set => throw new NotImplementedException(); - } - - public override void Flush() - { - throw new NotImplementedException(); - } - - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotImplementedException(); - } - - public override void SetLength(long value) - { - throw new NotImplementedException(); - } - - public override void Write(byte[] buffer, int offset, int count) - { - throw new NotImplementedException(); - } - } } } diff --git a/Emby.Server.Implementations/Net/SocketFactory.cs b/Emby.Server.Implementations/Net/SocketFactory.cs index 42ffa4e221..cb53ce50c7 100644 --- a/Emby.Server.Implementations/Net/SocketFactory.cs +++ b/Emby.Server.Implementations/Net/SocketFactory.cs @@ -2,54 +2,12 @@ using System; using System.IO; using System.Net; using System.Net.Sockets; -using Emby.Server.Implementations.Networking; using MediaBrowser.Model.Net; namespace Emby.Server.Implementations.Net { public class SocketFactory : ISocketFactory { - // THIS IS A LINKED FILE - SHARED AMONGST MULTIPLE PLATFORMS - // Be careful to check any changes compile and work for all platform projects it is shared in. - - // Not entirely happy with this. Would have liked to have done something more generic/reusable, - // but that wasn't really the point so kept to YAGNI principal for now, even if the - // interfaces are a bit ugly, specific and make assumptions. - - public ISocket CreateTcpSocket(IPAddress remoteAddress, int remotePort) - { - if (remotePort < 0) - { - throw new ArgumentException("remotePort cannot be less than zero.", nameof(remotePort)); - } - - var addressFamily = remoteAddress.AddressFamily == AddressFamily.InterNetwork - ? AddressFamily.InterNetwork - : AddressFamily.InterNetworkV6; - - var retVal = new Socket(addressFamily, System.Net.Sockets.SocketType.Stream, System.Net.Sockets.ProtocolType.Tcp); - - try - { - retVal.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); - } - catch (SocketException) - { - // This is not supported on all operating systems (qnap) - } - - try - { - return new UdpSocket(retVal, new IPEndPoint(remoteAddress, remotePort)); - } - catch - { - retVal?.Dispose(); - - throw; - } - } - /// /// Creates a new UDP acceptSocket and binds it to the specified local port. /// diff --git a/MediaBrowser.Model/Net/ISocketFactory.cs b/MediaBrowser.Model/Net/ISocketFactory.cs index e58f4cc14d..2f857f1af0 100644 --- a/MediaBrowser.Model/Net/ISocketFactory.cs +++ b/MediaBrowser.Model/Net/ISocketFactory.cs @@ -17,8 +17,6 @@ namespace MediaBrowser.Model.Net ISocket CreateUdpBroadcastSocket(int localPort); - ISocket CreateTcpSocket(IPAddress remoteAddress, int remotePort); - /// /// Creates a new unicast socket using the specified local port number. /// @@ -35,14 +33,4 @@ namespace MediaBrowser.Model.Net Stream CreateNetworkStream(ISocket socket, bool ownsSocket); } - - public enum SocketType - { - Stream - } - - public enum ProtocolType - { - Tcp - } }