Changes SessionWebSocketListener to (re)use a timer (#11358)

pull/11378/head
Bond-009 2 weeks ago committed by GitHub
parent 82e5f99f83
commit 356e05e3af
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -101,14 +101,14 @@ namespace Emby.Server.Implementations.HttpServer
var pipe = new Pipe();
var writer = pipe.Writer;
ValueWebSocketReceiveResult receiveresult;
ValueWebSocketReceiveResult receiveResult;
do
{
// Allocate at least 512 bytes from the PipeWriter
Memory<byte> memory = writer.GetMemory(512);
try
{
receiveresult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false);
receiveResult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false);
}
catch (WebSocketException ex)
{
@ -116,7 +116,7 @@ namespace Emby.Server.Implementations.HttpServer
break;
}
int bytesRead = receiveresult.Count;
int bytesRead = receiveResult.Count;
if (bytesRead == 0)
{
break;
@ -135,13 +135,13 @@ namespace Emby.Server.Implementations.HttpServer
LastActivityDate = DateTime.UtcNow;
if (receiveresult.EndOfMessage)
if (receiveResult.EndOfMessage)
{
await ProcessInternal(pipe.Reader).ConfigureAwait(false);
}
}
while ((_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting)
&& receiveresult.MessageType != WebSocketMessageType.Close);
&& receiveResult.MessageType != WebSocketMessageType.Close);
Closed?.Invoke(this, EventArgs.Empty);

@ -33,11 +33,6 @@ namespace Emby.Server.Implementations.Session
/// </summary>
private const float ForceKeepAliveFactor = 0.75f;
/// <summary>
/// Lock used for accessing the KeepAlive cancellation token.
/// </summary>
private readonly object _keepAliveLock = new object();
/// <summary>
/// The WebSocket watchlist.
/// </summary>
@ -55,7 +50,7 @@ namespace Emby.Server.Implementations.Session
/// <summary>
/// The KeepAlive cancellation token.
/// </summary>
private CancellationTokenSource? _keepAliveCancellationToken;
private System.Timers.Timer _keepAlive;
/// <summary>
/// Initializes a new instance of the <see cref="SessionWebSocketListener" /> class.
@ -71,12 +66,34 @@ namespace Emby.Server.Implementations.Session
_logger = logger;
_sessionManager = sessionManager;
_loggerFactory = loggerFactory;
_keepAlive = new System.Timers.Timer(TimeSpan.FromSeconds(WebSocketLostTimeout * IntervalFactor))
{
AutoReset = true,
Enabled = false
};
_keepAlive.Elapsed += KeepAliveSockets;
}
/// <inheritdoc />
public void Dispose()
{
StopKeepAlive();
if (_keepAlive is not null)
{
_keepAlive.Stop();
_keepAlive.Elapsed -= KeepAliveSockets;
_keepAlive.Dispose();
_keepAlive = null!;
}
lock (_webSocketsLock)
{
foreach (var webSocket in _webSockets)
{
webSocket.Closed -= OnWebSocketClosed;
}
_webSockets.Clear();
}
}
/// <summary>
@ -164,7 +181,7 @@ namespace Emby.Server.Implementations.Session
webSocket.Closed += OnWebSocketClosed;
webSocket.LastKeepAliveDate = DateTime.UtcNow;
StartKeepAlive();
_keepAlive.Start();
}
// Notify WebSocket about timeout
@ -186,66 +203,26 @@ namespace Emby.Server.Implementations.Session
{
lock (_webSocketsLock)
{
if (!_webSockets.Remove(webSocket))
{
_logger.LogWarning("WebSocket {0} not on watchlist.", webSocket);
}
else
if (_webSockets.Remove(webSocket))
{
webSocket.Closed -= OnWebSocketClosed;
}
}
}
/// <summary>
/// Starts the KeepAlive watcher.
/// </summary>
private void StartKeepAlive()
{
lock (_keepAliveLock)
{
if (_keepAliveCancellationToken is null)
{
_keepAliveCancellationToken = new CancellationTokenSource();
// Start KeepAlive watcher
_ = RepeatAsyncCallbackEvery(
KeepAliveSockets,
TimeSpan.FromSeconds(WebSocketLostTimeout * IntervalFactor),
_keepAliveCancellationToken.Token);
}
}
}
/// <summary>
/// Stops the KeepAlive watcher.
/// </summary>
private void StopKeepAlive()
{
lock (_keepAliveLock)
{
if (_keepAliveCancellationToken is not null)
else
{
_keepAliveCancellationToken.Cancel();
_keepAliveCancellationToken.Dispose();
_keepAliveCancellationToken = null;
_logger.LogWarning("WebSocket {0} not on watchlist.", webSocket);
}
}
lock (_webSocketsLock)
{
foreach (var webSocket in _webSockets)
if (_webSockets.Count == 0)
{
webSocket.Closed -= OnWebSocketClosed;
_keepAlive.Stop();
}
_webSockets.Clear();
}
}
/// <summary>
/// Checks status of KeepAlive of WebSockets.
/// </summary>
private async Task KeepAliveSockets()
private async void KeepAliveSockets(object? o, EventArgs? e)
{
List<IWebSocketConnection> inactive;
List<IWebSocketConnection> lost;
@ -291,11 +268,6 @@ namespace Emby.Server.Implementations.Session
RemoveWebSocket(webSocket);
}
}
if (_webSockets.Count == 0)
{
StopKeepAlive();
}
}
}
@ -310,29 +282,5 @@ namespace Emby.Server.Implementations.Session
new ForceKeepAliveMessage(WebSocketLostTimeout),
CancellationToken.None);
}
/// <summary>
/// Runs a given async callback once every specified interval time, until cancelled.
/// </summary>
/// <param name="callback">The async callback.</param>
/// <param name="interval">The interval time.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>Task.</returns>
private async Task RepeatAsyncCallbackEvery(Func<Task> callback, TimeSpan interval, CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
await callback().ConfigureAwait(false);
try
{
await Task.Delay(interval, cancellationToken).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
return;
}
}
}
}
}

Loading…
Cancel
Save