@ -1,112 +1,44 @@
using System ;
using System.Collections.Generic ;
using System.Linq ;
using System.Net ;
using System.Net.Sockets ;
using System.Text ;
using System.Text.Json ;
using System.Threading ;
using System.Threading.Tasks ;
using MediaBrowser.Controller ;
using MediaBrowser.Model.ApiClient ;
using MediaBrowser.Model.Events ;
using MediaBrowser.Model.Net ;
using MediaBrowser.Model.Serialization ;
using Microsoft.Extensions.Logging ;
namespace Emby.Server.Implementations.Udp
{
/// <summary>
/// Provides a Udp Server
/// Provides a Udp Server .
/// </summary>
public class UdpServer : IDisposable
public sealed class UdpServer : IDisposable
{
/// <summary>
/// The _logger
/// </summary>
private readonly ILogger _logger ;
private readonly IServerApplicationHost _appHost ;
private bool _isDisposed ;
private readonly List < Tuple < string , bool , Func < string , IPEndPoint , Encoding , CancellationToken , Task > > > _responders = new List < Tuple < string , bool , Func < string , IPEndPoint , Encoding , CancellationToken , Task > > > ( ) ;
private Socket _udpSocket ;
private IPEndPoint _endpoint ;
private readonly byte [ ] _receiveBuffer = new byte [ 8192 ] ;
private readonly IServerApplicationHost _appHost ;
private readonly IJsonSerializer _json ;
private bool _disposed = false ;
/// <summary>
/// Initializes a new instance of the <see cref="UdpServer" /> class.
/// </summary>
public UdpServer ( ILogger logger , IServerApplicationHost appHost , IJsonSerializer json , ISocketFactory socketFactory )
public UdpServer ( ILogger logger , IServerApplicationHost appHost )
{
_logger = logger ;
_appHost = appHost ;
_json = json ;
_socketFactory = socketFactory ;
AddMessageResponder ( "who is JellyfinServer?" , true , RespondToV2Message ) ;
}
private void AddMessageResponder ( string message , bool isSubstring , Func < string , IPEndPoint , Encoding , CancellationToken , Task > responder )
{
_responders . Add ( new Tuple < string , bool , Func < string , IPEndPoint , Encoding , CancellationToken , Task > > ( message , isSubstring , responder ) ) ;
}
/// <summary>
/// Raises the <see cref="E:MessageReceived" /> event.
/// </summary>
private async void OnMessageReceived ( GenericEventArgs < SocketReceiveResult > e )
{
var message = e . Argument ;
var encoding = Encoding . UTF8 ;
var responder = GetResponder ( message . Buffer , message . ReceivedBytes , encoding ) ;
if ( responder = = null )
{
encoding = Encoding . Unicode ;
responder = GetResponder ( message . Buffer , message . ReceivedBytes , encoding ) ;
}
if ( responder ! = null )
{
var cancellationToken = CancellationToken . None ;
try
{
await responder . Item2 . Item3 ( responder . Item1 , message . RemoteEndPoint , encoding , cancellationToken ) . ConfigureAwait ( false ) ;
}
catch ( OperationCanceledException )
{
}
catch ( Exception ex )
{
_logger . LogError ( ex , "Error in OnMessageReceived" ) ;
}
}
}
private Tuple < string , Tuple < string , bool , Func < string , IPEndPoint , Encoding , CancellationToken , Task > > > GetResponder ( byte [ ] buffer , int bytesReceived , Encoding encoding )
private async Task RespondToV2Message ( string messageText , EndPoint endpoint , CancellationToken cancellationToken )
{
var text = encoding . GetString ( buffer , 0 , bytesReceived ) ;
var responder = _responders . FirstOrDefault ( i = >
{
if ( i . Item2 )
{
return text . IndexOf ( i . Item1 , StringComparison . OrdinalIgnoreCase ) ! = - 1 ;
}
return string . Equals ( i . Item1 , text , StringComparison . OrdinalIgnoreCase ) ;
} ) ;
if ( responder = = null )
{
return null ;
}
return new Tuple < string , Tuple < string , bool , Func < string , IPEndPoint , Encoding , CancellationToken , Task > > > ( text , responder ) ;
}
private async Task RespondToV2Message ( string messageText , IPEndPoint endpoint , Encoding encoding , CancellationToken cancellationToken )
{
var parts = messageText . Split ( '|' ) ;
var localUrl = await _appHost . GetLocalApiUrl ( cancellationToken ) . ConfigureAwait ( false ) ;
if ( ! string . IsNullOrEmpty ( localUrl ) )
@ -118,8 +50,16 @@ namespace Emby.Server.Implementations.Udp
Name = _appHost . FriendlyName
} ;
await SendAsync ( encoding . GetBytes ( _json . SerializeToString ( response ) ) , endpoint , cancellationToken ) . ConfigureAwait ( false ) ;
try
{
await _udpSocket . SendToAsync ( JsonSerializer . SerializeToUtf8Bytes ( response ) , SocketFlags . None , endpoint ) . ConfigureAwait ( false ) ;
}
catch ( SocketException ex )
{
_logger . LogError ( ex , "Error sending response message" ) ;
}
var parts = messageText . Split ( '|' ) ;
if ( parts . Length > 1 )
{
_appHost . EnableLoopback ( parts [ 1 ] ) ;
@ -131,162 +71,60 @@ namespace Emby.Server.Implementations.Udp
}
}
/// <summary>
/// The _udp client
/// </summary>
private ISocket _udpClient ;
private readonly ISocketFactory _socketFactory ;
/// <summary>
/// Starts the specified port.
/// </summary>
/// <param name="port">The port.</param>
public void Start ( int port )
/// <param name="cancellationToken"></param>
public void Start ( int port , CancellationToken cancellationToken )
{
_ udpClient = _socketFactory . CreateUdpSocket ( port ) ;
_ endpoint = new IPEndPoint ( IPAddress . Any , port ) ;
Task . Run ( ( ) = > BeginReceive ( ) ) ;
}
private readonly byte [ ] _receiveBuffer = new byte [ 8192 ] ;
private void BeginReceive ( )
{
if ( _isDisposed )
{
return ;
}
_udpSocket = new Socket ( AddressFamily . InterNetwork , SocketType . Dgram , ProtocolType . Udp ) ;
_udpSocket . SetSocketOption ( SocketOptionLevel . Socket , SocketOptionName . ReuseAddress , true ) ;
_udpSocket . Bind ( _endpoint ) ;
try
{
var result = _udpClient . BeginReceive ( _receiveBuffer , 0 , _receiveBuffer . Length , OnReceiveResult ) ;
if ( result . CompletedSynchronously )
{
OnReceiveResult ( result ) ;
}
}
catch ( ObjectDisposedException )
{
//TODO Investigate and properly fix.
}
catch ( Exception ex )
{
_logger . LogError ( ex , "Error receiving udp message" ) ;
}
_ = Task . Run ( async ( ) = > await BeginReceiveAsync ( cancellationToken ) . ConfigureAwait ( false ) , cancellationToken ) . ConfigureAwait ( false ) ;
}
private void OnReceiveResult ( IAsyncResult result )
private async Task BeginReceiveAsync ( CancellationToken cancellationToken )
{
if ( _isDisposed )
{
return ;
}
try
{
var socketResult = _udpClient . EndReceive ( result ) ;
OnMessageReceived ( socketResult ) ;
}
catch ( ObjectDisposedException )
{
//TODO Investigate and properly fix.
}
catch ( Exception ex )
while ( ! cancellationToken . IsCancellationRequested )
{
_logger . LogError ( ex , "Error receiving udp message" ) ;
}
BeginReceive ( ) ;
}
/// <summary>
/// Called when [message received].
/// </summary>
/// <param name="message">The message.</param>
private void OnMessageReceived ( SocketReceiveResult message )
{
if ( _isDisposed )
{
return ;
}
if ( message . RemoteEndPoint . Port = = 0 )
{
return ;
}
try
{
OnMessageReceived ( new GenericEventArgs < SocketReceiveResult >
try
{
Argument = message
} ) ;
}
catch ( Exception ex )
{
_logger . LogError ( ex , "Error handling UDP message" ) ;
}
}
/// <summary>
/// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
/// </summary>
public void Dispose ( )
{
Dispose ( true ) ;
}
var result = await _udpSocket . ReceiveFromAsync ( _receiveBuffer , SocketFlags . None , _endpoint ) . ConfigureAwait ( false ) ;
/// <summary>
/// Releases unmanaged and - optionally - managed resources.
/// </summary>
/// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
protected virtual void Dispose ( bool dispose )
{
if ( dispose )
{
_isDisposed = true ;
cancellationToken . ThrowIfCancellationRequested ( ) ;
if ( _udpClient ! = null )
var text = Encoding . UTF8 . GetString ( _receiveBuffer , 0 , result . ReceivedBytes ) ;
if ( text . Contains ( "who is JellyfinServer?" , StringComparison . OrdinalIgnoreCase ) )
{
await RespondToV2Message ( text , result . RemoteEndPoint , cancellationToken ) . ConfigureAwait ( false ) ;
}
}
catch ( SocketException ex )
{
_udpClient . Dispose ( ) ;
_logger . LogError ( ex , "Failed to receive data drom socket" ) ;
}
catch ( OperationCanceledException )
{
// Don't throw
}
}
}
public async Task SendAsync ( byte [ ] bytes , IPEndPoint remoteEndPoint , CancellationToken cancellationToken )
/// <inheritdoc />
public void Dispose ( )
{
if ( _ isD isposed)
if ( _ d isposed)
{
throw new ObjectDisposedException ( GetType ( ) . Name ) ;
}
if ( bytes = = null )
{
throw new ArgumentNullException ( nameof ( bytes ) ) ;
}
if ( remoteEndPoint = = null )
{
throw new ArgumentNullException ( nameof ( remoteEndPoint ) ) ;
return ;
}
try
{
await _udpClient . SendToAsync ( bytes , 0 , bytes . Length , remoteEndPoint , cancellationToken ) . ConfigureAwait ( false ) ;
_udpSocket ? . Dispose ( ) ;
_logger . LogInformation ( "Udp message sent to {remoteEndPoint}" , remoteEndPoint ) ;
}
catch ( OperationCanceledException )
{
}
catch ( Exception ex )
{
_logger . LogError ( ex , "Error sending message to {remoteEndPoint}" , remoteEndPoint ) ;
}
GC . SuppressFinalize ( this ) ;
}
}
}