using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Net;
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Jellyfin.Extensions.Json;
using MediaBrowser.Controller.Net;
using MediaBrowser.Model.Net;
using MediaBrowser.Model.Session;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
namespace Emby.Server.Implementations.HttpServer
{
/// <summary>
/// Class WebSocketConnection.
/// </summary>
public class WebSocketConnection : IWebSocketConnection, IDisposable
{
/// <summary>
/// The logger.
/// </summary>
private readonly ILogger _logger;
/// <summary>
/// The json serializer options.
/// </summary>
private readonly JsonSerializerOptions _jsonOptions;
/// <summary>
/// The socket.
/// </summary>
private readonly WebSocket _socket;
/// <summary>
/// Initializes a new instance of the <see cref="WebSocketConnection"/> class.
/// </summary>
/// <param name="logger">The logger.</param>
/// <param name="socket">The socket.</param>
/// <param name="remoteEndPoint">The remote end point.</param>
/// <param name="query">The query.</param>
public WebSocketConnection(
    ILogger logger,
    WebSocket socket,
    IPAddress? remoteEndPoint,
    IQueryCollection query)
{
    _logger = logger;
    _socket = socket;
    RemoteEndPoint = remoteEndPoint;
    QueryString = query;
    _jsonOptions = JsonDefaults.Options;

    // Seed with UTC: ProcessAsync refreshes this property with DateTime.UtcNow,
    // so a local-time initial value would be off by the machine's UTC offset
    // whenever the two are compared.
    LastActivityDate = DateTime.UtcNow;
}
/// <summary>
/// Occurs when <see cref="ProcessAsync"/> has left its receive loop, before it attempts to close the socket.
/// </summary>
public event EventHandler? Closed;
/// <summary>
/// Gets the remote end point.
/// </summary>
public IPAddress? RemoteEndPoint { get; }
/// <summary>
/// Gets or sets the receive action.
/// </summary>
/// <value>The receive action.</value>
public Func? OnReceive { get; set; }
/// <summary>
/// Gets the last activity date. It is set on construction and refreshed by <see cref="ProcessAsync"/> each time data is received.
/// </summary>
/// <value>The last activity date.</value>
public DateTime LastActivityDate { get; private set; }
/// <summary>
/// Gets or sets the last keep-alive date.
/// </summary>
public DateTime LastKeepAliveDate { get; set; }
/// <summary>
/// Gets the query string.
/// </summary>
/// <value>The query string.</value>
public IQueryCollection QueryString { get; }
/// <summary>
/// Gets the state of the underlying socket.
/// </summary>
/// <value>The state.</value>
public WebSocketState State => _socket.State;
/// <summary>
/// Serializes the message to UTF-8 JSON with the connection's serializer options
/// and sends it over the socket as one complete text message.
/// </summary>
/// <param name="message">The message.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>Task.</returns>
public Task SendAsync(WebSocketMessage message, CancellationToken cancellationToken)
    => _socket.SendAsync(
        JsonSerializer.SerializeToUtf8Bytes(message, _jsonOptions),
        WebSocketMessageType.Text,
        true,
        cancellationToken);
/// <summary>
/// Receives data from the socket until the connection ends, handing every complete
/// message to the internal message processor, then raises <see cref="Closed"/> and
/// closes the socket if it is still in a closable state.
/// </summary>
/// <param name="cancellationToken">The cancellation token used for receiving, flushing and closing.</param>
/// <returns>A task that completes when the receive loop has ended and the close has been issued.</returns>
public async Task ProcessAsync(CancellationToken cancellationToken = default)
{
    var pipe = new Pipe();
    var writer = pipe.Writer;
    var reader = pipe.Reader;

    try
    {
        ValueWebSocketReceiveResult receiveResult;
        do
        {
            // Allocate at least 512 bytes from the PipeWriter
            Memory<byte> memory = writer.GetMemory(512);
            try
            {
                receiveResult = await _socket.ReceiveAsync(memory, cancellationToken).ConfigureAwait(false);
            }
            catch (WebSocketException ex)
            {
                _logger.LogWarning("WS {IP} error receiving data: {Message}", RemoteEndPoint, ex.Message);
                break;
            }

            int bytesRead = receiveResult.Count;
            if (bytesRead == 0)
            {
                break;
            }

            // Tell the PipeWriter how much was read from the Socket
            writer.Advance(bytesRead);

            // Make the data available to the PipeReader
            FlushResult flushResult = await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
            if (flushResult.IsCompleted)
            {
                // The PipeReader stopped reading
                break;
            }

            LastActivityDate = DateTime.UtcNow;

            // Partial frames stay buffered in the pipe; only a finished message is processed.
            if (receiveResult.EndOfMessage)
            {
                await ProcessInternal(reader).ConfigureAwait(false);
            }
        }
        while ((_socket.State == WebSocketState.Open || _socket.State == WebSocketState.Connecting)
            && receiveResult.MessageType != WebSocketMessageType.Close);
    }
    finally
    {
        // Complete both halves of the pipe so the buffers it rented are released,
        // including when the loop is left through cancellation or another exception.
        writer.Complete();
        reader.Complete();
    }

    Closed?.Invoke(this, EventArgs.Empty);

    if (_socket.State == WebSocketState.Open
        || _socket.State == WebSocketState.CloseReceived
        || _socket.State == WebSocketState.CloseSent)
    {
        await _socket.CloseAsync(
            WebSocketCloseStatus.NormalClosure,
            string.Empty,
            cancellationToken).ConfigureAwait(false);
    }
}
private async Task ProcessInternal(PipeReader reader)
{
ReadResult result = await reader.ReadAsync().ConfigureAwait(false);
ReadOnlySequence buffer = result.Buffer;
if (OnReceive == null)
{
// Tell the PipeReader how much of the buffer we have consumed
reader.AdvanceTo(buffer.End);
return;
}
WebSocketMessage