#nullable disable

#pragma warning disable CS1591, SA1306, SA1401

using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Controller.Net.WebSocketMessages;
using MediaBrowser.Model.Session;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;

namespace MediaBrowser.Controller.Net
{
    /// <summary>
    /// Starts sending data over a web socket periodically when a message is received, and then stops when a corresponding stop message is received.
    /// </summary>
    /// <typeparam name="TReturnDataType">The type of the data sent to the client.</typeparam>
    /// <typeparam name="TStateType">The type of the state tracked for each subscribed connection.</typeparam>
    public abstract class BasePeriodicWebSocketListener<TReturnDataType, TStateType> : IWebSocketListener, IDisposable
        where TStateType : WebSocketListenerState, new()
        where TReturnDataType : class
    {
        /// <summary>
        /// The active subscriptions: the connection, the token source used to cancel its sends, and its state.
        /// The list instance is also the lock object that guards access to it.
        /// </summary>
        private readonly List<Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>> _activeConnections =
            new List<Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>>();

        /// <summary>
        /// The logger.
        /// </summary>
        protected readonly ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> Logger;

        /// <summary>
        /// Initializes a new instance of the <see cref="BasePeriodicWebSocketListener{TReturnDataType, TStateType}"/> class.
        /// </summary>
        /// <param name="logger">The logger.</param>
        /// <exception cref="ArgumentNullException"><paramref name="logger"/> is <c>null</c>.</exception>
        protected BasePeriodicWebSocketListener(ILogger<BasePeriodicWebSocketListener<TReturnDataType, TStateType>> logger)
        {
            ArgumentNullException.ThrowIfNull(logger);

            Logger = logger;
        }

        /// <summary>
        /// Gets the type used for the messages sent to the client.
        /// </summary>
        /// <value>The type.</value>
        protected abstract SessionMessageType Type { get; }

        /// <summary>
        /// Gets the message type received from the client to start sending messages.
        /// </summary>
        /// <value>The type.</value>
        protected abstract SessionMessageType StartType { get; }

        /// <summary>
        /// Gets the message type received from the client to stop sending messages.
        /// </summary>
        /// <value>The type.</value>
        protected abstract SessionMessageType StopType { get; }

        /// <summary>
        /// Gets the data to send.
        /// </summary>
        /// <returns>A task producing the data to send; when the result is <c>null</c> nothing is sent.</returns>
        protected abstract Task<TReturnDataType> GetDataToSend();

        /// <summary>
        /// Processes the message: starts a subscription on <see cref="StartType"/> and stops it on <see cref="StopType"/>.
        /// </summary>
        /// <param name="message">The message.</param>
        /// <returns>A completed task.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is <c>null</c>.</exception>
        public Task ProcessMessageAsync(WebSocketMessageInfo message)
        {
            ArgumentNullException.ThrowIfNull(message);

            if (message.MessageType == StartType)
            {
                Start(message);
            }

            if (message.MessageType == StopType)
            {
                Stop(message);
            }

            return Task.CompletedTask;
        }

        /// <inheritdoc />
        public Task ProcessWebSocketConnectedAsync(IWebSocketConnection connection, HttpContext httpContext) => Task.CompletedTask;

        /// <summary>
        /// Starts sending messages over a web socket.
        /// </summary>
        /// <remarks>
        /// The message data is expected to be two comma separated integers: the initial delay and the
        /// send interval, both in milliseconds. A message that does not match this shape is logged and ignored.
        /// </remarks>
        /// <param name="message">The message.</param>
        protected virtual void Start(WebSocketMessageInfo message)
        {
            // The payload is supplied by the client, so validate it rather than letting a malformed
            // message surface as an IndexOutOfRangeException, FormatException or NullReferenceException.
            var vals = message.Data?.Split(',');
            if (vals is null
                || vals.Length < 2
                || !long.TryParse(vals[0], NumberStyles.Integer, CultureInfo.InvariantCulture, out var dueTimeMs)
                || !long.TryParse(vals[1], NumberStyles.Integer, CultureInfo.InvariantCulture, out var periodMs))
            {
                Logger.LogWarning(
                    "WS {ListenerType} ignoring malformed start message from {RemoteEndPoint}",
                    GetType().Name,
                    message.Connection.RemoteEndPoint);
                return;
            }

            var cancellationTokenSource = new CancellationTokenSource();

            // Template placeholders are bound by order of appearance, so the arguments must follow that order.
            Logger.LogDebug(
                "WS {ListenerType} begin transmitting to {RemoteEndPoint}",
                GetType().Name,
                message.Connection.RemoteEndPoint);

            var state = new TStateType
            {
                IntervalMs = periodMs,
                InitialDelayMs = dueTimeMs
            };

            lock (_activeConnections)
            {
                _activeConnections.Add(new Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>(message.Connection, cancellationTokenSource, state));
            }
        }

        /// <summary>
        /// Sends the current data to every subscribed connection that is open, not cancelled and due for an update.
        /// </summary>
        /// <param name="force">
        /// <c>true</c> to send regardless of the time elapsed since the last send;
        /// <c>false</c> to send only to connections whose interval has elapsed.
        /// </param>
        /// <returns>A task that completes when all sends have finished.</returns>
        protected async Task SendData(bool force)
        {
            Tuple<IWebSocketConnection, CancellationTokenSource, TStateType>[] tuples;

            // Snapshot the eligible connections under the lock; the sends themselves run outside of it.
            lock (_activeConnections)
            {
                tuples = _activeConnections
                    .Where(c =>
                    {
                        if (c.Item1.State == WebSocketState.Open && !c.Item2.IsCancellationRequested)
                        {
                            var state = c.Item3;

                            if (force || (DateTime.UtcNow - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs)
                            {
                                return true;
                            }
                        }

                        return false;
                    })
                    .ToArray();
            }

            IEnumerable<Task> GetTasks()
            {
                foreach (var tuple in tuples)
                {
                    yield return SendData(tuple);
                }
            }

            await Task.WhenAll(GetTasks()).ConfigureAwait(false);
        }

        /// <summary>
        /// Sends the current data to a single subscribed connection and records the send time on success.
        /// </summary>
        /// <param name="tuple">The subscription to send to.</param>
        /// <returns>A task that completes when the send has finished or failed.</returns>
        private async Task SendData(Tuple<IWebSocketConnection, CancellationTokenSource, TStateType> tuple)
        {
            var connection = tuple.Item1;

            try
            {
                var state = tuple.Item3;

                var cancellationToken = tuple.Item2.Token;

                var data = await GetDataToSend().ConfigureAwait(false);

                if (data is not null)
                {
                    await connection.SendAsync(
                        new OutboundWebSocketMessage<TReturnDataType> { MessageType = Type, Data = data },
                        cancellationToken).ConfigureAwait(false);

                    state.DateLastSendUtc = DateTime.UtcNow;
                }
            }
            catch (OperationCanceledException)
            {
                if (tuple.Item2.IsCancellationRequested)
                {
                    DisposeConnection(tuple);
                }
            }
            catch (ObjectDisposedException) when (tuple.Item2.IsCancellationRequested)
            {
                // The subscription was stopped (token source cancelled and disposed) between the
                // snapshot and this send; that is a normal stop, not a send failure.
                DisposeConnection(tuple);
            }
            catch (Exception ex)
            {
                Logger.LogError(ex, "Error sending web socket message {Name}", Type);
                DisposeConnection(tuple);
            }
        }

        /// <summary>
        /// Stops sending messages over a web socket.
        /// </summary>
        /// <param name="message">The message.</param>
        private void Stop(WebSocketMessageInfo message)
        {
            lock (_activeConnections)
            {
                var connection = _activeConnections.FirstOrDefault(c => c.Item1 == message.Connection);

                if (connection is not null)
                {
                    DisposeConnection(connection);
                }
            }
        }

        /// <summary>
        /// Removes the subscription from the active list, then cancels and disposes its token source.
        /// Calling this more than once for the same subscription is a no-op after the first call.
        /// </summary>
        /// <param name="connection">The connection.</param>
        private void DisposeConnection(Tuple<IWebSocketConnection, CancellationTokenSource, TStateType> connection)
        {
            // Remove first and bail out when the entry is already gone. Cancelling the token makes any
            // in-flight send call back into this method, and cancelling an already disposed token
            // source throws ObjectDisposedException.
            bool removed;
            lock (_activeConnections)
            {
                removed = _activeConnections.Remove(connection);
            }

            if (!removed)
            {
                return;
            }

            // Template placeholders are bound by order of appearance, so the arguments must follow that order.
            Logger.LogDebug(
                "WS {ListenerType} stop transmitting to {RemoteEndPoint}",
                GetType().Name,
                connection.Item1.RemoteEndPoint);

            // TODO disposing the connection seems to break websockets in subtle ways, so what is the purpose of this function really...
            // connection.Item1.Dispose();

            try
            {
                connection.Item2.Cancel();
                connection.Item2.Dispose();
            }
            catch (ObjectDisposedException ex)
            {
                // Not expected now that removal makes this method idempotent; kept as a safety net.
                Logger.LogError(ex, "Object Disposed");
            }
            catch (Exception ex)
            {
                // Cancel() can surface exceptions thrown by callbacks registered on the token.
                Logger.LogError(ex, "Error disposing websocket");
            }
        }

        /// <summary>
        /// Releases unmanaged and - optionally - managed resources.
        /// </summary>
        /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
        protected virtual void Dispose(bool dispose)
        {
            if (dispose)
            {
                lock (_activeConnections)
                {
                    // Iterate over a copy because DisposeConnection removes entries from the list.
                    foreach (var connection in _activeConnections.ToArray())
                    {
                        DisposeConnection(connection);
                    }
                }
            }
        }

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }
    }
}