Sonarr pulls (#310)

* New: Speed up sqlite3 initialization by disabling unused features

Co-Authored-By: taloth <taloth@users.noreply.github.com>

* New: Debounce Command Notifications

Co-Authored-By: taloth <taloth@users.noreply.github.com>

* Changed: Refactored PendingRelease logic for performance

Co-Authored-By: taloth <taloth@users.noreply.github.com>

* Added: Indexes to speed up DecisionMaker performance.

Co-Authored-By: taloth <taloth@users.noreply.github.com>

* New: Cache EventAggregator Subscribers

Co-Authored-By: taloth <taloth@users.noreply.github.com>

* Fixed: Hide fallback pending releases if temporarily delayed

Co-Authored-By: taloth <taloth@users.noreply.github.com>
pull/6/head
Qstick 7 years ago committed by Joseph Milazzo
parent e3f5ce771c
commit e06858e4bf

@ -1,7 +1,8 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using NzbDrone.Common;
using NzbDrone.Common.TPL;
using NzbDrone.Core.Datastore.Events;
using NzbDrone.Core.Messaging.Commands;
using NzbDrone.Core.Messaging.Events;
@ -17,6 +18,8 @@ namespace Lidarr.Api.V1.Commands
{
private readonly IManageCommandQueue _commandQueueManager;
private readonly IServiceFactory _serviceFactory;
private readonly Debouncer _debouncer;
private readonly Dictionary<int, CommandResource> _pendingUpdates;
public CommandModule(IManageCommandQueue commandQueueManager,
IBroadcastSignalRMessage signalRBroadcaster,
@ -31,6 +34,9 @@ namespace Lidarr.Api.V1.Commands
GetResourceAll = GetStartedCommands;
PostValidator.RuleFor(c => c.Name).NotBlank();
_debouncer = new Debouncer(SendUpdates, TimeSpan.FromSeconds(0.1));
_pendingUpdates = new Dictionary<int, CommandResource>();
}
private CommandResource GetCommand(int id)
@ -63,8 +69,26 @@ namespace Lidarr.Api.V1.Commands
{
if (message.Command.Body.SendUpdatesToClient)
{
BroadcastResourceChange(ModelAction.Updated, message.Command.ToResource());
lock (_pendingUpdates)
{
_pendingUpdates[message.Command.Id] = message.Command.ToResource();
}
_debouncer.Execute();
}
}
private void SendUpdates()
{
lock (_pendingUpdates)
{
var pendingUpdates = _pendingUpdates.Values.ToArray();
_pendingUpdates.Clear();
foreach (var pendingUpdate in pendingUpdates)
{
BroadcastResourceChange(ModelAction.Updated, pendingUpdate);
}
}
}
}
}
}

@ -51,6 +51,34 @@ namespace NzbDrone.Common.Extensions
}
}
public static Dictionary<TKey, TItem> ToDictionaryIgnoreDuplicates<TItem, TKey>(this IEnumerable<TItem> src, Func<TItem, TKey> keySelector)
{
var result = new Dictionary<TKey, TItem>();
foreach (var item in src)
{
var key = keySelector(item);
if (!result.ContainsKey(key))
{
result[key] = item;
}
}
return result;
}
public static Dictionary<TKey, TValue> ToDictionaryIgnoreDuplicates<TItem, TKey, TValue>(this IEnumerable<TItem> src, Func<TItem, TKey> keySelector, Func<TItem, TValue> valueSelector)
{
var result = new Dictionary<TKey, TValue>();
foreach (var item in src)
{
var key = keySelector(item);
if (!result.ContainsKey(key))
{
result[key] = valueSelector(item);
}
}
return result;
}
public static void AddIfNotNull<TSource>(this List<TSource> source, TSource item)
{
if (item == null)
@ -81,4 +109,4 @@ namespace NzbDrone.Common.Extensions
return source.Select(predicate).ToList();
}
}
}
}

