From eae031ae5acf948fbc303eb589b469953179f2ed Mon Sep 17 00:00:00 2001 From: Claus Vium Date: Mon, 18 Mar 2024 20:55:18 +0100 Subject: [PATCH] refactor: use Channels as queueing mechanism for periodic websocket messages (#11092) --- .../Session/SessionManager.cs | 7 +- .../ActivityLogWebSocketListener.cs | 13 +- .../ScheduledTasksWebSocketListener.cs | 21 +- .../SessionInfoWebSocketListener.cs | 36 ++-- .../Net/BasePeriodicWebSocketListener.cs | 199 +++++++++++------- 5 files changed, 169 insertions(+), 107 deletions(-) diff --git a/Emby.Server.Implementations/Session/SessionManager.cs b/Emby.Server.Implementations/Session/SessionManager.cs index 40b3b0339e..75945b08a2 100644 --- a/Emby.Server.Implementations/Session/SessionManager.cs +++ b/Emby.Server.Implementations/Session/SessionManager.cs @@ -456,8 +456,8 @@ namespace Emby.Server.Implementations.Session if (!_activeConnections.TryGetValue(key, out var sessionInfo)) { - _activeConnections[key] = await CreateSession(key, appName, appVersion, deviceId, deviceName, remoteEndPoint, user).ConfigureAwait(false); - sessionInfo = _activeConnections[key]; + sessionInfo = await CreateSession(key, appName, appVersion, deviceId, deviceName, remoteEndPoint, user).ConfigureAwait(false); + _activeConnections[key] = sessionInfo; } sessionInfo.UserId = user?.Id ?? Guid.Empty; @@ -614,9 +614,6 @@ namespace Emby.Server.Implementations.Session _logger.LogDebug(ex, "Error calling OnPlaybackStopped"); } } - - playingSessions = Sessions.Where(i => i.NowPlayingItem is not null) - .ToList(); } else { diff --git a/Jellyfin.Api/WebSocketListeners/ActivityLogWebSocketListener.cs b/Jellyfin.Api/WebSocketListeners/ActivityLogWebSocketListener.cs index ba228cb002..99516e9384 100644 --- a/Jellyfin.Api/WebSocketListeners/ActivityLogWebSocketListener.cs +++ b/Jellyfin.Api/WebSocketListeners/ActivityLogWebSocketListener.cs @@ -20,6 +20,8 @@ public class ActivityLogWebSocketListener : BasePeriodicWebSocketListener private readonly IActivityManager _activityManager; + private bool _disposed; + /// /// Initializes a new instance of the class. /// @@ -51,14 +53,15 @@ public class ActivityLogWebSocketListener : BasePeriodicWebSocketListener - protected override void Dispose(bool dispose) + protected override async ValueTask DisposeAsyncCore() { - if (dispose) + if (!_disposed) { _activityManager.EntryCreated -= OnEntryCreated; + _disposed = true; } - base.Dispose(dispose); + await base.DisposeAsyncCore().ConfigureAwait(false); } /// @@ -75,8 +78,8 @@ public class ActivityLogWebSocketListener : BasePeriodicWebSocketListener e) + private void OnEntryCreated(object? sender, GenericEventArgs e) { - await SendData(true).ConfigureAwait(false); + SendData(true); } } diff --git a/Jellyfin.Api/WebSocketListeners/ScheduledTasksWebSocketListener.cs b/Jellyfin.Api/WebSocketListeners/ScheduledTasksWebSocketListener.cs index 37c108d5a6..dd9286210a 100644 --- a/Jellyfin.Api/WebSocketListeners/ScheduledTasksWebSocketListener.cs +++ b/Jellyfin.Api/WebSocketListeners/ScheduledTasksWebSocketListener.cs @@ -20,6 +20,8 @@ public class ScheduledTasksWebSocketListener : BasePeriodicWebSocketListenerThe task manager. private readonly ITaskManager _taskManager; + private bool _disposed; + /// /// Initializes a new instance of the class. /// @@ -56,31 +58,32 @@ public class ScheduledTasksWebSocketListener : BasePeriodicWebSocketListener - protected override void Dispose(bool dispose) + protected override async ValueTask DisposeAsyncCore() { - if (dispose) + if (!_disposed) { _taskManager.TaskExecuting -= OnTaskExecuting; _taskManager.TaskCompleted -= OnTaskCompleted; + _disposed = true; } - base.Dispose(dispose); + await base.DisposeAsyncCore().ConfigureAwait(false); } - private async void OnTaskCompleted(object? sender, TaskCompletionEventArgs e) + private void OnTaskCompleted(object? sender, TaskCompletionEventArgs e) { e.Task.TaskProgress -= OnTaskProgress; - await SendData(true).ConfigureAwait(false); + SendData(true); } - private async void OnTaskExecuting(object? sender, GenericEventArgs e) + private void OnTaskExecuting(object? sender, GenericEventArgs e) { - await SendData(true).ConfigureAwait(false); + SendData(true); e.Argument.TaskProgress += OnTaskProgress; } - private async void OnTaskProgress(object? sender, GenericEventArgs e) + private void OnTaskProgress(object? sender, GenericEventArgs e) { - await SendData(false).ConfigureAwait(false); + SendData(false); } } diff --git a/Jellyfin.Api/WebSocketListeners/SessionInfoWebSocketListener.cs b/Jellyfin.Api/WebSocketListeners/SessionInfoWebSocketListener.cs index 3c2b86142e..a6cfe4d56c 100644 --- a/Jellyfin.Api/WebSocketListeners/SessionInfoWebSocketListener.cs +++ b/Jellyfin.Api/WebSocketListeners/SessionInfoWebSocketListener.cs @@ -16,6 +16,7 @@ namespace Jellyfin.Api.WebSocketListeners; public class SessionInfoWebSocketListener : BasePeriodicWebSocketListener, WebSocketListenerState> { private readonly ISessionManager _sessionManager; + private bool _disposed; /// /// Initializes a new instance of the class. @@ -55,9 +56,9 @@ public class SessionInfoWebSocketListener : BasePeriodicWebSocketListener - protected override void Dispose(bool dispose) + protected override async ValueTask DisposeAsyncCore() { - if (dispose) + if (!_disposed) { _sessionManager.SessionStarted -= OnSessionManagerSessionStarted; _sessionManager.SessionEnded -= OnSessionManagerSessionEnded; @@ -66,9 +67,10 @@ public class SessionInfoWebSocketListener : BasePeriodicWebSocketListener @@ -85,38 +87,38 @@ public class SessionInfoWebSocketListener : BasePeriodicWebSocketListener /// The type of the T return data type. /// The type of the T state type. - public abstract class BasePeriodicWebSocketListener : IWebSocketListener, IDisposable + public abstract class BasePeriodicWebSocketListener : IWebSocketListener, IAsyncDisposable where TStateType : WebSocketListenerState, new() where TReturnDataType : class { + private readonly Channel _channel = Channel.CreateUnbounded(new UnboundedChannelOptions + { + AllowSynchronousContinuations = false, + SingleReader = true, + SingleWriter = false + }); + + private readonly SemaphoreSlim _lock = new(1, 1); + /// /// The _active connections. /// - private readonly List> _activeConnections = - new List>(); + private readonly List<(IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State)> _activeConnections = new(); /// /// The logger. /// protected readonly ILogger> Logger; + private readonly Task _messageConsumerTask; + protected BasePeriodicWebSocketListener(ILogger> logger) { ArgumentNullException.ThrowIfNull(logger); Logger = logger; + + _messageConsumerTask = HandleMessages(); } /// @@ -113,75 +126,103 @@ namespace MediaBrowser.Controller.Net InitialDelayMs = dueTimeMs }; - lock (_activeConnections) + _lock.Wait(); + try + { + _activeConnections.Add((message.Connection, cancellationTokenSource, state)); + } + finally { - _activeConnections.Add(new Tuple(message.Connection, cancellationTokenSource, state)); + _lock.Release(); } } - protected async Task SendData(bool force) + protected void SendData(bool force) { - Tuple[] tuples; + _channel.Writer.TryWrite(force); + } - lock (_activeConnections) + private async Task HandleMessages() + { + while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false)) { - tuples = _activeConnections - .Where(c => + while (_channel.Reader.TryRead(out var force)) + { + try { - if (c.Item1.State == WebSocketState.Open && !c.Item2.IsCancellationRequested) - { - var state = c.Item3; + (IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State)[] tuples; - if (force || (DateTime.UtcNow - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs) + var now = DateTime.UtcNow; + await _lock.WaitAsync().ConfigureAwait(false); + try + { + if (_activeConnections.Count == 0) { - return true; + continue; } + + tuples = _activeConnections + .Where(c => + { + if (c.Connection.State != WebSocketState.Open || c.CancellationTokenSource.IsCancellationRequested) + { + return false; + } + + var state = c.State; + return force || (now - state.DateLastSendUtc).TotalMilliseconds >= state.IntervalMs; + }) + .ToArray(); + } + finally + { + _lock.Release(); } - return false; - }) - .ToArray(); - } + if (tuples.Length == 0) + { + continue; + } - IEnumerable GetTasks() - { - foreach (var tuple in tuples) - { - yield return SendData(tuple); + var data = await GetDataToSend().ConfigureAwait(false); + if (data is null) + { + continue; + } + + IEnumerable GetTasks() + { + foreach (var tuple in tuples) + { + yield return SendDataInternal(data, tuple); + } + } + + await Task.WhenAll(GetTasks()).ConfigureAwait(false); + } + catch (Exception ex) + { + Logger.LogError(ex, "Failed to send updates to websockets"); + } } } - - await Task.WhenAll(GetTasks()).ConfigureAwait(false); } - private async Task SendData(Tuple tuple) + private async Task SendDataInternal(TReturnDataType data, (IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State) tuple) { - var connection = tuple.Item1; - try { - var state = tuple.Item3; - - var cancellationToken = tuple.Item2.Token; - - var data = await GetDataToSend().ConfigureAwait(false); + var (connection, cts, state) = tuple; + var cancellationToken = cts.Token; + await connection.SendAsync( + new OutboundWebSocketMessage { MessageType = Type, Data = data }, + cancellationToken).ConfigureAwait(false); - if (data is not null) - { - await connection.SendAsync( - new OutboundWebSocketMessage - { - MessageType = Type, - Data = data - }, - cancellationToken).ConfigureAwait(false); - - state.DateLastSendUtc = DateTime.UtcNow; - } + state.DateLastSendUtc = DateTime.UtcNow; } catch (OperationCanceledException) { - if (tuple.Item2.IsCancellationRequested) + if (tuple.CancellationTokenSource.IsCancellationRequested) { DisposeConnection(tuple); } @@ -199,32 +240,37 @@ namespace MediaBrowser.Controller.Net /// The message. private void Stop(WebSocketMessageInfo message) { - lock (_activeConnections) + _lock.Wait(); + try { - var connection = _activeConnections.FirstOrDefault(c => c.Item1 == message.Connection); + var connection = _activeConnections.FirstOrDefault(c => c.Connection == message.Connection); - if (connection is not null) + if (connection != default) { DisposeConnection(connection); } } + finally + { + _lock.Release(); + } } /// /// Disposes the connection. /// /// The connection. - private void DisposeConnection(Tuple connection) + private void DisposeConnection((IWebSocketConnection Connection, CancellationTokenSource CancellationTokenSource, TStateType State) connection) { - Logger.LogDebug("WS {1} stop transmitting to {0}", connection.Item1.RemoteEndPoint, GetType().Name); + Logger.LogDebug("WS {1} stop transmitting to {0}", connection.Connection.RemoteEndPoint, GetType().Name); // TODO disposing the connection seems to break websockets in subtle ways, so what is the purpose of this function really... // connection.Item1.Dispose(); try { - connection.Item2.Cancel(); - connection.Item2.Dispose(); + connection.CancellationTokenSource.Cancel(); + connection.CancellationTokenSource.Dispose(); } catch (ObjectDisposedException ex) { @@ -237,36 +283,47 @@ namespace MediaBrowser.Controller.Net Logger.LogError(ex, "Error disposing websocket"); } - lock (_activeConnections) + _lock.Wait(); + try { _activeConnections.Remove(connection); } + finally + { + _lock.Release(); + } } - /// - /// Releases unmanaged and - optionally - managed resources. - /// - /// true to release both managed and unmanaged resources; false to release only unmanaged resources. - protected virtual void Dispose(bool dispose) + protected virtual async ValueTask DisposeAsyncCore() { - if (dispose) + try + { + _channel.Writer.TryComplete(); + await _messageConsumerTask.ConfigureAwait(false); + } + catch (Exception ex) + { + Logger.LogError(ex, "Disposing the message consumer failed"); + } + + await _lock.WaitAsync().ConfigureAwait(false); + try { - lock (_activeConnections) + foreach (var connection in _activeConnections.ToArray()) { - foreach (var connection in _activeConnections.ToArray()) - { - DisposeConnection(connection); - } + DisposeConnection(connection); } } + finally + { + _lock.Release(); + } } - /// - /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources. - /// - public void Dispose() + /// + public async ValueTask DisposeAsync() { - Dispose(true); + await DisposeAsyncCore().ConfigureAwait(false); GC.SuppressFinalize(this); } }