From 98e7eeeff933d6f5ba18daecb3931337523dc01b Mon Sep 17 00:00:00 2001 From: Luke Pulverenti Date: Thu, 5 Sep 2013 17:34:46 -0400 Subject: [PATCH] reduce byte conversions with alchemy web socket --- MediaBrowser.Common/Net/IWebSocket.cs | 12 +++-- .../Movies/MovieDbProvider.cs | 13 +++-- .../HttpServer/NativeWebSocket.cs | 8 +-- .../ServerManager/ServerManager.cs | 5 +- .../ServerManager/WebSocketConnection.cs | 31 ++++++++++- .../WebSocket/AlchemyServer.cs | 9 +++- .../WebSocket/AlchemyWebSocket.cs | 31 +++++------ .../WebSocket/FleckServer.cs | 54 +++++++++++++++++++ .../WebSocket/FleckWebSocket.cs | 47 ++++++++++++++++ 9 files changed, 175 insertions(+), 35 deletions(-) create mode 100644 MediaBrowser.Server.Implementations/WebSocket/FleckServer.cs create mode 100644 MediaBrowser.Server.Implementations/WebSocket/FleckWebSocket.cs diff --git a/MediaBrowser.Common/Net/IWebSocket.cs b/MediaBrowser.Common/Net/IWebSocket.cs index 805340b906..748c6642cb 100644 --- a/MediaBrowser.Common/Net/IWebSocket.cs +++ b/MediaBrowser.Common/Net/IWebSocket.cs @@ -1,7 +1,7 @@ -using System; +using MediaBrowser.Model.Net; +using System; using System.Threading; using System.Threading.Tasks; -using MediaBrowser.Model.Net; namespace MediaBrowser.Common.Net { @@ -20,8 +20,14 @@ namespace MediaBrowser.Common.Net /// Gets or sets the receive action. /// /// The receive action. - Action OnReceiveDelegate { get; set; } + Action OnReceiveBytes { get; set; } + /// + /// Gets or sets the on receive. + /// + /// The on receive. + Action OnReceive { get; set; } + /// /// Sends the async. /// diff --git a/MediaBrowser.Providers/Movies/MovieDbProvider.cs b/MediaBrowser.Providers/Movies/MovieDbProvider.cs index 69a23b84fb..e4468dfe47 100644 --- a/MediaBrowser.Providers/Movies/MovieDbProvider.cs +++ b/MediaBrowser.Providers/Movies/MovieDbProvider.cs @@ -151,15 +151,14 @@ namespace MediaBrowser.Providers.Movies await _tmdbSettingsSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); - // Check again in case it got populated while we were waiting. - if (_tmdbSettings != null) - { - _tmdbSettingsSemaphore.Release(); - return _tmdbSettings; - } - try { + // Check again in case it got populated while we were waiting. + if (_tmdbSettings != null) + { + return _tmdbSettings; + } + using (var json = await GetMovieDbResponse(new HttpRequestOptions { Url = string.Format(TmdbConfigUrl, ApiKey), diff --git a/MediaBrowser.Server.Implementations/HttpServer/NativeWebSocket.cs b/MediaBrowser.Server.Implementations/HttpServer/NativeWebSocket.cs index 9ad6178472..96f61912a3 100644 --- a/MediaBrowser.Server.Implementations/HttpServer/NativeWebSocket.cs +++ b/MediaBrowser.Server.Implementations/HttpServer/NativeWebSocket.cs @@ -88,9 +88,9 @@ namespace MediaBrowser.Server.Implementations.HttpServer break; } - if (OnReceiveDelegate != null) + if (OnReceiveBytes != null) { - OnReceiveDelegate(bytes); + OnReceiveBytes(bytes); } } } @@ -160,6 +160,8 @@ namespace MediaBrowser.Server.Implementations.HttpServer /// Gets or sets the receive action. /// /// The receive action. - public Action OnReceiveDelegate { get; set; } + public Action OnReceiveBytes { get; set; } + + public Action OnReceive { get; set; } } } diff --git a/MediaBrowser.Server.Implementations/ServerManager/ServerManager.cs b/MediaBrowser.Server.Implementations/ServerManager/ServerManager.cs index 80b6a0f7db..6cddcdf2ee 100644 --- a/MediaBrowser.Server.Implementations/ServerManager/ServerManager.cs +++ b/MediaBrowser.Server.Implementations/ServerManager/ServerManager.cs @@ -186,7 +186,10 @@ namespace MediaBrowser.Server.Implementations.ServerManager /// The instance containing the event data. void HttpServer_WebSocketConnected(object sender, WebSocketConnectEventArgs e) { - var connection = new WebSocketConnection(e.WebSocket, e.Endpoint, _jsonSerializer, _logger) { OnReceive = ProcessWebSocketMessageReceived }; + var connection = new WebSocketConnection(e.WebSocket, e.Endpoint, _jsonSerializer, _logger) + { + OnReceive = ProcessWebSocketMessageReceived + }; _webSocketConnections.Add(connection); } diff --git a/MediaBrowser.Server.Implementations/ServerManager/WebSocketConnection.cs b/MediaBrowser.Server.Implementations/ServerManager/WebSocketConnection.cs index 0dd8cd0fd4..3612b85b95 100644 --- a/MediaBrowser.Server.Implementations/ServerManager/WebSocketConnection.cs +++ b/MediaBrowser.Server.Implementations/ServerManager/WebSocketConnection.cs @@ -85,7 +85,8 @@ namespace MediaBrowser.Server.Implementations.ServerManager _jsonSerializer = jsonSerializer; _socket = socket; - _socket.OnReceiveDelegate = OnReceiveInternal; + _socket.OnReceiveBytes = OnReceiveInternal; + _socket.OnReceive = OnReceiveInternal; RemoteEndPoint = remoteEndPoint; _logger = logger; } @@ -127,6 +128,34 @@ namespace MediaBrowser.Server.Implementations.ServerManager } } + private void OnReceiveInternal(string message) + { + LastActivityDate = DateTime.UtcNow; + + if (OnReceive == null) + { + return; + } + try + { + var stub = (WebSocketMessage)_jsonSerializer.DeserializeFromString(message, typeof(WebSocketMessage)); + + var info = new WebSocketMessageInfo + { + MessageType = stub.MessageType, + Data = stub.Data == null ? null : stub.Data.ToString() + }; + + info.Connection = this; + + OnReceive(info); + } + catch (Exception ex) + { + _logger.ErrorException("Error processing web socket message", ex); + } + } + /// /// Sends a message asynchronously. /// diff --git a/MediaBrowser.Server.Implementations/WebSocket/AlchemyServer.cs b/MediaBrowser.Server.Implementations/WebSocket/AlchemyServer.cs index ba34bd22e2..797c4a80c4 100644 --- a/MediaBrowser.Server.Implementations/WebSocket/AlchemyServer.cs +++ b/MediaBrowser.Server.Implementations/WebSocket/AlchemyServer.cs @@ -90,6 +90,10 @@ namespace MediaBrowser.Server.Implementations.WebSocket /// public void Stop() { + if (WebSocketServer != null) + { + WebSocketServer.Stop(); + } } /// @@ -107,7 +111,10 @@ namespace MediaBrowser.Server.Implementations.WebSocket /// true to release both managed and unmanaged resources; false to release only unmanaged resources. protected virtual void Dispose(bool dispose) { - + if (WebSocketServer != null) + { + WebSocketServer.Dispose(); + } } } } diff --git a/MediaBrowser.Server.Implementations/WebSocket/AlchemyWebSocket.cs b/MediaBrowser.Server.Implementations/WebSocket/AlchemyWebSocket.cs index 0b6b145665..9582016258 100644 --- a/MediaBrowser.Server.Implementations/WebSocket/AlchemyWebSocket.cs +++ b/MediaBrowser.Server.Implementations/WebSocket/AlchemyWebSocket.cs @@ -3,7 +3,6 @@ using MediaBrowser.Common.Net; using MediaBrowser.Model.Logging; using MediaBrowser.Model.Net; using System; -using System.Text; using System.Threading; using System.Threading.Tasks; @@ -42,7 +41,7 @@ namespace MediaBrowser.Server.Implementations.WebSocket UserContext = context; context.SetOnDisconnect(OnDisconnected); - context.SetOnReceive(OnReceive); + context.SetOnReceive(OnReceiveContext); _logger.Info("Client connected from {0}", context.ClientAddress); } @@ -50,7 +49,7 @@ namespace MediaBrowser.Server.Implementations.WebSocket /// /// The _disconnected /// - private bool _disconnected = false; + private bool _disconnected; /// /// Gets or sets the state. /// @@ -73,25 +72,13 @@ namespace MediaBrowser.Server.Implementations.WebSocket /// Called when [receive]. /// /// The context. - private void OnReceive(UserContext context) + private void OnReceiveContext(UserContext context) { - if (OnReceiveDelegate != null) + if (OnReceive != null) { var json = context.DataFrame.ToString(); - if (!string.IsNullOrWhiteSpace(json)) - { - try - { - var bytes = Encoding.UTF8.GetBytes(json); - - OnReceiveDelegate(bytes); - } - catch (Exception ex) - { - _logger.ErrorException("Error processing web socket message", ex); - } - } + OnReceive(json); } } @@ -128,6 +115,12 @@ namespace MediaBrowser.Server.Implementations.WebSocket /// Gets or sets the receive action. /// /// The receive action. - public Action OnReceiveDelegate { get; set; } + public Action OnReceiveBytes { get; set; } + + /// + /// Gets or sets the on receive. + /// + /// The on receive. + public Action OnReceive { get; set; } } } diff --git a/MediaBrowser.Server.Implementations/WebSocket/FleckServer.cs b/MediaBrowser.Server.Implementations/WebSocket/FleckServer.cs new file mode 100644 index 0000000000..2c47a366e0 --- /dev/null +++ b/MediaBrowser.Server.Implementations/WebSocket/FleckServer.cs @@ -0,0 +1,54 @@ +using Fleck; +using MediaBrowser.Common.Net; +using System; +using IWebSocketServer = MediaBrowser.Common.Net.IWebSocketServer; + +namespace MediaBrowser.Server.Implementations.WebSocket +{ + public class FleckServer : IWebSocketServer + { + private WebSocketServer _server; + + public void Start(int portNumber) + { + var server = new WebSocketServer("ws://localhost:" + portNumber); + + server.Start(socket => + { + socket.OnOpen = () => OnClientConnected(socket); + }); + + _server = server; + } + + public void Stop() + { + _server.Dispose(); + } + + private void OnClientConnected(Fleck.IWebSocketConnection context) + { + if (WebSocketConnected != null) + { + var socket = new FleckWebSocket(context); + + WebSocketConnected(this, new WebSocketConnectEventArgs + { + WebSocket = socket, + Endpoint = context.ConnectionInfo.ClientIpAddress + ":" + context.ConnectionInfo.ClientPort + }); + } + } + public event EventHandler WebSocketConnected; + + public int Port + { + get { return _server.Port; } + } + + public void Dispose() + { + _server.Dispose(); + } + } +} diff --git a/MediaBrowser.Server.Implementations/WebSocket/FleckWebSocket.cs b/MediaBrowser.Server.Implementations/WebSocket/FleckWebSocket.cs new file mode 100644 index 0000000000..3667fab070 --- /dev/null +++ b/MediaBrowser.Server.Implementations/WebSocket/FleckWebSocket.cs @@ -0,0 +1,47 @@ +using MediaBrowser.Common.Net; +using MediaBrowser.Model.Net; +using System; +using System.Threading; +using System.Threading.Tasks; +using IWebSocketConnection = Fleck.IWebSocketConnection; + +namespace MediaBrowser.Server.Implementations.WebSocket +{ + public class FleckWebSocket : IWebSocket + { + private readonly IWebSocketConnection _connection; + + public FleckWebSocket(IWebSocketConnection connection) + { + _connection = connection; + + _connection.OnMessage = OnReceiveData; + } + + public WebSocketState State + { + get { return _connection.IsAvailable ? WebSocketState.Open : WebSocketState.Closed; } + } + + private void OnReceiveData(string data) + { + if (OnReceive != null) + { + OnReceive(data); + } + } + + public Task SendAsync(byte[] bytes, WebSocketMessageType type, bool endOfMessage, CancellationToken cancellationToken) + { + return Task.Run(() => _connection.Send(bytes)); + } + + public void Dispose() + { + _connection.Close(); + } + + public Action OnReceiveBytes { get; set; } + public Action OnReceive { get; set; } + } +}