@ -211,7 +211,7 @@ namespace NzbDrone.Core.Test.Download.DownloadApprovedReportsTests
decisions.Add(new DownloadDecision(remoteAlbum, new Rejection("Failure!", RejectionType.Temporary)));
Subject.ProcessDecisions(decisions);
Mocker.GetMock<IPendingReleaseService>().Verify(v => v.Add(It.IsAny<DownloadDecision>(), It.IsAny<PendingReleaseReason>()), Times.Never());
Mocker.GetMock<IPendingReleaseService>().Verify(v => v.AddMany(It.IsAny<List<Tuple<DownloadDecision, PendingReleaseReason>>>()), Times.Never());
}
[Test]
@ -225,7 +225,7 @@ namespace NzbDrone.Core.Test.Download.DownloadApprovedReportsTests
decisions.Add(new DownloadDecision(remoteAlbum, new Rejection("Failure!", RejectionType.Temporary)));
Subject.ProcessDecisions(decisions);
Mocker.GetMock<IPendingReleaseService>().Verify(v => v.Add(It.IsAny<DownloadDecision>(), It.IsAny<PendingReleaseReason>()), Times.Exactly(2));
Mocker.GetMock<IPendingReleaseService>().Verify(v => v.AddMany(It.IsAny<List<Tuple<DownloadDecision, PendingReleaseReason>>>()), Times.Once());
}
[Test]

