more granular Concurrency control.

indexer calls are done fully paralleled.
events are dispatched on max of 2 threads.
pull/24/head
Keivan Beigi 12 years ago
parent 763df726f0
commit 9181b1bb91

@ -1,9 +1,11 @@
using System;
using System.Collections.Generic;
using System.Threading;
using Moq;
using NUnit.Framework;
using NzbDrone.Common.Messaging;
using NzbDrone.Test.Common;
using FluentAssertions;
namespace NzbDrone.Common.Test.EventingTests
{
@ -16,6 +18,8 @@ namespace NzbDrone.Common.Test.EventingTests
private Mock<IHandle<EventB>> HandlerB1;
private Mock<IHandle<EventB>> HandlerB2;
private Mock<IHandleAsync<EventA>> AsyncHandlerA1;
[SetUp]
public void Setup()
@ -25,6 +29,8 @@ namespace NzbDrone.Common.Test.EventingTests
HandlerB1 = new Mock<IHandle<EventB>>();
HandlerB2 = new Mock<IHandle<EventB>>();
AsyncHandlerA1 = new Mock<IHandleAsync<EventA>>();
Mocker.GetMock<IServiceFactory>()
.Setup(c => c.BuildAll<IHandle<EventA>>())
.Returns(new List<IHandle<EventA>> { HandlerA1.Object, HandlerA2.Object });
@ -79,6 +85,50 @@ namespace NzbDrone.Common.Test.EventingTests
ExceptionVerification.ExpectedErrors(1);
}
[Test]
public void should_queue_multiple_async_events()
{
var eventA = new EventA();
var handlers = new List<IHandleAsync<EventA>>
{
AsyncHandlerA1.Object,
AsyncHandlerA1.Object,
AsyncHandlerA1.Object,
AsyncHandlerA1.Object,
AsyncHandlerA1.Object,
AsyncHandlerA1.Object,
AsyncHandlerA1.Object,
};
Mocker.GetMock<IServiceFactory>()
.Setup(c => c.BuildAll<IHandle<EventA>>())
.Returns(new List<IHandle<EventA>>());
Mocker.GetMock<IServiceFactory>()
.Setup(c => c.BuildAll<IHandleAsync<EventA>>())
.Returns(handlers);
var counter = new ConcurrencyCounter(handlers.Count);
AsyncHandlerA1.Setup(c => c.HandleAsync(It.IsAny<EventA>()))
.Callback<EventA>(c =>
{
var id = counter.Start();
Thread.Sleep(1000);
counter.Stop(id);
});
Subject.PublishEvent(eventA);
counter.WaitForAllItems();
counter.MaxThreads.Should().Be(2);
}
}

