From a816a83f3a6c33c3ce710c717859a30b5fbd40f2 Mon Sep 17 00:00:00 2001 From: "kay.one" Date: Sat, 11 May 2013 16:38:41 -0700 Subject: [PATCH] improvements to scheduler, better parallelism on RSS fetch --- .../Messaging/CommandCompletedEvent.cs | 12 ++++++ .../Messaging/CommandFailedEvent.cs | 16 ++++++++ .../Messaging/CommandStartedEvent.cs | 12 ++++++ .../Messaging/MessageAggregator.cs | 9 ++++- NzbDrone.Common/Messaging/TestCommand.cs | 13 +++++++ .../Messaging/TestCommandExecutor.cs | 12 ++++++ NzbDrone.Common/NzbDrone.Common.csproj | 5 +++ .../Indexers/FetchAndParseRssService.cs | 2 +- NzbDrone.Core/Jobs/Scheduler.cs | 39 ++++++++++--------- NzbDrone.Core/Jobs/TaskManager.cs | 14 +++---- NzbDrone/NLog.config | 2 +- NzbDrone/app.config | 5 +++ 12 files changed, 113 insertions(+), 28 deletions(-) create mode 100644 NzbDrone.Common/Messaging/CommandCompletedEvent.cs create mode 100644 NzbDrone.Common/Messaging/CommandFailedEvent.cs create mode 100644 NzbDrone.Common/Messaging/CommandStartedEvent.cs create mode 100644 NzbDrone.Common/Messaging/TestCommand.cs create mode 100644 NzbDrone.Common/Messaging/TestCommandExecutor.cs diff --git a/NzbDrone.Common/Messaging/CommandCompletedEvent.cs b/NzbDrone.Common/Messaging/CommandCompletedEvent.cs new file mode 100644 index 000000000..613800ae0 --- /dev/null +++ b/NzbDrone.Common/Messaging/CommandCompletedEvent.cs @@ -0,0 +1,12 @@ +namespace NzbDrone.Common.Messaging +{ + public class CommandCompletedEvent : IEvent + { + public ICommand Command { get; private set; } + + public CommandCompletedEvent(ICommand command) + { + Command = command; + } + } +} \ No newline at end of file diff --git a/NzbDrone.Common/Messaging/CommandFailedEvent.cs b/NzbDrone.Common/Messaging/CommandFailedEvent.cs new file mode 100644 index 000000000..d33ab79f8 --- /dev/null +++ b/NzbDrone.Common/Messaging/CommandFailedEvent.cs @@ -0,0 +1,16 @@ +using System; + +namespace NzbDrone.Common.Messaging +{ + public class CommandFailedEvent : IEvent + { + public ICommand Command { get; private set; } + public Exception Exception { get; private set; } + + public CommandFailedEvent(ICommand command, Exception exception) + { + Command = command; + Exception = exception; + } + } +} \ No newline at end of file diff --git a/NzbDrone.Common/Messaging/CommandStartedEvent.cs b/NzbDrone.Common/Messaging/CommandStartedEvent.cs new file mode 100644 index 000000000..3cb4e7f55 --- /dev/null +++ b/NzbDrone.Common/Messaging/CommandStartedEvent.cs @@ -0,0 +1,12 @@ +namespace NzbDrone.Common.Messaging +{ + public class CommandExecutedEvent : IEvent + { + public ICommand Command { get; private set; } + + public CommandExecutedEvent(ICommand command) + { + Command = command; + } + } +} \ No newline at end of file diff --git a/NzbDrone.Common/Messaging/MessageAggregator.cs b/NzbDrone.Common/Messaging/MessageAggregator.cs index 2fd60c672..18dae807c 100644 --- a/NzbDrone.Common/Messaging/MessageAggregator.cs +++ b/NzbDrone.Common/Messaging/MessageAggregator.cs @@ -75,16 +75,23 @@ namespace NzbDrone.Common.Messaging try { - handlerContract.GetMethod("Execute").Invoke(handler, new object[] { command }); + handlerContract.GetMethod("Execute").Invoke(handler, new object[] {command}); + PublishEvent(new CommandCompletedEvent(command)); } catch (TargetInvocationException e) { + PublishEvent(new CommandFailedEvent(command, e)); + if (e.InnerException != null) { throw e.InnerException; } throw; } + finally + { + PublishEvent(new CommandExecutedEvent(command)); + } _logger.Debug("{0} <- {1}", command.GetType().Name, handler.GetType().Name); } diff --git a/NzbDrone.Common/Messaging/TestCommand.cs b/NzbDrone.Common/Messaging/TestCommand.cs new file mode 100644 index 000000000..1a54d5764 --- /dev/null +++ b/NzbDrone.Common/Messaging/TestCommand.cs @@ -0,0 +1,13 @@ +namespace NzbDrone.Common.Messaging +{ + public class TestCommand : ICommand + { + public TestCommand() + { + Duration = 4000; + } + + public int Duration { get; set; } + + } +} \ No newline at end of file diff --git a/NzbDrone.Common/Messaging/TestCommandExecutor.cs b/NzbDrone.Common/Messaging/TestCommandExecutor.cs new file mode 100644 index 000000000..be0ea4ea4 --- /dev/null +++ b/NzbDrone.Common/Messaging/TestCommandExecutor.cs @@ -0,0 +1,12 @@ +using System.Threading; + +namespace NzbDrone.Common.Messaging +{ + public class TestCommandExecutor : IExecute + { + public void Execute(TestCommand message) + { + Thread.Sleep(message.Duration); + } + } +} \ No newline at end of file diff --git a/NzbDrone.Common/NzbDrone.Common.csproj b/NzbDrone.Common/NzbDrone.Common.csproj index 0bf371efd..2d6119d33 100644 --- a/NzbDrone.Common/NzbDrone.Common.csproj +++ b/NzbDrone.Common/NzbDrone.Common.csproj @@ -103,6 +103,9 @@ + + + @@ -120,6 +123,8 @@ + + diff --git a/NzbDrone.Core/Indexers/FetchAndParseRssService.cs b/NzbDrone.Core/Indexers/FetchAndParseRssService.cs index 686e370e3..bd1ca797f 100644 --- a/NzbDrone.Core/Indexers/FetchAndParseRssService.cs +++ b/NzbDrone.Core/Indexers/FetchAndParseRssService.cs @@ -38,7 +38,7 @@ namespace NzbDrone.Core.Indexers _logger.Debug("Available indexers {0}", indexers.Count); - Parallel.ForEach(indexers, indexer => + Parallel.ForEach(indexers, new ParallelOptions { MaxDegreeOfParallelism = 10 }, indexer => { var indexerFeed = _feedFetcher.FetchRss(indexer); diff --git a/NzbDrone.Core/Jobs/Scheduler.cs b/NzbDrone.Core/Jobs/Scheduler.cs index e5e79e326..f245e6aab 100644 --- a/NzbDrone.Core/Jobs/Scheduler.cs +++ b/NzbDrone.Core/Jobs/Scheduler.cs @@ -33,29 +33,32 @@ namespace NzbDrone.Core.Jobs private void ExecuteCommands() { - var tasks = _taskManager.GetPending(); + try + { + Timer.Enabled = false; - _logger.Trace("Pending Tasks: {0}", tasks.Count); + var tasks = _taskManager.GetPending(); - foreach (var task in tasks) - { - try - { - var commandType = Type.GetType(task.TypeName); - var command = (ICommand)Activator.CreateInstance(commandType); + _logger.Trace("Pending Tasks: {0}", tasks.Count); - _messageAggregator.PublishCommand(command); - } - catch (Exception e) - { - _logger.ErrorException("Error occurred while execution task " + task.TypeName, e); - } - finally + foreach (var task in tasks) { - _taskManager.SetLastExecutionTime(task.Id); - } - + try + { + var commandType = Type.GetType(task.TypeName); + var command = (ICommand)Activator.CreateInstance(commandType); + _messageAggregator.PublishCommand(command); + } + catch (Exception e) + { + _logger.ErrorException("Error occurred while execution task " + task.TypeName, e); + } + } + } + finally + { + Timer.Enabled = true; } } diff --git a/NzbDrone.Core/Jobs/TaskManager.cs b/NzbDrone.Core/Jobs/TaskManager.cs index d2985fa0a..fe82b7f21 100644 --- a/NzbDrone.Core/Jobs/TaskManager.cs +++ b/NzbDrone.Core/Jobs/TaskManager.cs @@ -12,10 +12,9 @@ namespace NzbDrone.Core.Jobs public interface ITaskManager { IList GetPending(); - void SetLastExecutionTime(int taskId); } - public class TaskManager : IHandle, ITaskManager + public class TaskManager : IHandle, IHandleAsync, ITaskManager { private readonly IScheduledTaskRepository _scheduledTaskRepository; private readonly Logger _logger; @@ -32,11 +31,6 @@ namespace NzbDrone.Core.Jobs return _scheduledTaskRepository.All().Where(c => c.LastExecution.AddMinutes(c.Interval) < DateTime.UtcNow).ToList(); } - public void SetLastExecutionTime(int taskId) - { - _scheduledTaskRepository.SetLastExecutionTime(taskId, DateTime.UtcNow); - } - public void Handle(ApplicationStartedEvent message) { var defaultTasks = new[] @@ -72,5 +66,11 @@ namespace NzbDrone.Core.Jobs } } + + public void HandleAsync(CommandExecutedEvent message) + { + var commandId = _scheduledTaskRepository.GetDefinition(message.Command.GetType()).Id; + _scheduledTaskRepository.SetLastExecutionTime(commandId, DateTime.UtcNow); + } } } \ No newline at end of file diff --git a/NzbDrone/NLog.config b/NzbDrone/NLog.config index 6ed2d7f88..23f2d521e 100644 --- a/NzbDrone/NLog.config +++ b/NzbDrone/NLog.config @@ -25,7 +25,7 @@ layout="${date:format=yy-M-d HH\:mm\:ss.f}|${logger}}|${level}|${message}|${exception:format=ToString}"/> - + diff --git a/NzbDrone/app.config b/NzbDrone/app.config index d4319d1f4..ec44b8e8b 100644 --- a/NzbDrone/app.config +++ b/NzbDrone/app.config @@ -1,5 +1,10 @@  + + + + +