@ -28,11 +28,21 @@ namespace NzbDrone.Core.Datastore
static DbFactory()
{
InitializeEnvironment();
MapRepository.Instance.ReflectionStrategy = new SimpleReflectionStrategy();
TableMapping.Map();
}
public static void RegisterDatabase(IContainer container)
private static void InitializeEnvironment()
{
// Speed up sqlite3 initialization since we don't use the config file and can't rely on preloading.
Environment.SetEnvironmentVariable("No_Expand", "true");
Environment.SetEnvironmentVariable("No_SQLiteXmlConfigFile", "true");
Environment.SetEnvironmentVariable("No_PreLoadSQLite", "true");
}
public static void RegisterDatabase(IContainer container)
{
var mainDb = new MainDatabase(container.Resolve<IDbFactory>().Create());

@ -0,0 +1,24 @@
using FluentMigrator;
using NzbDrone.Core.Datastore.Migration.Framework;
namespace NzbDrone.Core.Datastore.Migration
{
[Migration(16)]
public class update_artist_history_indexes : NzbDroneMigrationBase
{
protected override void MainDbUpgrade()
{
Create.Index().OnTable("Albums").OnColumn("ArtistId");
Create.Index().OnTable("Albums").OnColumn("ArtistId").Ascending()
.OnColumn("ReleaseDate").Ascending();
Delete.Index().OnTable("History").OnColumn("AlbumId");
Create.Index().OnTable("History").OnColumn("AlbumId").Ascending()
.OnColumn("Date").Descending();
Delete.Index().OnTable("History").OnColumn("DownloadId");
Create.Index().OnTable("History").OnColumn("DownloadId").Ascending()
.OnColumn("Date").Descending();
}
}
}

@ -15,6 +15,7 @@ using NzbDrone.Core.Music.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using Marr.Data;
namespace NzbDrone.Core.Download.Pending
{
@ -69,66 +70,72 @@ namespace NzbDrone.Core.Download.Pending
public void Add(DownloadDecision decision, PendingReleaseReason reason)
{
var alreadyPending = _repository.AllByArtistId(decision.RemoteAlbum.Artist.Id);
alreadyPending = IncludeRemoteAlbums(alreadyPending);
Add(alreadyPending, decision, reason);
AddMany(new List<Tuple<DownloadDecision, PendingReleaseReason>> { Tuple.Create(decision, reason) });
}
public void AddMany(List<Tuple<DownloadDecision, PendingReleaseReason>> decisions)
{
var alreadyPending = decisions.Select(v => v.Item1.RemoteAlbum.Artist.Id).Distinct().SelectMany(_repository.AllByArtistId).ToList();
alreadyPending = IncludeRemoteAlbums(alreadyPending);
foreach (var pair in decisions)
foreach (var artistDecisions in decisions.GroupBy(v => v.Item1.RemoteAlbum.Artist.Id))
{
Add(alreadyPending, pair.Item1, pair.Item2);
}
}
var artist = artistDecisions.First().Item1.RemoteAlbum.Artist;
var alreadyPending = _repository.AllByArtistId(artist.Id);
private void Add(List<PendingRelease> alreadyPending, DownloadDecision decision, PendingReleaseReason reason)
{
var albumIds = decision.RemoteAlbum.Albums.Select(e => e.Id);
var existingReports = alreadyPending.Where(r => r.RemoteAlbum.Albums.Select(e => e.Id)
.Intersect(albumIds)
.Any());
alreadyPending = IncludeRemoteAlbums(alreadyPending, artistDecisions.ToDictionaryIgnoreDuplicates(v => v.Item1.RemoteAlbum.Release.Title, v => v.Item1.RemoteAlbum));
var alreadyPendingByAlbum = CreateAlbumLookup(alreadyPending);
var matchingReports = existingReports.Where(MatchingReleasePredicate(decision.RemoteAlbum.Release)).ToList();
if (matchingReports.Any())
{
var matchingReport = matchingReports.First();
if (matchingReport.Reason != reason)
foreach (var pair in artistDecisions)
{
_logger.Debug("The release {0} is already pending with reason {1}, changing to {2}", decision.RemoteAlbum, matchingReport.Reason, reason);
matchingReport.Reason = reason;
_repository.Update(matchingReport);
}
var decision = pair.Item1;
var reason = pair.Item2;
else
{
_logger.Debug("The release {0} is already pending with reason {1}, not adding again", decision.RemoteAlbum, reason); return;
}
var albumIds = decision.RemoteAlbum.Albums.Select(e => e.Id);
if (matchingReports.Count() > 1)
{
_logger.Debug("The release {0} had {1} duplicate pending, removing duplicates.", decision.RemoteAlbum, matchingReports.Count() - 1);
var existingReports = albumIds.SelectMany(v => alreadyPendingByAlbum[v] ?? Enumerable.Empty<PendingRelease>())
.Distinct().ToList();
var matchingReports = existingReports.Where(MatchingReleasePredicate(decision.RemoteAlbum.Release)).ToList();
foreach (var duplicate in matchingReports.Skip(1))
if (matchingReports.Any())
{
_repository.Delete(duplicate.Id);
alreadyPending.Remove(duplicate);
var matchingReport = matchingReports.First();
if (matchingReport.Reason != reason)
{
_logger.Debug("The release {0} is already pending with reason {1}, changing to {2}", decision.RemoteAlbum, matchingReport.Reason, reason);
matchingReport.Reason = reason;
_repository.Update(matchingReport);
}
else
{
_logger.Debug("The release {0} is already pending with reason {1}, not adding again", decision.RemoteAlbum, reason);
}
if (matchingReports.Count() > 1)
{
_logger.Debug("The release {0} had {1} duplicate pending, removing duplicates.", decision.RemoteAlbum, matchingReports.Count() - 1);
foreach (var duplicate in matchingReports.Skip(1))
{
_repository.Delete(duplicate.Id);
alreadyPending.Remove(duplicate);
alreadyPendingByAlbum = CreateAlbumLookup(alreadyPending);
}
}
continue;
}
}
return;
_logger.Debug("Adding release {0} to pending releases with reason {1}", decision.RemoteAlbum, reason);
Insert(decision, reason);
}
}
}
_logger.Debug("Adding release {0} to pending releases with reason {1}", decision.RemoteAlbum, reason); Insert(decision, reason);
private ILookup<int, PendingRelease> CreateAlbumLookup(IEnumerable<PendingRelease> alreadyPending)
{
return alreadyPending.SelectMany(v => v.RemoteAlbum.Albums
.Select(d => new { Album = d, PendingRelease = v }))
.ToLookup(v => v.Album.Id, v => v.PendingRelease);
}
public List<ReleaseInfo> GetPending()
@ -185,20 +192,20 @@ namespace NzbDrone.Core.Download.Pending
}
var queue = new Queue.Queue
{
Id = GetQueueId(pendingRelease, album),
Artist = pendingRelease.RemoteAlbum.Artist,
Album = album,
Quality = pendingRelease.RemoteAlbum.ParsedAlbumInfo.Quality,
Title = pendingRelease.Title,
Size = pendingRelease.RemoteAlbum.Release.Size,
Sizeleft = pendingRelease.RemoteAlbum.Release.Size,
RemoteAlbum = pendingRelease.RemoteAlbum,
Timeleft = timeleft,
EstimatedCompletionTime = ect,
Status = pendingRelease.Reason.ToString(),
Protocol = pendingRelease.RemoteAlbum.Release.DownloadProtocol,
Indexer = pendingRelease.RemoteAlbum.Release.Indexer
{
Id = GetQueueId(pendingRelease, album),
Artist = pendingRelease.RemoteAlbum.Artist,
Album = album,
Quality = pendingRelease.RemoteAlbum.ParsedAlbumInfo.Quality,
Title = pendingRelease.Title,
Size = pendingRelease.RemoteAlbum.Release.Size,
Sizeleft = pendingRelease.RemoteAlbum.Release.Size,
RemoteAlbum = pendingRelease.RemoteAlbum,
Timeleft = timeleft,
EstimatedCompletionTime = ect,
Status = pendingRelease.Reason.ToString(),
Protocol = pendingRelease.RemoteAlbum.Release.DownloadProtocol,
Indexer = pendingRelease.RemoteAlbum.Release.Indexer
};
queued.Add(queue);
@ -254,11 +261,27 @@ namespace NzbDrone.Core.Download.Pending
return IncludeRemoteAlbums(_repository.AllByArtistId(artistId).ToList());
}
private List<PendingRelease> IncludeRemoteAlbums(List<PendingRelease> releases)
private List<PendingRelease> IncludeRemoteAlbums(List<PendingRelease> releases, Dictionary<string, RemoteAlbum> knownRemoteAlbums = null)
{
var result = new List<PendingRelease>();
var artistMap = _artistService.GetArtists(releases.Select(v => v.ArtistId).Distinct())
.ToDictionary(v => v.Id);
var artistMap = new Dictionary<int, Artist>();
if (knownRemoteAlbums != null)
{
foreach (var artist in knownRemoteAlbums.Values.Select(v => v.Artist))
{
if (!artistMap.ContainsKey(artist.Id))
{
artistMap[artist.Id] = artist;
}
}
}
foreach (var artist in _artistService.GetArtists(releases.Select(v => v.ArtistId).Distinct().Where(v => !artistMap.ContainsKey(v))))
{
artistMap[artist.Id] = artist;
}
foreach (var release in releases)
{
@ -267,7 +290,17 @@ namespace NzbDrone.Core.Download.Pending
// Just in case the artist was removed, but wasn't cleaned up yet (housekeeper will clean it up)
if (artist == null) return null;
var albums = _parsingService.GetAlbums(release.ParsedAlbumInfo, artist);
List<Album> albums;
RemoteAlbum knownRemoteAlbum;
if (knownRemoteAlbums != null && knownRemoteAlbums.TryGetValue(release.Release.Title, out knownRemoteAlbum))
{
albums = knownRemoteAlbum.Albums;
}
else
{
albums = _parsingService.GetAlbums(release.ParsedAlbumInfo, artist);
}
release.RemoteAlbum = new RemoteAlbum
{

@ -40,9 +40,11 @@ namespace NzbDrone.Core.Download
var prioritizedDecisions = _prioritizeDownloadDecision.PrioritizeDecisions(qualifiedReports);
var grabbed = new List<DownloadDecision>();
var pending = new List<DownloadDecision>();
var failed = new List<DownloadDecision>();
//var failed = new List<DownloadDecision>();
var rejected = decisions.Where(d => d.Rejected).ToList();
var pendingAddQueue = new List<Tuple<DownloadDecision, PendingReleaseReason>>();
var usenetFailed = false;
var torrentFailed = false;
@ -59,15 +61,14 @@ namespace NzbDrone.Core.Download
if (report.TemporarilyRejected)
{
_pendingReleaseService.Add(report, PendingReleaseReason.Delay);
pending.Add(report);
PreparePending(pendingAddQueue, grabbed, pending, report, PendingReleaseReason.Delay);
continue;
}
if (downloadProtocol == DownloadProtocol.Usenet && usenetFailed ||
downloadProtocol == DownloadProtocol.Torrent && torrentFailed)
{
failed.Add(report);
PreparePending(pendingAddQueue, grabbed, pending, report, PendingReleaseReason.DownloadClientUnavailable);
continue;
}
@ -86,7 +87,7 @@ namespace NzbDrone.Core.Download
if (ex is DownloadClientUnavailableException || ex is DownloadClientAuthenticationException)
{
_logger.Debug(ex, "Failed to send release to download client, storing until later. " + remoteAlbum);
failed.Add(report);
PreparePending(pendingAddQueue, grabbed, pending, report, PendingReleaseReason.DownloadClientUnavailable);
if (downloadProtocol == DownloadProtocol.Usenet)
{
@ -104,7 +105,10 @@ namespace NzbDrone.Core.Download
}
}
pending.AddRange(ProcessFailedGrabs(grabbed, failed));
if (pendingAddQueue.Any())
{
_pendingReleaseService.AddMany(pendingAddQueue);
}
return new ProcessedDecisions(grabbed, pending, rejected);
}
@ -126,45 +130,22 @@ namespace NzbDrone.Core.Download
.Any();
}
private List<DownloadDecision> ProcessFailedGrabs(List<DownloadDecision> grabbed, List<DownloadDecision> failed)
private void PreparePending(List<Tuple<DownloadDecision, PendingReleaseReason>> queue, List<DownloadDecision> grabbed, List<DownloadDecision> pending, DownloadDecision report, PendingReleaseReason reason)
{
var pending = new List<DownloadDecision>();
var stored = new List<DownloadDecision>();
var addQueue = new List<Tuple<DownloadDecision, PendingReleaseReason>>();
foreach (var report in failed)
{
// If a release was already grabbed with matching albums we should store it as a fallback
// and filter it out the next time it is processed incase a higher quality release failed to
// add to the download client, but a lower quality release was sent to another client
// If the release wasn't grabbed already, but was already stored, store it as a fallback,
// otherwise store it as DownloadClientUnavailable.
if (IsAlbumProcessed(grabbed, report))
{
addQueue.Add(Tuple.Create(report, PendingReleaseReason.Fallback));
pending.Add(report);
}
else if (IsAlbumProcessed(stored, report))
{
addQueue.Add(Tuple.Create(report, PendingReleaseReason.Fallback));
pending.Add(report);
}
else
{
addQueue.Add(Tuple.Create(report, PendingReleaseReason.DownloadClientUnavailable));
pending.Add(report);
stored.Add(report);
}
}
if (addQueue.Any())
// If a release was already grabbed with matching albums we should store it as a fallback
// and filter it out the next time it is processed.
// If a higher quality release failed to add to the download client, but a lower quality release
// was sent to another client we still list it normally so it apparent that it'll grab next time.
// Delayed is treated the same, but only the first is listed the subsequent items as stored as Fallback.
if (IsAlbumProcessed(grabbed, report) ||
IsAlbumProcessed(pending, report))
{
_pendingReleaseService.AddMany(addQueue);
reason = PendingReleaseReason.Fallback;
}
return pending;
queue.Add(Tuple.Create(report, reason));
pending.Add(report);
}
}
}