@ -0,0 +1,133 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace NzbDrone.Common.Messaging
{
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
/// <summary>Whether the current thread is processing work items.</summary>
[ThreadStatic]
private static bool _currentThreadIsProcessingItems;
/// <summary>The list of tasks to be executed.</summary>
private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
/// <summary>The maximum concurrency level allowed by this scheduler.</summary>
private readonly int _maxDegreeOfParallelism;
/// <summary>Whether the scheduler is currently processing work items.</summary>
private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)
/// <summary>
/// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
/// specified degree of parallelism.
/// </summary>
/// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
{
if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
_maxDegreeOfParallelism = maxDegreeOfParallelism;
}
/// <summary>Queues a task to the scheduler.</summary>
/// <param name="task">The task to be queued.</param>
protected sealed override void QueueTask(Task task)
{
// Add the task to the list of tasks to be processed. If there aren't enough
// delegates currently queued or running to process tasks, schedule another.
lock (_tasks)
{
_tasks.AddLast(task);
if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
{
++_delegatesQueuedOrRunning;
NotifyThreadPoolOfPendingWork();
}
}
}
/// <summary>
/// Informs the ThreadPool that there's work to be executed for this scheduler.
/// </summary>
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
// Note that the current thread is now processing work items.
// This is necessary to enable inlining of tasks into this thread.
_currentThreadIsProcessingItems = true;
try
{
// Process all available items in the queue.
while (true)
{
Task item;
lock (_tasks)
{
// When there are no more items to be processed,
// note that we're done processing, and get out.
if (_tasks.Count == 0)
{
--_delegatesQueuedOrRunning;
break;
}
// Get the next item from the queue
item = _tasks.First.Value;
_tasks.RemoveFirst();
}
// Execute the task we pulled out of the queue
base.TryExecuteTask(item);
}
}
// We're done processing items on the current thread
finally { _currentThreadIsProcessingItems = false; }
}, null);
}
/// <summary>Attempts to execute the specified task on the current thread.</summary>
/// <param name="task">The task to be executed.</param>
/// <param name="taskWasPreviouslyQueued"></param>
/// <returns>Whether the task could be executed on the current thread.</returns>
protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
// If this thread isn't already processing a task, we don't support inlining
if (!_currentThreadIsProcessingItems) return false;
// If the task was previously queued, remove it from the queue
if (taskWasPreviouslyQueued) TryDequeue(task);
// Try to run the task.
return base.TryExecuteTask(task);
}
/// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
/// <param name="task">The task to be removed.</param>
/// <returns>Whether the task could be found and removed.</returns>
protected sealed override bool TryDequeue(Task task)
{
lock (_tasks) return _tasks.Remove(task);
}
/// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }
/// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
/// <returns>An enumerable of the tasks currently scheduled.</returns>
protected sealed override IEnumerable<Task> GetScheduledTasks()
{
bool lockTaken = false;
try
{
Monitor.TryEnter(_tasks, ref lockTaken);
if (lockTaken) return _tasks.ToArray();
else throw new NotSupportedException();
}
finally
{
if (lockTaken) Monitor.Exit(_tasks);
}
}
}
}

@ -1,6 +1,5 @@
using System;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using NLog;
using NzbDrone.Common.EnsureThat;
@ -12,11 +11,14 @@ namespace NzbDrone.Common.Messaging
{
private readonly Logger _logger;
private readonly IServiceFactory _serviceFactory;
private readonly TaskFactory _taskFactory;
public MessageAggregator(Logger logger, IServiceFactory serviceFactory)
{
_logger = logger;
_serviceFactory = serviceFactory;
var scheduler = new LimitedConcurrencyLevelTaskScheduler(2);
_taskFactory = new TaskFactory(scheduler);
}
public void PublishEvent<TEvent>(TEvent @event) where TEvent : class ,IEvent
@ -45,12 +47,13 @@ namespace NzbDrone.Common.Messaging
foreach (var handler in _serviceFactory.BuildAll<IHandleAsync<TEvent>>())
{
var handlerLocal = handler;
Task.Factory.StartNew(() =>
_taskFactory.StartNew(() =>
{
_logger.Debug("{0} ~> {1}", eventName, handlerLocal.GetType().Name);
handlerLocal.HandleAsync(@event);
_logger.Debug("{0} <~ {1}", eventName, handlerLocal.GetType().Name);
});
}, TaskCreationOptions.PreferFairness);
}
}

@ -102,6 +102,7 @@
<Compile Include="EnsureThat\ExpressionExtensions.cs" />
<Compile Include="EnsureThat\Param.cs" />
<Compile Include="EnsureThat\Resources\ExceptionMessages.Designer.cs" />
<Compile Include="Messaging\LimitedConcurrencyLevelTaskScheduler.cs" />
<Compile Include="StringExtensions.cs" />
<Compile Include="EnsureThat\TypeParam.cs" />
<Compile Include="HashUtil.cs" />

