@ -1,11 +1,9 @@
using System ;
using System ;
using System.Collections.Concurrent ;
using System.Collections.Generic ;
using System.Collections.Generic ;
using System.Linq ;
using System.Linq ;
using System.Threading ;
using System.Threading ;
using NLog ;
using NLog ;
using NzbDrone.Common ;
using NzbDrone.Common ;
using NzbDrone.Common.Cache ;
using NzbDrone.Common.EnsureThat ;
using NzbDrone.Common.EnsureThat ;
using NzbDrone.Common.Serializer ;
using NzbDrone.Common.Serializer ;
using NzbDrone.Core.Lifecycle ;
using NzbDrone.Core.Lifecycle ;
@ -35,20 +33,17 @@ namespace NzbDrone.Core.Messaging.Commands
private readonly IServiceFactory _serviceFactory ;
private readonly IServiceFactory _serviceFactory ;
private readonly Logger _logger ;
private readonly Logger _logger ;
private readonly ICached < CommandModel > _commandCache ;
private readonly CommandQueue _commandQueue ;
private readonly BlockingCollection < CommandModel > _commandQueue ;
public CommandQueueManager ( ICommandRepository repo ,
public CommandQueueManager ( ICommandRepository repo ,
IServiceFactory serviceFactory ,
IServiceFactory serviceFactory ,
ICacheManager cacheManager ,
Logger logger )
Logger logger )
{
{
_repo = repo ;
_repo = repo ;
_serviceFactory = serviceFactory ;
_serviceFactory = serviceFactory ;
_logger = logger ;
_logger = logger ;
_commandCache = cacheManager . GetCache < CommandModel > ( GetType ( ) ) ;
_commandQueue = new CommandQueue ( ) ;
_commandQueue = new BlockingCollection < CommandModel > ( new CommandQueue ( ) ) ;
}
}
public List < CommandModel > PushMany < TCommand > ( List < TCommand > commands ) where TCommand : Command
public List < CommandModel > PushMany < TCommand > ( List < TCommand > commands ) where TCommand : Command
@ -56,8 +51,7 @@ namespace NzbDrone.Core.Messaging.Commands
_logger . Trace ( "Publishing {0} commands" , commands . Count ) ;
_logger . Trace ( "Publishing {0} commands" , commands . Count ) ;
var commandModels = new List < CommandModel > ( ) ;
var commandModels = new List < CommandModel > ( ) ;
var existingCommands = _commandCache . Values . Where ( q = > q . Status = = CommandStatus . Queued | |
var existingCommands = _commandQueue . QueuedOrStarted ( ) ;
q . Status = = CommandStatus . Started ) . ToList ( ) ;
foreach ( var command in commands )
foreach ( var command in commands )
{
{
@ -86,7 +80,6 @@ namespace NzbDrone.Core.Messaging.Commands
foreach ( var commandModel in commandModels )
foreach ( var commandModel in commandModels )
{
{
_commandCache . Set ( commandModel . Id . ToString ( ) , commandModel ) ;
_commandQueue . Add ( commandModel ) ;
_commandQueue . Add ( commandModel ) ;
}
}
@ -124,7 +117,6 @@ namespace NzbDrone.Core.Messaging.Commands
_logger . Trace ( "Inserting new command: {0}" , commandModel . Name ) ;
_logger . Trace ( "Inserting new command: {0}" , commandModel . Name ) ;
_repo . Insert ( commandModel ) ;
_repo . Insert ( commandModel ) ;
_commandCache . Set ( commandModel . Id . ToString ( ) , commandModel ) ;
_commandQueue . Add ( commandModel ) ;
_commandQueue . Add ( commandModel ) ;
return commandModel ;
return commandModel ;
@ -146,28 +138,31 @@ namespace NzbDrone.Core.Messaging.Commands
public CommandModel Get ( int id )
public CommandModel Get ( int id )
{
{
return _commandCache . Get ( id . ToString ( ) , ( ) = > FindCommand ( _repo . Get ( id ) ) ) ;
var command = _commandQueue . Find ( id ) ;
if ( command = = null )
{
command = _repo . Get ( id ) ;
}
return command ;
}
}
public List < CommandModel > GetStarted ( )
public List < CommandModel > GetStarted ( )
{
{
_logger . Trace ( "Getting started commands" ) ;
_logger . Trace ( "Getting started commands" ) ;
return _commandCache . Values . Where ( c = > c . Status = = CommandStatus . Started ) . ToList ( ) ;
return _command Queue. All ( ) . Where ( c = > c . Status = = CommandStatus . Started ) . ToList ( ) ;
}
}
public void SetMessage ( CommandModel command , string message )
public void SetMessage ( CommandModel command , string message )
{
{
command . Message = message ;
command . Message = message ;
_commandCache . Set ( command . Id . ToString ( ) , command ) ;
}
}
public void Start ( CommandModel command )
public void Start ( CommandModel command )
{
{
command . StartedAt = DateTime . UtcNow ;
// Marks the command as started in the DB, the queue takes care of marking it as started on it's own
command . Status = CommandStatus . Started ;
_logger . Trace ( "Marking command as started: {0}" , command . Name ) ;
_logger . Trace ( "Marking command as started: {0}" , command . Name ) ;
_commandCache . Set ( command . Id . ToString ( ) , command ) ;
_repo . Start ( command ) ;
_repo . Start ( command ) ;
}
}
@ -195,12 +190,11 @@ namespace NzbDrone.Core.Messaging.Commands
{
{
_logger . Trace ( "Cleaning up old commands" ) ;
_logger . Trace ( "Cleaning up old commands" ) ;
var old = _commandCache . Values . Where ( c = > c . EndedAt < DateTime . UtcNow . AddMinutes ( - 5 ) ) ;
var commands = _commandQueue . All ( )
. Where ( c = > c . EndedAt < DateTime . UtcNow . AddMinutes ( - 5 ) )
. ToList ( ) ;
foreach ( var command in old )
_commandQueue . RemoveMany ( commands ) ;
{
_commandCache . Remove ( command . Id . ToString ( ) ) ;
}
_repo . Trim ( ) ;
_repo . Trim ( ) ;
}
}
@ -215,18 +209,6 @@ namespace NzbDrone.Core.Messaging.Commands
return Json . Deserialize ( "{}" , commandType ) ;
return Json . Deserialize ( "{}" , commandType ) ;
}
}
private CommandModel FindCommand ( CommandModel command )
{
var cachedCommand = _commandCache . Find ( command . Id . ToString ( ) ) ;
if ( cachedCommand ! = null )
{
command . Message = cachedCommand . Message ;
}
return command ;
}
private void Update ( CommandModel command , CommandStatus status , string message )
private void Update ( CommandModel command , CommandStatus status , string message )
{
{
SetMessage ( command , message ) ;
SetMessage ( command , message ) ;
@ -236,15 +218,14 @@ namespace NzbDrone.Core.Messaging.Commands
command . Status = status ;
command . Status = status ;
_logger . Trace ( "Updating command status" ) ;
_logger . Trace ( "Updating command status" ) ;
_commandCache . Set ( command . Id . ToString ( ) , command ) ;
_repo . End ( command ) ;
_repo . End ( command ) ;
}
}
private List < CommandModel > QueuedOrStarted ( string name )
private List < CommandModel > QueuedOrStarted ( string name )
{
{
return _command Cache. Values . Where ( q = > q . Name = = name & &
return _command Queue. QueuedOrStarted ( )
( q . Status = = CommandStatus . Queued | |
. Where ( q = > q . Name = = name )
q . Status = = CommandStatus . Started ) ) . ToList ( ) ;
. ToList ( ) ;
}
}
public void Handle ( ApplicationStartedEvent message )
public void Handle ( ApplicationStartedEvent message )