@ -15,7 +15,7 @@ namespace MediaBrowser.Common.Net
/// <typeparam name="TReturnDataType">The type of the T return data type.</typeparam>
/// <typeparam name="TReturnDataType">The type of the T return data type.</typeparam>
/// <typeparam name="TStateType">The type of the T state type.</typeparam>
/// <typeparam name="TStateType">The type of the T state type.</typeparam>
public abstract class BasePeriodicWebSocketListener < TReturnDataType , TStateType > : IWebSocketListener , IDisposable
public abstract class BasePeriodicWebSocketListener < TReturnDataType , TStateType > : IWebSocketListener , IDisposable
where TStateType : class , new ( )
where TStateType : WebSocketListenerState , new ( )
where TReturnDataType : class
where TReturnDataType : class
{
{
/// <summary>
/// <summary>
@ -84,6 +84,14 @@ namespace MediaBrowser.Common.Net
protected readonly CultureInfo UsCulture = new CultureInfo ( "en-US" ) ;
protected readonly CultureInfo UsCulture = new CultureInfo ( "en-US" ) ;
protected virtual bool SendOnTimer
{
get
{
return true ;
}
}
/// <summary>
/// <summary>
/// Starts sending messages over a web socket
/// Starts sending messages over a web socket
/// </summary>
/// </summary>
@ -99,9 +107,15 @@ namespace MediaBrowser.Common.Net
Logger . Info ( "{1} Begin transmitting over websocket to {0}" , message . Connection . RemoteEndPoint , GetType ( ) . Name ) ;
Logger . Info ( "{1} Begin transmitting over websocket to {0}" , message . Connection . RemoteEndPoint , GetType ( ) . Name ) ;
var timer = new Timer ( TimerCallback , message . Connection , Timeout . Infinite , Timeout . Infinite ) ;
var timer = SendOnTimer ?
new Timer ( TimerCallback , message . Connection , Timeout . Infinite , Timeout . Infinite ) :
null ;
var state = new TStateType ( ) ;
var state = new TStateType
{
IntervalMs = periodMs ,
InitialDelayMs = dueTimeMs
} ;
var semaphore = new SemaphoreSlim ( 1 , 1 ) ;
var semaphore = new SemaphoreSlim ( 1 , 1 ) ;
@ -110,14 +124,17 @@ namespace MediaBrowser.Common.Net
ActiveConnections . Add ( new Tuple < IWebSocketConnection , CancellationTokenSource , Timer , TStateType , SemaphoreSlim > ( message . Connection , cancellationTokenSource , timer , state , semaphore ) ) ;
ActiveConnections . Add ( new Tuple < IWebSocketConnection , CancellationTokenSource , Timer , TStateType , SemaphoreSlim > ( message . Connection , cancellationTokenSource , timer , state , semaphore ) ) ;
}
}
timer . Change ( TimeSpan . FromMilliseconds ( dueTimeMs ) , TimeSpan . FromMilliseconds ( periodMs ) ) ;
if ( timer ! = null )
{
timer . Change ( TimeSpan . FromMilliseconds ( dueTimeMs ) , TimeSpan . FromMilliseconds ( periodMs ) ) ;
}
}
}
/// <summary>
/// <summary>
/// Timers the callback.
/// Timers the callback.
/// </summary>
/// </summary>
/// <param name="state">The state.</param>
/// <param name="state">The state.</param>
private async void TimerCallback ( object state )
private void TimerCallback ( object state )
{
{
var connection = ( IWebSocketConnection ) state ;
var connection = ( IWebSocketConnection ) state ;
@ -139,11 +156,50 @@ namespace MediaBrowser.Common.Net
return ;
return ;
}
}
SendData ( tuple ) ;
}
protected void SendData ( bool force )
{
List < Tuple < IWebSocketConnection , CancellationTokenSource , Timer , TStateType , SemaphoreSlim > > tuples ;
lock ( ActiveConnections )
{
tuples = ActiveConnections
. Where ( c = >
{
if ( c . Item1 . State = = WebSocketState . Open & & ! c . Item2 . IsCancellationRequested )
{
var state = c . Item4 ;
if ( force | | ( DateTime . UtcNow - state . DateLastSendUtc ) . TotalMilliseconds > = state . IntervalMs )
{
return true ;
}
}
return false ;
} )
. ToList ( ) ;
}
foreach ( var tuple in tuples )
{
SendData ( tuple ) ;
}
}
private async void SendData ( Tuple < IWebSocketConnection , CancellationTokenSource , Timer , TStateType , SemaphoreSlim > tuple )
{
var connection = tuple . Item1 ;
try
try
{
{
await tuple . Item5 . WaitAsync ( tuple . Item2 . Token ) . ConfigureAwait ( false ) ;
await tuple . Item5 . WaitAsync ( tuple . Item2 . Token ) . ConfigureAwait ( false ) ;
var data = await GetDataToSend ( tuple . Item4 ) . ConfigureAwait ( false ) ;
var state = tuple . Item4 ;
var data = await GetDataToSend ( state ) . ConfigureAwait ( false ) ;
if ( data ! = null )
if ( data ! = null )
{
{
@ -153,6 +209,8 @@ namespace MediaBrowser.Common.Net
Data = data
Data = data
} , tuple . Item2 . Token ) . ConfigureAwait ( false ) ;
} , tuple . Item2 . Token ) . ConfigureAwait ( false ) ;
state . DateLastSendUtc = DateTime . UtcNow ;
}
}
tuple . Item5 . Release ( ) ;
tuple . Item5 . Release ( ) ;
@ -196,13 +254,18 @@ namespace MediaBrowser.Common.Net
{
{
Logger . Info ( "{1} stop transmitting over websocket to {0}" , connection . Item1 . RemoteEndPoint , GetType ( ) . Name ) ;
Logger . Info ( "{1} stop transmitting over websocket to {0}" , connection . Item1 . RemoteEndPoint , GetType ( ) . Name ) ;
try
var timer = connection . Item3 ;
{
connection . Item3 . Dispose ( ) ;
if ( timer ! = null )
}
catch ( ObjectDisposedException )
{
{
try
{
timer . Dispose ( ) ;
}
catch ( ObjectDisposedException )
{
}
}
}
try
try
@ -253,4 +316,11 @@ namespace MediaBrowser.Common.Net
Dispose ( true ) ;
Dispose ( true ) ;
}
}
}
}
public class WebSocketListenerState
{
public DateTime DateLastSendUtc { get ; set ; }
public long InitialDelayMs { get ; set ; }
public long IntervalMs { get ; set ; }
}
}
}