diff --git a/src/NzbDrone.Common.Test/Http/HttpClientFixture.cs b/src/NzbDrone.Common.Test/Http/HttpClientFixture.cs index 79a2dc38b..f6eadb4e5 100644 --- a/src/NzbDrone.Common.Test/Http/HttpClientFixture.cs +++ b/src/NzbDrone.Common.Test/Http/HttpClientFixture.cs @@ -9,6 +9,7 @@ using NzbDrone.Common.Http; using NzbDrone.Test.Common; using NzbDrone.Test.Common.Categories; using NLog; +using NzbDrone.Common.TPL; namespace NzbDrone.Common.Test.Http { @@ -20,6 +21,7 @@ namespace NzbDrone.Common.Test.Http public void SetUp() { Mocker.SetConstant(Mocker.Resolve()); + Mocker.SetConstant(Mocker.Resolve()); } [Test] @@ -140,7 +142,7 @@ namespace NzbDrone.Common.Test.Http var oldRequest = new HttpRequest("http://eu.httpbin.org/get"); oldRequest.AddCookie("my", "cookie"); - var oldClient = new HttpClient(Mocker.Resolve(), Mocker.Resolve()); + var oldClient = new HttpClient(Mocker.Resolve(), Mocker.Resolve(), Mocker.Resolve()); oldClient.Should().NotBeSameAs(Subject); diff --git a/src/NzbDrone.Common.Test/NzbDrone.Common.Test.csproj b/src/NzbDrone.Common.Test/NzbDrone.Common.Test.csproj index 1a40f6334..4ef24b6e2 100644 --- a/src/NzbDrone.Common.Test/NzbDrone.Common.Test.csproj +++ b/src/NzbDrone.Common.Test/NzbDrone.Common.Test.csproj @@ -90,6 +90,7 @@ + diff --git a/src/NzbDrone.Common.Test/TPLTests/RateLimitServiceFixture.cs b/src/NzbDrone.Common.Test/TPLTests/RateLimitServiceFixture.cs new file mode 100644 index 000000000..2bd123997 --- /dev/null +++ b/src/NzbDrone.Common.Test/TPLTests/RateLimitServiceFixture.cs @@ -0,0 +1,94 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using NUnit.Framework; +using NzbDrone.Common.Cache; +using NzbDrone.Common.TPL; +using NzbDrone.Test.Common; +using FluentAssertions; + +namespace NzbDrone.Common.Test.TPLTests +{ + [TestFixture] + public class RateLimitServiceFixture : TestBase + { + private DateTime _epoch; + + [SetUp] + public void SetUp() + { + // Make sure it's there so we don't affect measurements. + Subject.GetType(); + + _epoch = DateTime.UtcNow; + } + + private ConcurrentDictionary GetRateLimitStore() + { + var cache = Mocker.Resolve() + .GetCache>(typeof(RateLimitService), "rateLimitStore"); + + return cache.Get("rateLimitStore", () => new ConcurrentDictionary()); + } + + private void GivenExisting(string key, DateTime dateTime) + { + GetRateLimitStore().AddOrUpdate(key, dateTime, (s, i) => dateTime); + } + + [Test] + public void should_not_delay_if_unset() + { + var watch = Stopwatch.StartNew(); + Subject.WaitAndPulse("me", TimeSpan.FromMilliseconds(100)); + watch.Stop(); + + watch.ElapsedMilliseconds.Should().BeLessThan(100); + } + + [Test] + public void should_not_delay_unrelated_key() + { + GivenExisting("other", _epoch + TimeSpan.FromMilliseconds(200)); + + var watch = Stopwatch.StartNew(); + Subject.WaitAndPulse("me", TimeSpan.FromMilliseconds(100)); + watch.Stop(); + + watch.ElapsedMilliseconds.Should().BeLessThan(50); + } + + [Test] + public void should_wait_for_existing() + { + GivenExisting("me", _epoch + TimeSpan.FromMilliseconds(200)); + + var watch = Stopwatch.StartNew(); + Subject.WaitAndPulse("me", TimeSpan.FromMilliseconds(400)); + watch.Stop(); + + watch.ElapsedMilliseconds.Should().BeInRange(195, 250); + } + + [Test] + public void should_extend_delay() + { + GivenExisting("me", _epoch + TimeSpan.FromMilliseconds(200)); + + Subject.WaitAndPulse("me", TimeSpan.FromMilliseconds(100)); + + (GetRateLimitStore()["me"] - _epoch).Should().BeGreaterOrEqualTo(TimeSpan.FromMilliseconds(300)); + } + + [Test] + public void should_add_delay() + { + Subject.WaitAndPulse("me", TimeSpan.FromMilliseconds(100)); + + (GetRateLimitStore()["me"] - _epoch).Should().BeGreaterOrEqualTo(TimeSpan.FromMilliseconds(100)); + } + } +} diff --git a/src/NzbDrone.Common/Http/HttpClient.cs b/src/NzbDrone.Common/Http/HttpClient.cs index 56c495c0c..e68895813 100644 --- a/src/NzbDrone.Common/Http/HttpClient.cs +++ b/src/NzbDrone.Common/Http/HttpClient.cs @@ -6,6 +6,7 @@ using NLog; using NzbDrone.Common.Cache; using NzbDrone.Common.EnvironmentInfo; using NzbDrone.Common.Extensions; +using NzbDrone.Common.TPL; namespace NzbDrone.Common.Http { @@ -23,12 +24,13 @@ namespace NzbDrone.Common.Http public class HttpClient : IHttpClient { private readonly Logger _logger; - + private readonly IRateLimitService _rateLimitService; private readonly ICached _cookieContainerCache; - public HttpClient(ICacheManager cacheManager, Logger logger) + public HttpClient(ICacheManager cacheManager, IRateLimitService rateLimitService, Logger logger) { _logger = logger; + _rateLimitService = rateLimitService; ServicePointManager.DefaultConnectionLimit = 12; _cookieContainerCache = cacheManager.GetCache(typeof(HttpClient)); @@ -36,6 +38,11 @@ namespace NzbDrone.Common.Http public HttpResponse Execute(HttpRequest request) { + if (request.RateLimit != TimeSpan.Zero) + { + _rateLimitService.WaitAndPulse(request.Url.Host, request.RateLimit); + } + _logger.Trace(request); var webRequest = (HttpWebRequest)WebRequest.Create(request.Url); diff --git a/src/NzbDrone.Common/Http/HttpRequest.cs b/src/NzbDrone.Common/Http/HttpRequest.cs index 3de434fbe..890099a91 100644 --- a/src/NzbDrone.Common/Http/HttpRequest.cs +++ b/src/NzbDrone.Common/Http/HttpRequest.cs @@ -47,6 +47,7 @@ namespace NzbDrone.Common.Http public bool AllowAutoRedirect { get; set; } public Dictionary Cookies { get; private set; } public bool StoreResponseCookie { get; set; } + public TimeSpan RateLimit { get; set; } public override string ToString() { diff --git a/src/NzbDrone.Common/NzbDrone.Common.csproj b/src/NzbDrone.Common/NzbDrone.Common.csproj index 6a2a77deb..2dc9bb8e0 100644 --- a/src/NzbDrone.Common/NzbDrone.Common.csproj +++ b/src/NzbDrone.Common/NzbDrone.Common.csproj @@ -178,7 +178,6 @@ - @@ -190,6 +189,7 @@ + diff --git a/src/NzbDrone.Common/RateGate.cs b/src/NzbDrone.Common/RateGate.cs deleted file mode 100644 index 598bf30e1..000000000 --- a/src/NzbDrone.Common/RateGate.cs +++ /dev/null @@ -1,197 +0,0 @@ -/* - * Code from: http://www.jackleitch.net/2010/10/better-rate-limiting-with-dot-net/ - */ - -using System; -using System.Collections.Concurrent; -using System.Threading; - -namespace NzbDrone.Common -{ - /// - /// Used to control the rate of some occurrence per unit of time. - /// - /// - /// - /// To control the rate of an action using a , - /// code should simply call prior to - /// performing the action. will block - /// the current thread until the action is allowed based on the rate - /// limit. - /// - /// - /// This class is thread safe. A single instance - /// may be used to control the rate of an occurrence across multiple - /// threads. - /// - /// - public class RateGate : IDisposable - { - // Semaphore used to count and limit the number of occurrences per - // unit time. - private readonly SemaphoreSlim _semaphore; - - // Times (in millisecond ticks) at which the semaphore should be exited. - private readonly ConcurrentQueue _exitTimes; - - // Timer used to trigger exiting the semaphore. - private readonly Timer _exitTimer; - - // Whether this instance is disposed. - private bool _isDisposed; - - /// - /// Number of occurrences allowed per unit of time. - /// - public int Occurrences { get; private set; } - - /// - /// The length of the time unit, in milliseconds. - /// - public int TimeUnitMilliseconds { get; private set; } - - /// - /// Initializes a with a rate of - /// per . - /// - /// Number of occurrences allowed per unit of time. - /// Length of the time unit. - /// - /// If or is negative. - /// - public RateGate(int occurrences, TimeSpan timeUnit) - { - // Check the arguments. - if (occurrences <= 0) - throw new ArgumentOutOfRangeException("occurrences", "Number of occurrences must be a positive integer"); - if (timeUnit != timeUnit.Duration()) - throw new ArgumentOutOfRangeException("timeUnit", "Time unit must be a positive span of time"); - if (timeUnit >= TimeSpan.FromMilliseconds(UInt32.MaxValue)) - throw new ArgumentOutOfRangeException("timeUnit", "Time unit must be less than 2^32 milliseconds"); - - Occurrences = occurrences; - TimeUnitMilliseconds = (int)timeUnit.TotalMilliseconds; - - // Create the semaphore, with the number of occurrences as the maximum count. - _semaphore = new SemaphoreSlim(Occurrences, Occurrences); - - // Create a queue to hold the semaphore exit times. - _exitTimes = new ConcurrentQueue(); - - // Create a timer to exit the semaphore. Use the time unit as the original - // interval length because that's the earliest we will need to exit the semaphore. - _exitTimer = new Timer(ExitTimerCallback, null, TimeUnitMilliseconds, -1); - } - - // Callback for the exit timer that exits the semaphore based on exit times - // in the queue and then sets the timer for the nextexit time. - private void ExitTimerCallback(object state) - { - // While there are exit times that are passed due still in the queue, - // exit the semaphore and dequeue the exit time. - int exitTime; - while (_exitTimes.TryPeek(out exitTime) - && unchecked(exitTime - Environment.TickCount) <= 0) - { - _semaphore.Release(); - _exitTimes.TryDequeue(out exitTime); - } - - // Try to get the next exit time from the queue and compute - // the time until the next check should take place. If the - // queue is empty, then no exit times will occur until at least - // one time unit has passed. - int timeUntilNextCheck; - if (_exitTimes.TryPeek(out exitTime)) - timeUntilNextCheck = unchecked(exitTime - Environment.TickCount); - else - timeUntilNextCheck = TimeUnitMilliseconds; - - // Set the timer. - _exitTimer.Change(timeUntilNextCheck, -1); - } - - /// - /// Blocks the current thread until allowed to proceed or until the - /// specified timeout elapses. - /// - /// Number of milliseconds to wait, or -1 to wait indefinitely. - /// true if the thread is allowed to proceed, or false if timed out - public bool WaitToProceed(int millisecondsTimeout) - { - // Check the arguments. - if (millisecondsTimeout < -1) - throw new ArgumentOutOfRangeException("millisecondsTimeout"); - - CheckDisposed(); - - // Block until we can enter the semaphore or until the timeout expires. - var entered = _semaphore.Wait(millisecondsTimeout); - - // If we entered the semaphore, compute the corresponding exit time - // and add it to the queue. - if (entered) - { - var timeToExit = unchecked(Environment.TickCount + TimeUnitMilliseconds); - _exitTimes.Enqueue(timeToExit); - } - - return entered; - } - - /// - /// Blocks the current thread until allowed to proceed or until the - /// specified timeout elapses. - /// - /// - /// true if the thread is allowed to proceed, or false if timed out - public bool WaitToProceed(TimeSpan timeout) - { - return WaitToProceed((int)timeout.TotalMilliseconds); - } - - /// - /// Blocks the current thread indefinitely until allowed to proceed. - /// - public void WaitToProceed() - { - WaitToProceed(Timeout.Infinite); - } - - // Throws an ObjectDisposedException if this object is disposed. - private void CheckDisposed() - { - if (_isDisposed) - throw new ObjectDisposedException("RateGate is already disposed"); - } - - /// - /// Releases unmanaged resources held by an instance of this class. - /// - public void Dispose() - { - Dispose(true); - GC.SuppressFinalize(this); - } - - /// - /// Releases unmanaged resources held by an instance of this class. - /// - /// Whether this object is being disposed. - protected virtual void Dispose(bool isDisposing) - { - if (!_isDisposed) - { - if (isDisposing) - { - // The semaphore and timer both implement IDisposable and - // therefore must be disposed. - _semaphore.Dispose(); - _exitTimer.Dispose(); - - _isDisposed = true; - } - } - } - } -} diff --git a/src/NzbDrone.Common/TPL/RateLimitService.cs b/src/NzbDrone.Common/TPL/RateLimitService.cs new file mode 100644 index 000000000..186fa6e80 --- /dev/null +++ b/src/NzbDrone.Common/TPL/RateLimitService.cs @@ -0,0 +1,43 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using NLog; +using NzbDrone.Common.Cache; + +namespace NzbDrone.Common.TPL +{ + public interface IRateLimitService + { + void WaitAndPulse(string key, TimeSpan interval); + } + + public class RateLimitService : IRateLimitService + { + private readonly ConcurrentDictionary _rateLimitStore; + private readonly Logger _logger; + + public RateLimitService(ICacheManager cacheManager, Logger logger) + { + _rateLimitStore = cacheManager.GetCache>(GetType(), "rateLimitStore").Get("rateLimitStore", () => new ConcurrentDictionary()); + _logger = logger; + } + + public void WaitAndPulse(string key, TimeSpan interval) + { + var waitUntil = _rateLimitStore.AddOrUpdate(key, + (s) => DateTime.UtcNow + interval, + (s,i) => new DateTime(Math.Max(DateTime.UtcNow.Ticks, i.Ticks), DateTimeKind.Utc) + interval); + + waitUntil -= interval; + + var delay = waitUntil - DateTime.UtcNow; + + if (delay.TotalSeconds > 0.0) + { + _logger.Trace("Rate Limit triggered, delaying '{0}' for {1:0.000} sec", key, delay.TotalSeconds); + System.Threading.Thread.Sleep(delay); + } + } + } +} diff --git a/src/NzbDrone.Core.Test/Download/DownloadServiceFixture.cs b/src/NzbDrone.Core.Test/Download/DownloadServiceFixture.cs index 8a7e13fe0..a45840005 100644 --- a/src/NzbDrone.Core.Test/Download/DownloadServiceFixture.cs +++ b/src/NzbDrone.Core.Test/Download/DownloadServiceFixture.cs @@ -38,6 +38,7 @@ namespace NzbDrone.Core.Test.Download var releaseInfo = Builder.CreateNew() .With(v => v.DownloadProtocol = Indexers.DownloadProtocol.Usenet) + .With(v => v.DownloadUrl = "http://test.site/download1.ext") .Build(); _parseResult = Builder.CreateNew() diff --git a/src/NzbDrone.Core.Test/Framework/CoreTest.cs b/src/NzbDrone.Core.Test/Framework/CoreTest.cs index bf3d2bd7a..8c588531a 100644 --- a/src/NzbDrone.Core.Test/Framework/CoreTest.cs +++ b/src/NzbDrone.Core.Test/Framework/CoreTest.cs @@ -3,6 +3,7 @@ using NUnit.Framework; using NzbDrone.Common.Cache; using NzbDrone.Common.Cloud; using NzbDrone.Common.Http; +using NzbDrone.Common.TPL; using NzbDrone.Test.Common; namespace NzbDrone.Core.Test.Framework @@ -17,7 +18,7 @@ namespace NzbDrone.Core.Test.Framework protected void UseRealHttp() { Mocker.SetConstant(new HttpProvider(TestLogger)); - Mocker.SetConstant(new HttpClient(Mocker.Resolve(), TestLogger)); + Mocker.SetConstant(new HttpClient(Mocker.Resolve(), Mocker.Resolve(), TestLogger)); Mocker.SetConstant(new DroneServicesHttpRequestBuilder()); } } diff --git a/src/NzbDrone.Core/Download/DownloadService.cs b/src/NzbDrone.Core/Download/DownloadService.cs index 6f0737dc0..d3625537f 100644 --- a/src/NzbDrone.Core/Download/DownloadService.cs +++ b/src/NzbDrone.Core/Download/DownloadService.cs @@ -1,7 +1,9 @@ using System; using NLog; using NzbDrone.Common.EnsureThat; +using NzbDrone.Common.Extensions; using NzbDrone.Common.Instrumentation.Extensions; +using NzbDrone.Common.TPL; using NzbDrone.Core.Messaging.Events; using NzbDrone.Core.Parser.Model; @@ -16,13 +18,17 @@ namespace NzbDrone.Core.Download public class DownloadService : IDownloadService { private readonly IProvideDownloadClient _downloadClientProvider; + private readonly IRateLimitService _rateLimitService; private readonly IEventAggregator _eventAggregator; private readonly Logger _logger; public DownloadService(IProvideDownloadClient downloadClientProvider, - IEventAggregator eventAggregator, Logger logger) + IRateLimitService rateLimitService, + IEventAggregator eventAggregator, + Logger logger) { _downloadClientProvider = downloadClientProvider; + _rateLimitService = rateLimitService; _eventAggregator = eventAggregator; _logger = logger; } @@ -41,6 +47,13 @@ namespace NzbDrone.Core.Download return; } + // Limit grabs to 2 per second. + if (remoteEpisode.Release.DownloadUrl.IsNotNullOrWhiteSpace() && !remoteEpisode.Release.DownloadUrl.StartsWith("magnet:")) + { + var uri = new Uri(remoteEpisode.Release.DownloadUrl); + _rateLimitService.WaitAndPulse(uri.Host, TimeSpan.FromSeconds(2)); + } + var downloadClientId = downloadClient.Download(remoteEpisode); var episodeGrabbedEvent = new EpisodeGrabbedEvent(remoteEpisode); episodeGrabbedEvent.DownloadClient = downloadClient.GetType().Name; diff --git a/src/NzbDrone.Core/Download/ProcessDownloadDecisions.cs b/src/NzbDrone.Core/Download/ProcessDownloadDecisions.cs index b170dcec5..26635f944 100644 --- a/src/NzbDrone.Core/Download/ProcessDownloadDecisions.cs +++ b/src/NzbDrone.Core/Download/ProcessDownloadDecisions.cs @@ -39,55 +39,50 @@ namespace NzbDrone.Core.Download var grabbed = new List(); var pending = new List(); - //Limits to 1 grab every 1 second to reduce rapid API hits - using (var rateGate = new RateGate(1, TimeSpan.FromSeconds(1))) + foreach (var report in prioritizedDecisions) { - foreach (var report in prioritizedDecisions) - { - var remoteEpisode = report.RemoteEpisode; + var remoteEpisode = report.RemoteEpisode; - var episodeIds = remoteEpisode.Episodes.Select(e => e.Id).ToList(); + var episodeIds = remoteEpisode.Episodes.Select(e => e.Id).ToList(); - //Skip if already grabbed - if (grabbed.SelectMany(r => r.RemoteEpisode.Episodes) - .Select(e => e.Id) - .ToList() - .Intersect(episodeIds) - .Any()) - { - continue; - } + //Skip if already grabbed + if (grabbed.SelectMany(r => r.RemoteEpisode.Episodes) + .Select(e => e.Id) + .ToList() + .Intersect(episodeIds) + .Any()) + { + continue; + } - if (report.TemporarilyRejected) - { - _pendingReleaseService.Add(report); - pending.Add(report); - continue; - } + if (report.TemporarilyRejected) + { + _pendingReleaseService.Add(report); + pending.Add(report); + continue; + } - if (pending.SelectMany(r => r.RemoteEpisode.Episodes) - .Select(e => e.Id) - .ToList() - .Intersect(episodeIds) - .Any()) - { - continue; - } + if (pending.SelectMany(r => r.RemoteEpisode.Episodes) + .Select(e => e.Id) + .ToList() + .Intersect(episodeIds) + .Any()) + { + continue; + } - try - { - rateGate.WaitToProceed(); - _downloadService.DownloadReport(remoteEpisode); - grabbed.Add(report); - } - catch (Exception e) - { - //TODO: support for store & forward - //We'll need to differentiate between a download client error and an indexer error - _logger.WarnException("Couldn't add report to download queue. " + remoteEpisode, e); - } + try + { + _downloadService.DownloadReport(remoteEpisode); + grabbed.Add(report); + } + catch (Exception e) + { + //TODO: support for store & forward + //We'll need to differentiate between a download client error and an indexer error + _logger.WarnException("Couldn't add report to download queue. " + remoteEpisode, e); } - } + } return new ProcessedDecisions(grabbed, pending, decisions.Where(d => d.Rejected).ToList()); } diff --git a/src/NzbDrone.Core/Indexers/HttpIndexerBase.cs b/src/NzbDrone.Core/Indexers/HttpIndexerBase.cs index 6906cb9af..f8bb282b9 100644 --- a/src/NzbDrone.Core/Indexers/HttpIndexerBase.cs +++ b/src/NzbDrone.Core/Indexers/HttpIndexerBase.cs @@ -6,6 +6,7 @@ using FluentValidation.Results; using NLog; using NzbDrone.Common.Extensions; using NzbDrone.Common.Http; +using NzbDrone.Common.TPL; using NzbDrone.Core.Configuration; using NzbDrone.Core.Indexers.Exceptions; using NzbDrone.Core.IndexerSearch.Definitions; @@ -27,6 +28,7 @@ namespace NzbDrone.Core.Indexers public bool SupportsPaging { get { return PageSize > 0; } } public virtual Int32 PageSize { get { return 0; } } + public virtual TimeSpan RateLimit { get { return TimeSpan.FromSeconds(2); } } public abstract IIndexerRequestGenerator GetRequestGenerator(); public abstract IParseIndexerResponse GetParser(); @@ -199,6 +201,11 @@ namespace NzbDrone.Core.Indexers { _logger.Debug("Downloading Feed " + request.Url); + if (request.HttpRequest.RateLimit < RateLimit) + { + request.HttpRequest.RateLimit = RateLimit; + } + return new IndexerResponse(request, _httpClient.Execute(request.HttpRequest)); } diff --git a/src/NzbDrone.Core/NzbDrone.Core.csproj b/src/NzbDrone.Core/NzbDrone.Core.csproj index 3b754935a..a683120c3 100644 --- a/src/NzbDrone.Core/NzbDrone.Core.csproj +++ b/src/NzbDrone.Core/NzbDrone.Core.csproj @@ -1016,4 +1016,4 @@ --> - + \ No newline at end of file