using System; using System.Buffers; using System.IO.Pipelines; using System.Net; using System.Net.WebSockets; using System.Text; using System.Text.Json; using System.Threading; using System.Threading.Tasks; using Jellyfin.Extensions.Json; using MediaBrowser.Controller.Net; using MediaBrowser.Model.Net; using MediaBrowser.Model.Session; using Microsoft.Extensions.Logging; namespace Emby.Server.Implementations.HttpServer { /// /// Class WebSocketConnection. /// public class WebSocketConnection : IWebSocketConnection { /// /// The logger. /// private readonly ILogger _logger; /// /// The json serializer options. /// private readonly JsonSerializerOptions _jsonOptions; /// /// The socket. /// private readonly WebSocket _socket; private bool _disposed = false; /// /// Initializes a new instance of the class. /// /// The logger. /// The socket. /// The remote end point. public WebSocketConnection( ILogger logger, WebSocket socket, IPAddress? remoteEndPoint) { _logger = logger; _socket = socket; RemoteEndPoint = remoteEndPoint; _jsonOptions = JsonDefaults.Options; LastActivityDate = DateTime.Now; } /// public event EventHandler? Closed; /// /// Gets the remote end point. /// public IPAddress? RemoteEndPoint { get; } /// /// Gets or sets the receive action. /// /// The receive action. public Func? OnReceive { get; set; } /// /// Gets the last activity date. /// /// The last activity date. public DateTime LastActivityDate { get; private set; } /// public DateTime LastKeepAliveDate { get; set; } /// /// Gets the state. /// /// The state. public WebSocketState State => _socket.State; /// /// Sends a message asynchronously. /// /// The type of the message. /// The message. /// The cancellation token. /// Task. public Task SendAsync(WebSocketMessage message, CancellationToken cancellationToken) { var json = JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions); return _socket.SendAsync(json, WebSocketMessageType.Text, true, cancellationToken); } /// public async Task ProcessAsync(CancellationToken cancellationToken = default) { var pipe = new Pipe(); var writer = pipe.Writer; ValueWebSocketReceiveResult receiveresult; do { // Allocate at least 512 bytes from the PipeWriter Memory memory = writer.GetMemory(512); try { receiveresult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false); } catch (WebSocketException ex) { _logger.LogWarning("WS {IP} error receiving data: {Message}", RemoteEndPoint, ex.Message); break; } int bytesRead = receiveresult.Count; if (bytesRead == 0) { break; } // Tell the PipeWriter how much was read from the Socket writer.Advance(bytesRead); // Make the data available to the PipeReader FlushResult flushResult = await writer.FlushAsync(cancellationToken).ConfigureAwait(false); if (flushResult.IsCompleted) { // The PipeReader stopped reading break; } LastActivityDate = DateTime.UtcNow; if (receiveresult.EndOfMessage) { await ProcessInternal(pipe.Reader).ConfigureAwait(false); } } while ((_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting) && receiveresult.MessageType != WebSocketMessageType.Close); Closed?.Invoke(this, EventArgs.Empty); if (_socket.State == WebSocketState.Open || _socket.State == WebSocketState.CloseReceived || _socket.State == WebSocketState.CloseSent) { await _socket.CloseAsync( WebSocketCloseStatus.NormalClosure, string.Empty, cancellationToken).ConfigureAwait(false); } } private async Task ProcessInternal(PipeReader reader) { ReadResult result = await reader.ReadAsync().ConfigureAwait(false); ReadOnlySequence buffer = result.Buffer; if (OnReceive is null) { // Tell the PipeReader how much of the buffer we have consumed reader.AdvanceTo(buffer.End); return; } WebSocketMessage? stub; long bytesConsumed; try { stub = DeserializeWebSocketMessage(buffer, out bytesConsumed); } catch (JsonException ex) { // Tell the PipeReader how much of the buffer we have consumed reader.AdvanceTo(buffer.End); _logger.LogError(ex, "Error processing web socket message: {Data}", Encoding.UTF8.GetString(buffer)); return; } if (stub is null) { _logger.LogError("Error processing web socket message"); return; } // Tell the PipeReader how much of the buffer we have consumed reader.AdvanceTo(buffer.GetPosition(bytesConsumed)); _logger.LogDebug("WS {IP} received message: {@Message}", RemoteEndPoint, stub); if (stub.MessageType == SessionMessageType.KeepAlive) { await SendKeepAliveResponse().ConfigureAwait(false); } else { await OnReceive( new WebSocketMessageInfo { MessageType = stub.MessageType, Data = stub.Data?.ToString(), // Data can be null Connection = this }).ConfigureAwait(false); } } internal WebSocketMessage? DeserializeWebSocketMessage(ReadOnlySequence bytes, out long bytesConsumed) { var jsonReader = new Utf8JsonReader(bytes); var ret = JsonSerializer.Deserialize>(ref jsonReader, _jsonOptions); bytesConsumed = jsonReader.BytesConsumed; return ret; } private Task SendKeepAliveResponse() { LastKeepAliveDate = DateTime.UtcNow; return SendAsync( new WebSocketMessage { MessageId = Guid.NewGuid(), MessageType = SessionMessageType.KeepAlive }, CancellationToken.None); } /// public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } /// /// Releases unmanaged and - optionally - managed resources. /// /// true to release both managed and unmanaged resources; false to release only unmanaged resources. protected virtual void Dispose(bool dispose) { if (_disposed) { return; } if (dispose) { _socket.Dispose(); } _disposed = true; } /// public async ValueTask DisposeAsync() { await DisposeAsyncCore().ConfigureAwait(false); Dispose(false); GC.SuppressFinalize(this); } /// /// Used to perform asynchronous cleanup of managed resources or for cascading calls to . /// /// A ValueTask. protected virtual async ValueTask DisposeAsyncCore() { if (_socket.State == WebSocketState.Open) { await _socket.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "System Shutdown", CancellationToken.None).ConfigureAwait(false); } _socket.Dispose(); } } }