@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using NLog;
@ -15,11 +16,38 @@ namespace NzbDrone.Core.Messaging.Events
private readonly IServiceFactory _serviceFactory;
private readonly TaskFactory _taskFactory;
private readonly Dictionary<string, object> _eventSubscribers;
private class EventSubscribers<TEvent> where TEvent : class, IEvent
{
private IServiceFactory _serviceFactory;
public IHandle<TEvent>[] _syncHandlers;
public IHandleAsync<TEvent>[] _asyncHandlers;
public IHandleAsync<IEvent>[] _globalHandlers;
public EventSubscribers(IServiceFactory serviceFactory)
{
_serviceFactory = serviceFactory;
_syncHandlers = serviceFactory.BuildAll<IHandle<TEvent>>()
.OrderBy(GetEventHandleOrder)
.ToArray();
_globalHandlers = serviceFactory.BuildAll<IHandleAsync<IEvent>>()
.ToArray();
_asyncHandlers = serviceFactory.BuildAll<IHandleAsync<TEvent>>()
.ToArray();
}
}
public EventAggregator(Logger logger, IServiceFactory serviceFactory)
{
_logger = logger;
_serviceFactory = serviceFactory;
_taskFactory = new TaskFactory();
_eventSubscribers = new Dictionary<string, object>();
}
public void PublishEvent<TEvent>(TEvent @event) where TEvent : class, IEvent
@ -47,11 +75,21 @@ namespace NzbDrone.Core.Messaging.Events
_logger.Trace("Publishing {0}", eventName);
EventSubscribers<TEvent> subscribers;
lock (_eventSubscribers)
{
object target;
if (!_eventSubscribers.TryGetValue(eventName, out target))
{
_eventSubscribers[eventName] = target = new EventSubscribers<TEvent>(_serviceFactory);
}
subscribers = target as EventSubscribers<TEvent>;
}
//call synchronous handlers first.
var handlers = _serviceFactory.BuildAll<IHandle<TEvent>>()
.OrderBy(GetEventHandleOrder)
.ToList();
var handlers = subscribers._syncHandlers;
foreach (var handler in handlers)
{
@ -67,7 +105,7 @@ namespace NzbDrone.Core.Messaging.Events
}
}
foreach (var handler in _serviceFactory.BuildAll<IHandleAsync<IEvent>>())
foreach (var handler in subscribers._globalHandlers)
{
var handlerLocal = handler;
@ -78,7 +116,7 @@ namespace NzbDrone.Core.Messaging.Events
.LogExceptions();
}
foreach (var handler in _serviceFactory.BuildAll<IHandleAsync<TEvent>>())
foreach (var handler in subscribers._asyncHandlers)
{
var handlerLocal = handler;
@ -102,7 +140,7 @@ namespace NzbDrone.Core.Messaging.Events
return string.Format("{0}<{1}>", eventType.Name.Remove(eventType.Name.IndexOf('`')), eventType.GetGenericArguments()[0].Name);
}
private int GetEventHandleOrder<TEvent>(IHandle<TEvent> eventHandler) where TEvent : class, IEvent
internal static int GetEventHandleOrder<TEvent>(IHandle<TEvent> eventHandler) where TEvent : class, IEvent
{
// TODO: Convert "Handle" to nameof(eventHandler.Handle) after .net 4.5
var method = eventHandler.GetType().GetMethod("Handle", new Type[] { typeof(TEvent) });

@ -186,6 +186,7 @@
<Compile Include="Datastore\Migration\011_import_lists.cs" />
<Compile Include="Datastore\Migration\012_add_release_status.cs" />
<Compile Include="Datastore\Migration\015_remove_fanzub.cs" />
<Compile Include="Datastore\Migration\016_update_artist_history_indexes.cs" />
<Compile Include="Datastore\Migration\Framework\MigrationContext.cs" />
<Compile Include="Datastore\Migration\Framework\MigrationController.cs" />
<Compile Include="Datastore\Migration\Framework\MigrationDbFactory.cs" />

Loading…
Cancel
Save