@ -0,0 +1,78 @@
using System.Collections.Generic;
using FizzWare.NBuilder;
using FluentAssertions;
using Moq;
using NUnit.Framework;
using NzbDrone.Core.DecisionEngine;
using NzbDrone.Core.IndexerSearch;
using NzbDrone.Core.IndexerSearch.Definitions;
using NzbDrone.Core.Indexers;
using NzbDrone.Core.Indexers.Newznab;
using NzbDrone.Core.Parser.Model;
using NzbDrone.Core.Test.Framework;
using NzbDrone.Core.Tv;
using NzbDrone.Test.Common;
namespace NzbDrone.Core.Test.IndexerSearchTests
{
public class NzbSearchServiceFixture : CoreTest<NzbSearchService>
{
private List<IIndexer> _indexers;
private Series _searchTargetSeries;
[SetUp]
public void Setup()
{
_searchTargetSeries = Builder<Series>.CreateNew().BuildNew();
_indexers = new List<IIndexer>();
_indexers.Add(new Newznab());
_indexers.Add(new Newznab());
_indexers.Add(new Newznab());
_indexers.Add(new Newznab());
_indexers.Add(new Newznab());
_indexers.Add(new Newznab());
_indexers.Add(new Newznab());
_indexers.Add(new Newznab());
_indexers.Add(new Newznab());
_indexers.Add(new Newznab());
_indexers.Add(new Newznab());
_indexers.Add(new Newznab());
_indexers.Add(new Newznab());
_indexers.Add(new Newznab());
_indexers.Add(new Newznab());
Mocker.SetConstant<IEnumerable<IIndexer>>(_indexers);
Mocker.GetMock<ISeriesService>().Setup(c => c.GetSeries(It.IsAny<int>()))
.Returns(_searchTargetSeries);
}
[Test]
public void should_call_fetch_on_all_indexers_at_the_same_time()
{
var counter = new ConcurrencyCounter(_indexers.Count);
Mocker.GetMock<IFetchFeedFromIndexers>().Setup(c => c.Fetch(It.IsAny<IIndexer>(), It.IsAny<SingleEpisodeSearchDefinition>()))
.Returns(new List<ReportInfo>())
.Callback((() => counter.SimulateWork(500)));
Mocker.GetMock<IIndexerService>().Setup(c => c.GetAvailableIndexers()).Returns(_indexers);
Mocker.GetMock<IMakeDownloadDecision>()
.Setup(c => c.GetSearchDecision(It.IsAny<IEnumerable<ReportInfo>>(), It.IsAny<SearchDefinitionBase>()))
.Returns(new List<DownloadDecision>());
Subject.SearchSingle(0, 0, 0);
counter.WaitForAllItems();
counter.MaxThreads.Should().Be(_indexers.Count);
}
}
}

@ -9,6 +9,7 @@ using NzbDrone.Core.Indexers;
using NzbDrone.Core.Indexers.Newznab;
using NzbDrone.Core.Parser.Model;
using NzbDrone.Core.Test.Framework;
using NzbDrone.Test.Common;
namespace NzbDrone.Core.Test.IndexerTests
{
@ -45,26 +46,20 @@ namespace NzbDrone.Core.Test.IndexerTests
[Explicit]
public void should_call_fetch_on_all_indexers_at_the_same_time()
{
var callsToFetch = new List<DateTime>();
var counter = new ConcurrencyCounter(_indexers.Count);
Mocker.GetMock<IFetchFeedFromIndexers>().Setup(c => c.FetchRss(It.IsAny<IIndexer>()))
.Returns(new List<ReportInfo>())
.Callback((() =>
{
Thread.Sleep(2000);
Console.WriteLine(DateTime.Now);
callsToFetch.Add(DateTime.Now);
}));
.Callback((() => counter.SimulateWork(500)));
Mocker.GetMock<IIndexerService>().Setup(c => c.GetAvailableIndexers()).Returns(_indexers);
Subject.Fetch();
counter.WaitForAllItems();
var first = callsToFetch.Min();
var last = callsToFetch.Max();
(last - first).Should().BeLessThan(TimeSpan.FromSeconds(1));
counter.MaxThreads.Should().Be(_indexers.Count);
}
}
}

@ -133,6 +133,7 @@
<Compile Include="Framework\CoreTest.cs" />
<Compile Include="Framework\DbTest.cs" />
<Compile Include="Framework\NBuilderExtensions.cs" />
<Compile Include="IndexerSearchTests\FetchAndParseRssServiceFixture.cs" />
<Compile Include="IndexerSearchTests\SearchDefinitionFixture.cs" />
<Compile Include="IndexerTests\BasicRssParserFixture.cs" />
<Compile Include="IndexerTests\FetchAndParseRssServiceFixture.cs" />

