From 35532b67898920e7cda52adcd876083a35fdbf72 Mon Sep 17 00:00:00 2001 From: Taloth Saldono Date: Thu, 4 Jun 2020 00:11:45 +0200 Subject: [PATCH] Fixed: Refreshing Plex Server series in high volume systems --- src/NzbDrone.Common/Cache/CacheManager.cs | 9 +++ src/NzbDrone.Common/Cache/Cached.cs | 75 +++++++++++++----- .../MediaFiles/Events/RenameCompletedEvent.cs | 8 ++ .../MediaFiles/RenameMovieFileService.cs | 4 + .../Notifications/INotification.cs | 3 +- .../Notifications/NotificationBase.cs | 4 + .../Notifications/NotificationService.cs | 65 ++++++++++++---- .../Notifications/Plex/Server/PlexServer.cs | 77 ++++++++++++++++++- .../Plex/Server/PlexServerService.cs | 40 ++++++++-- 9 files changed, 238 insertions(+), 47 deletions(-) create mode 100644 src/NzbDrone.Core/MediaFiles/Events/RenameCompletedEvent.cs diff --git a/src/NzbDrone.Common/Cache/CacheManager.cs b/src/NzbDrone.Common/Cache/CacheManager.cs index 6c080a02e..7d6bb128f 100644 --- a/src/NzbDrone.Common/Cache/CacheManager.cs +++ b/src/NzbDrone.Common/Cache/CacheManager.cs @@ -8,6 +8,7 @@ namespace NzbDrone.Common.Cache { ICached GetCache(Type host); ICached GetCache(Type host, string name); + ICached GetRollingCache(Type host, string name, TimeSpan defaultLifeTime); ICachedDictionary GetCacheDictionary(Type host, string name, Func> fetchFunc = null, TimeSpan? lifeTime = null); void Clear(); ICollection Caches { get; } @@ -43,6 +44,14 @@ namespace NzbDrone.Common.Cache return (ICached)_cache.Get(host.FullName + "_" + name, () => new Cached()); } + public ICached GetRollingCache(Type host, string name, TimeSpan defaultLifeTime) + { + Ensure.That(host, () => host).IsNotNull(); + Ensure.That(name, () => name).IsNotNullOrWhiteSpace(); + + return (ICached)_cache.Get(host.FullName + "_" + name, () => new Cached(defaultLifeTime, true)); + } + public ICachedDictionary GetCacheDictionary(Type host, string name, Func> fetchFunc = null, TimeSpan? lifeTime = null) { Ensure.That(host, () => host).IsNotNull(); diff --git a/src/NzbDrone.Common/Cache/Cached.cs b/src/NzbDrone.Common/Cache/Cached.cs index 2285f548d..a75116417 100644 --- a/src/NzbDrone.Common/Cache/Cached.cs +++ b/src/NzbDrone.Common/Cache/Cached.cs @@ -1,8 +1,9 @@ -using System; +using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using NzbDrone.Common.EnsureThat; +using NzbDrone.Common.Extensions; namespace NzbDrone.Common.Cache { @@ -29,35 +30,49 @@ namespace NzbDrone.Common.Cache } private readonly ConcurrentDictionary _store; + private readonly TimeSpan? _defaultLifeTime; + private readonly bool _rollingExpiry; - public Cached() + public Cached(TimeSpan? defaultLifeTime = null, bool rollingExpiry = false) { _store = new ConcurrentDictionary(); + _defaultLifeTime = defaultLifeTime; + _rollingExpiry = rollingExpiry; } - public void Set(string key, T value, TimeSpan? lifetime = null) + public void Set(string key, T value, TimeSpan? lifeTime = null) { Ensure.That(key, () => key).IsNotNullOrWhiteSpace(); - _store[key] = new CacheItem(value, lifetime); + _store[key] = new CacheItem(value, lifeTime ?? _defaultLifeTime); } public T Find(string key) { - CacheItem value; - _store.TryGetValue(key, out value); - - if (value == null) + CacheItem cacheItem; + if (!_store.TryGetValue(key, out cacheItem)) { return default(T); } - if (value.IsExpired()) + if (cacheItem.IsExpired()) { - _store.TryRemove(key, out value); - return default(T); + if (TryRemove(key, cacheItem)) + { + return default(T); + } + + if (!_store.TryGetValue(key, out cacheItem)) + { + return default(T); + } + } + + if (_rollingExpiry && _defaultLifeTime.HasValue) + { + _store.TryUpdate(key, new CacheItem(cacheItem.Object, _defaultLifeTime.Value), cacheItem); } - return value.Object; + return cacheItem.Object; } public void Remove(string key) @@ -72,20 +87,31 @@ namespace NzbDrone.Common.Cache { Ensure.That(key, () => key).IsNotNullOrWhiteSpace(); + lifeTime = lifeTime ?? _defaultLifeTime; + CacheItem cacheItem; - T value; - if (!_store.TryGetValue(key, out cacheItem) || cacheItem.IsExpired()) + if (_store.TryGetValue(key, out cacheItem) && !cacheItem.IsExpired()) { - value = function(); - Set(key, value, lifeTime); + if (_rollingExpiry && lifeTime.HasValue) + { + _store.TryUpdate(key, new CacheItem(cacheItem.Object, lifeTime), cacheItem); + } } else { - value = cacheItem.Object; + var newCacheItem = new CacheItem(function(), lifeTime); + if (cacheItem != null && _store.TryUpdate(key, newCacheItem, cacheItem)) + { + cacheItem = newCacheItem; + } + else + { + cacheItem = _store.GetOrAdd(key, newCacheItem); + } } - return value; + return cacheItem.Object; } public void Clear() @@ -95,9 +121,11 @@ namespace NzbDrone.Common.Cache public void ClearExpired() { - foreach (var cached in _store.Where(c => c.Value.IsExpired())) + var collection = (ICollection>)_store; + + foreach (var cached in _store.Where(c => c.Value.IsExpired()).ToList()) { - Remove(cached.Key); + collection.Remove(cached); } } @@ -108,5 +136,12 @@ namespace NzbDrone.Common.Cache return _store.Values.Select(c => c.Object).ToList(); } } + + private bool TryRemove(string key, CacheItem value) + { + var collection = (ICollection>)_store; + + return collection.Remove(new KeyValuePair(key, value)); + } } } diff --git a/src/NzbDrone.Core/MediaFiles/Events/RenameCompletedEvent.cs b/src/NzbDrone.Core/MediaFiles/Events/RenameCompletedEvent.cs new file mode 100644 index 000000000..40e3b8108 --- /dev/null +++ b/src/NzbDrone.Core/MediaFiles/Events/RenameCompletedEvent.cs @@ -0,0 +1,8 @@ +using NzbDrone.Common.Messaging; + +namespace NzbDrone.Core.MediaFiles.Events +{ + public class RenameCompletedEvent : IEvent + { + } +} diff --git a/src/NzbDrone.Core/MediaFiles/RenameMovieFileService.cs b/src/NzbDrone.Core/MediaFiles/RenameMovieFileService.cs index 0650700b1..bf7c81f78 100644 --- a/src/NzbDrone.Core/MediaFiles/RenameMovieFileService.cs +++ b/src/NzbDrone.Core/MediaFiles/RenameMovieFileService.cs @@ -127,6 +127,8 @@ namespace NzbDrone.Core.MediaFiles _logger.ProgressInfo("Renaming {0} files for {1}", movieFiles.Count, movie.Title); RenameFiles(movieFiles, movie); _logger.ProgressInfo("Selected movie files renamed for {0}", movie.Title); + + _eventAggregator.PublishEvent(new RenameCompletedEvent()); } public void Execute(RenameMovieCommand message) @@ -141,6 +143,8 @@ namespace NzbDrone.Core.MediaFiles RenameFiles(movieFiles, movie); _logger.ProgressInfo("All movie files renamed for {0}", movie.Title); } + + _eventAggregator.PublishEvent(new RenameCompletedEvent()); } } } diff --git a/src/NzbDrone.Core/Notifications/INotification.cs b/src/NzbDrone.Core/Notifications/INotification.cs index 0a1f9a1f2..7b955950f 100644 --- a/src/NzbDrone.Core/Notifications/INotification.cs +++ b/src/NzbDrone.Core/Notifications/INotification.cs @@ -1,4 +1,4 @@ -using NzbDrone.Core.Movies; +using NzbDrone.Core.Movies; using NzbDrone.Core.ThingiProvider; namespace NzbDrone.Core.Notifications @@ -12,6 +12,7 @@ namespace NzbDrone.Core.Notifications void OnMovieRename(Movie movie); void OnHealthIssue(HealthCheck.HealthCheck healthCheck); void OnDelete(DeleteMessage deleteMessage); + void ProcessQueue(); bool SupportsOnGrab { get; } bool SupportsOnDownload { get; } bool SupportsOnUpgrade { get; } diff --git a/src/NzbDrone.Core/Notifications/NotificationBase.cs b/src/NzbDrone.Core/Notifications/NotificationBase.cs index a7a27d566..bcc98bf01 100644 --- a/src/NzbDrone.Core/Notifications/NotificationBase.cs +++ b/src/NzbDrone.Core/Notifications/NotificationBase.cs @@ -50,6 +50,10 @@ namespace NzbDrone.Core.Notifications { } + public virtual void ProcessQueue() + { + } + public bool SupportsOnGrab => HasConcreteImplementation("OnGrab"); public bool SupportsOnRename => HasConcreteImplementation("OnMovieRename"); public bool SupportsOnDownload => HasConcreteImplementation("OnDownload"); diff --git a/src/NzbDrone.Core/Notifications/NotificationService.cs b/src/NzbDrone.Core/Notifications/NotificationService.cs index 2eeb6dbdd..a6662872a 100644 --- a/src/NzbDrone.Core/Notifications/NotificationService.cs +++ b/src/NzbDrone.Core/Notifications/NotificationService.cs @@ -17,7 +17,10 @@ namespace NzbDrone.Core.Notifications IHandle, IHandle, IHandle, - IHandle + IHandle, + IHandleAsync, + IHandleAsync, + IHandleAsync { private readonly INotificationFactory _notificationFactory; private readonly Logger _logger; @@ -176,26 +179,56 @@ namespace NzbDrone.Core.Notifications public void Handle(MovieFileDeletedEvent message) { - var deleteMessage = new DeleteMessage(); - deleteMessage.Message = GetMessage(message.MovieFile.Movie, message.MovieFile.Quality); - deleteMessage.MovieFile = message.MovieFile; - deleteMessage.Movie = message.MovieFile.Movie; - deleteMessage.Reason = message.Reason; + var deleteMessage = new DeleteMessage(); + deleteMessage.Message = GetMessage(message.MovieFile.Movie, message.MovieFile.Quality); + deleteMessage.MovieFile = message.MovieFile; + deleteMessage.Movie = message.MovieFile.Movie; + deleteMessage.Reason = message.Reason; - foreach (var notification in _notificationFactory.OnDeleteEnabled()) + foreach (var notification in _notificationFactory.OnDeleteEnabled()) + { + try { - try - { - if (ShouldHandleMovie(notification.Definition, message.MovieFile.Movie)) - { - notification.OnDelete(deleteMessage); - } - } - catch (Exception ex) + if (ShouldHandleMovie(notification.Definition, message.MovieFile.Movie)) { - _logger.Warn(ex, "Unable to send OnDelete notification to: " + notification.Definition.Name); + notification.OnDelete(deleteMessage); } } + catch (Exception ex) + { + _logger.Warn(ex, "Unable to send OnDelete notification to: " + notification.Definition.Name); + } + } + } + + public void HandleAsync(DownloadsProcessedEvent message) + { + ProcessQueue(); + } + + public void HandleAsync(RenameCompletedEvent message) + { + ProcessQueue(); + } + + public void HandleAsync(HealthCheckCompleteEvent message) + { + ProcessQueue(); + } + + private void ProcessQueue() + { + foreach (var notification in _notificationFactory.GetAvailableProviders()) + { + try + { + notification.ProcessQueue(); + } + catch (Exception ex) + { + _logger.Warn(ex, "Unable to process notification queue for " + notification.Definition.Name); + } + } } } } diff --git a/src/NzbDrone.Core/Notifications/Plex/Server/PlexServer.cs b/src/NzbDrone.Core/Notifications/Plex/Server/PlexServer.cs index 6412b6e2b..a3b3ceafc 100644 --- a/src/NzbDrone.Core/Notifications/Plex/Server/PlexServer.cs +++ b/src/NzbDrone.Core/Notifications/Plex/Server/PlexServer.cs @@ -1,6 +1,9 @@ using System; using System.Collections.Generic; +using System.Linq; using FluentValidation.Results; +using NLog; +using NzbDrone.Common.Cache; using NzbDrone.Common.Extensions; using NzbDrone.Core.Exceptions; using NzbDrone.Core.Movies; @@ -13,11 +16,23 @@ namespace NzbDrone.Core.Notifications.Plex.Server { private readonly IPlexServerService _plexServerService; private readonly IPlexTvService _plexTvService; + private readonly Logger _logger; - public PlexServer(IPlexServerService plexServerService, IPlexTvService plexTvService) + private class PlexUpdateQueue + { + public Dictionary Pending { get; } = new Dictionary(); + public bool Refreshing { get; set; } + } + + private readonly ICached _pendingMoviesCache; + + public PlexServer(IPlexServerService plexServerService, IPlexTvService plexTvService, ICacheManager cacheManager, Logger logger) { _plexServerService = plexServerService; _plexTvService = plexTvService; + _logger = logger; + + _pendingMoviesCache = cacheManager.GetRollingCache(GetType(), "pendingSeries", TimeSpan.FromDays(1)); } public override string Link => "https://www.plex.tv/"; @@ -37,7 +52,65 @@ namespace NzbDrone.Core.Notifications.Plex.Server { if (Settings.UpdateLibrary) { - _plexServerService.UpdateLibrary(movie, Settings); + _logger.Debug("Scheduling library update for movie {0} {1}", movie.Id, movie.Title); + var queue = _pendingMoviesCache.Get(Settings.Host, () => new PlexUpdateQueue()); + lock (queue) + { + queue.Pending[movie.Id] = movie; + } + } + } + + public override void ProcessQueue() + { + PlexUpdateQueue queue = _pendingMoviesCache.Find(Settings.Host); + if (queue == null) + { + return; + } + + lock (queue) + { + if (queue.Refreshing) + { + return; + } + + queue.Refreshing = true; + } + + try + { + while (true) + { + List refreshingMovies; + lock (queue) + { + if (queue.Pending.Empty()) + { + queue.Refreshing = false; + return; + } + + refreshingMovies = queue.Pending.Values.ToList(); + queue.Pending.Clear(); + } + + if (Settings.UpdateLibrary) + { + _logger.Debug("Performing library update for {0} movies", refreshingMovies.Count); + _plexServerService.UpdateLibrary(refreshingMovies, Settings); + } + } + } + catch + { + lock (queue) + { + queue.Refreshing = false; + } + + throw; } } diff --git a/src/NzbDrone.Core/Notifications/Plex/Server/PlexServerService.cs b/src/NzbDrone.Core/Notifications/Plex/Server/PlexServerService.cs index 6b2d1ccce..0d3625a5c 100644 --- a/src/NzbDrone.Core/Notifications/Plex/Server/PlexServerService.cs +++ b/src/NzbDrone.Core/Notifications/Plex/Server/PlexServerService.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Text.RegularExpressions; using FluentValidation.Results; @@ -13,6 +14,7 @@ namespace NzbDrone.Core.Notifications.Plex.Server public interface IPlexServerService { void UpdateLibrary(Movie movie, PlexServerSettings settings); + void UpdateLibrary(IEnumerable movie, PlexServerSettings settings); ValidationFailure Test(PlexServerSettings settings); } @@ -32,10 +34,16 @@ namespace NzbDrone.Core.Notifications.Plex.Server } public void UpdateLibrary(Movie movie, PlexServerSettings settings) + { + UpdateLibrary(new[] { movie }, settings); + } + + public void UpdateLibrary(IEnumerable multipleMovies, PlexServerSettings settings) { try { _logger.Debug("Sending Update Request to Plex Server"); + var watch = Stopwatch.StartNew(); var version = _versionCache.Get(settings.Host, () => GetVersion(settings), TimeSpan.FromHours(2)); ValidateVersion(version); @@ -45,12 +53,31 @@ namespace NzbDrone.Core.Notifications.Plex.Server if (partialUpdates) { - UpdatePartialSection(movie, sections, settings); + var partiallyUpdated = true; + + foreach (var movie in multipleMovies) + { + partiallyUpdated &= UpdatePartialSection(movie, sections, settings); + + if (!partiallyUpdated) + { + break; + } + } + + // Only update complete sections if all partial updates failed + if (!partiallyUpdated) + { + _logger.Debug("Unable to update partial section, updating all Movie sections"); + sections.ForEach(s => UpdateSection(s.Id, settings)); + } } else { sections.ForEach(s => UpdateSection(s.Id, settings)); } + + _logger.Debug("Finished sending Update Request to Plex Server (took {0} ms)", watch.ElapsedMilliseconds); } catch (Exception ex) { @@ -123,7 +150,7 @@ namespace NzbDrone.Core.Notifications.Plex.Server _plexServerProxy.Update(sectionId, settings); } - private void UpdatePartialSection(Movie movie, List sections, PlexServerSettings settings) + private bool UpdatePartialSection(Movie movie, List sections, PlexServerSettings settings) { var partiallyUpdated = false; @@ -140,12 +167,7 @@ namespace NzbDrone.Core.Notifications.Plex.Server } } - // Only update complete sections if all partial updates failed - if (!partiallyUpdated) - { - _logger.Debug("Unable to update partial section, updating all Movie sections"); - sections.ForEach(s => UpdateSection(s.Id, settings)); - } + return partiallyUpdated; } private int? GetMetadataId(int sectionId, Movie movie, string language, PlexServerSettings settings) @@ -159,6 +181,8 @@ namespace NzbDrone.Core.Notifications.Plex.Server { try { + _versionCache.Remove(settings.Host); + _partialUpdateCache.Remove(settings.Host); var sections = GetSections(settings); if (sections.Empty())