improvements to scheduler,

better parallelism on RSS fetch
pull/3113/head
kay.one 12 years ago
parent ff225e1753
commit a816a83f3a

@ -0,0 +1,12 @@
namespace NzbDrone.Common.Messaging
{
public class CommandCompletedEvent : IEvent
{
public ICommand Command { get; private set; }
public CommandCompletedEvent(ICommand command)
{
Command = command;
}
}
}

@ -0,0 +1,16 @@
using System;
namespace NzbDrone.Common.Messaging
{
public class CommandFailedEvent : IEvent
{
public ICommand Command { get; private set; }
public Exception Exception { get; private set; }
public CommandFailedEvent(ICommand command, Exception exception)
{
Command = command;
Exception = exception;
}
}
}

@ -0,0 +1,12 @@
namespace NzbDrone.Common.Messaging
{
public class CommandExecutedEvent : IEvent
{
public ICommand Command { get; private set; }
public CommandExecutedEvent(ICommand command)
{
Command = command;
}
}
}

@ -75,16 +75,23 @@ namespace NzbDrone.Common.Messaging
try
{
handlerContract.GetMethod("Execute").Invoke(handler, new object[] { command });
handlerContract.GetMethod("Execute").Invoke(handler, new object[] {command});
PublishEvent(new CommandCompletedEvent(command));
}
catch (TargetInvocationException e)
{
PublishEvent(new CommandFailedEvent(command, e));
if (e.InnerException != null)
{
throw e.InnerException;
}
throw;
}
finally
{
PublishEvent(new CommandExecutedEvent(command));
}
_logger.Debug("{0} <- {1}", command.GetType().Name, handler.GetType().Name);
}

@ -0,0 +1,13 @@
namespace NzbDrone.Common.Messaging
{
public class TestCommand : ICommand
{
public TestCommand()
{
Duration = 4000;
}
public int Duration { get; set; }
}
}

@ -0,0 +1,12 @@
using System.Threading;
namespace NzbDrone.Common.Messaging
{
public class TestCommandExecutor : IExecute<TestCommand>
{
public void Execute(TestCommand message)
{
Thread.Sleep(message.Duration);
}
}
}

@ -103,6 +103,9 @@
<Compile Include="EnsureThat\Resources\ExceptionMessages.Designer.cs" />
<Compile Include="EnsureThat\StringExtensions.cs" />
<Compile Include="EnsureThat\TypeParam.cs" />
<Compile Include="Messaging\CommandCompletedEvent.cs" />
<Compile Include="Messaging\CommandStartedEvent.cs" />
<Compile Include="Messaging\CommandFailedEvent.cs" />
<Compile Include="Messaging\IExecute.cs" />
<Compile Include="Messaging\ICommand.cs" />
<Compile Include="Messaging\IMessage.cs" />
@ -120,6 +123,8 @@
<Compile Include="IJsonSerializer.cs" />
<Compile Include="Instrumentation\VersionLayoutRenderer.cs" />
<Compile Include="Messaging\MessageExtensions.cs" />
<Compile Include="Messaging\TestCommand.cs" />
<Compile Include="Messaging\TestCommandExecutor.cs" />
<Compile Include="Reflection\ReflectionExtensions.cs" />
<Compile Include="ServiceFactory.cs" />
<Compile Include="StringExtention.cs" />

@ -38,7 +38,7 @@ namespace NzbDrone.Core.Indexers
_logger.Debug("Available indexers {0}", indexers.Count);
Parallel.ForEach(indexers, indexer =>
Parallel.ForEach(indexers, new ParallelOptions { MaxDegreeOfParallelism = 10 }, indexer =>
{
var indexerFeed = _feedFetcher.FetchRss(indexer);

@ -33,29 +33,32 @@ namespace NzbDrone.Core.Jobs
private void ExecuteCommands()
{
var tasks = _taskManager.GetPending();
try
{
Timer.Enabled = false;
_logger.Trace("Pending Tasks: {0}", tasks.Count);
var tasks = _taskManager.GetPending();
foreach (var task in tasks)
{
try
{
var commandType = Type.GetType(task.TypeName);
var command = (ICommand)Activator.CreateInstance(commandType);
_logger.Trace("Pending Tasks: {0}", tasks.Count);
_messageAggregator.PublishCommand(command);
}
catch (Exception e)
{
_logger.ErrorException("Error occurred while execution task " + task.TypeName, e);
}
finally
foreach (var task in tasks)
{
_taskManager.SetLastExecutionTime(task.Id);
}
try
{
var commandType = Type.GetType(task.TypeName);
var command = (ICommand)Activator.CreateInstance(commandType);
_messageAggregator.PublishCommand(command);
}
catch (Exception e)
{
_logger.ErrorException("Error occurred while execution task " + task.TypeName, e);
}
}
}
finally
{
Timer.Enabled = true;
}
}

@ -12,10 +12,9 @@ namespace NzbDrone.Core.Jobs
public interface ITaskManager
{
IList<ScheduledTask> GetPending();
void SetLastExecutionTime(int taskId);
}
public class TaskManager : IHandle<ApplicationStartedEvent>, ITaskManager
public class TaskManager : IHandle<ApplicationStartedEvent>, IHandleAsync<CommandExecutedEvent>, ITaskManager
{
private readonly IScheduledTaskRepository _scheduledTaskRepository;
private readonly Logger _logger;
@ -32,11 +31,6 @@ namespace NzbDrone.Core.Jobs
return _scheduledTaskRepository.All().Where(c => c.LastExecution.AddMinutes(c.Interval) < DateTime.UtcNow).ToList();
}
public void SetLastExecutionTime(int taskId)
{
_scheduledTaskRepository.SetLastExecutionTime(taskId, DateTime.UtcNow);
}
public void Handle(ApplicationStartedEvent message)
{
var defaultTasks = new[]
@ -72,5 +66,11 @@ namespace NzbDrone.Core.Jobs
}
}
public void HandleAsync(CommandExecutedEvent message)
{
var commandId = _scheduledTaskRepository.GetDefinition(message.Command.GetType()).Id;
_scheduledTaskRepository.SetLastExecutionTime(commandId, DateTime.UtcNow);
}
}
}

@ -25,7 +25,7 @@
layout="${date:format=yy-M-d HH\:mm\:ss.f}|${logger}}|${level}|${message}|${exception:format=ToString}"/>
</targets>
<rules>
<logger name="*" minlevel="Trace" writeTo="consoleLogger"/>
<logger name="*" minlevel="Debug" writeTo="consoleLogger"/>
<logger name="*" minlevel="Off" writeTo="udpTarget"/>
<logger name="*" minlevel="Warn" writeTo="rollingFileLogger"/>
</rules>

@ -1,5 +1,10 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<system.net>
<connectionManagement>
<add address="*" maxconnection="100" />
</connectionManagement>
</system.net>
<startup useLegacyV2RuntimeActivationPolicy="true">
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.0" />
</startup>

Loading…
Cancel
Save