@ -112,21 +112,33 @@ namespace NzbDrone.Core.IndexerSearch
var indexers = _indexerService.GetAvailableIndexers().ToList();
var reports = new List<ReportInfo>();
Parallel.ForEach(indexers, indexer =>
var taskList = new List<Task>();
var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
foreach (var indexer in indexers)
{
try
var indexerLocal = indexer;
taskList.Add(taskFactory.StartNew(() =>
{
var indexerReports = searchAction(indexer);
lock (indexer)
try
{
reports.AddRange(indexerReports);
var indexerReports = searchAction(indexerLocal);
lock (reports)
{
reports.AddRange(indexerReports);
}
}
}
catch (Exception e)
{
_logger.ErrorException(String.Format("An error has occurred while searching for {0} from: {1}", definitionBase, indexer.Name), e);
}
});
catch (Exception e)
{
_logger.ErrorException("Error while searching for " + definitionBase, e);
}
}));
}
Task.WaitAll(taskList.ToArray());
_logger.Debug("Total of {0} reports were found for {1} in {2} indexers", reports.Count, definitionBase, indexers.Count);

@ -38,15 +38,28 @@ namespace NzbDrone.Core.Indexers
_logger.Debug("Available indexers {0}", indexers.Count);
Parallel.ForEach(indexers, new ParallelOptions { MaxDegreeOfParallelism = 10 }, indexer =>
{
var indexerFeed = _feedFetcher.FetchRss(indexer);
lock (result)
{
result.AddRange(indexerFeed);
}
});
var taskList = new List<Task>();
var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);
foreach (var indexer in indexers)
{
var indexerLocal = indexer;
var task = taskFactory.StartNew(() =>
{
var indexerFeed = _feedFetcher.FetchRss(indexerLocal);
lock (result)
{
result.AddRange(indexerFeed);
}
});
taskList.Add(task);
}
Task.WaitAll(taskList.ToArray());
_logger.Debug("Found {0} reports", result.Count);

@ -2,20 +2,16 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using Marr.Data;
using NLog;
using NzbDrone.Common;
using NzbDrone.Common.EnsureThat;
using NzbDrone.Common.Messaging;
using NzbDrone.Core.Configuration;
using NzbDrone.Core.DataAugmentation.Scene;
using NzbDrone.Core.Datastore;
using NzbDrone.Core.MetadataSource;
using NzbDrone.Core.Model;
using NzbDrone.Core.Organizer;
using NzbDrone.Core.Parser;
using NzbDrone.Core.RootFolders;
using NzbDrone.Core.SeriesStats;
using NzbDrone.Core.Tv.Events;
namespace NzbDrone.Core.Tv

@ -0,0 +1,57 @@
using System;
using System.Collections.Generic;
using System.Threading;
namespace NzbDrone.Test.Common
{
public class ConcurrencyCounter
{
private int _items;
readonly object _mutex = new object();
readonly Dictionary<int, int> _threads = new Dictionary<int, int>();
public int MaxThreads { get { return _threads.Count; } }
public ConcurrencyCounter(int items)
{
_items = items;
}
public void WaitForAllItems()
{
while (_items != 0)
{
Thread.Sleep(500);
}
}
public int Start()
{
int threadId = Thread.CurrentThread.ManagedThreadId;
lock (_mutex)
{
_threads[threadId] = 1;
}
Console.WriteLine("Starting " + threadId);
return threadId;
}
public void SimulateWork(int sleepInMs)
{
var id = Start();
Thread.Sleep(sleepInMs);
Stop(id);
}
public void Stop(int id)
{
Console.WriteLine("Finished " + id);
lock (_mutex)
{
_items--;
}
}
}
}

@ -90,6 +90,7 @@
<Compile Include="AutoMoq\AutoMoqer.cs" />
<Compile Include="AutoMoq\Unity\AutoMockingBuilderStrategy.cs" />
<Compile Include="AutoMoq\Unity\AutoMockingContainerExtension.cs" />
<Compile Include="ConcurrencyCounter.cs" />
<Compile Include="ExceptionVerification.cs" />
<Compile Include="Categories\IntegrationTestAttribute.cs" />
<Compile Include="LoggingTest.cs" />

Loading…
Cancel
Save