Fixed: Refreshing Plex Server series in high volume systems

pull/3800/head
Taloth Saldono 4 years ago
parent e8d843c93d
commit 903aba5dee

@ -8,6 +8,7 @@ namespace NzbDrone.Common.Cache
{
ICached<T> GetCache<T>(Type host);
ICached<T> GetCache<T>(Type host, string name);
ICached<T> GetRollingCache<T>(Type host, string name, TimeSpan defaultLifeTime);
ICachedDictionary<T> GetCacheDictionary<T>(Type host, string name, Func<IDictionary<string, T>> fetchFunc = null, TimeSpan? lifeTime = null);
void Clear();
ICollection<ICached> Caches { get; }
@ -44,6 +45,14 @@ namespace NzbDrone.Common.Cache
return (ICached<T>)_cache.Get(host.FullName + "_" + name, () => new Cached<T>());
}
public ICached<T> GetRollingCache<T>(Type host, string name, TimeSpan defaultLifeTime)
{
Ensure.That(host, () => host).IsNotNull();
Ensure.That(name, () => name).IsNotNullOrWhiteSpace();
return (ICached<T>)_cache.Get(host.FullName + "_" + name, () => new Cached<T>(defaultLifeTime, true));
}
public ICachedDictionary<T> GetCacheDictionary<T>(Type host, string name, Func<IDictionary<string, T>> fetchFunc = null, TimeSpan? lifeTime = null)
{
Ensure.That(host, () => host).IsNotNull();

@ -3,6 +3,7 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using NzbDrone.Common.EnsureThat;
using NzbDrone.Common.Extensions;
namespace NzbDrone.Common.Cache
{
@ -30,35 +31,49 @@ namespace NzbDrone.Common.Cache
}
private readonly ConcurrentDictionary<string, CacheItem> _store;
private readonly TimeSpan? _defaultLifeTime;
private readonly bool _rollingExpiry;
public Cached()
public Cached(TimeSpan? defaultLifeTime = null, bool rollingExpiry = false)
{
_store = new ConcurrentDictionary<string, CacheItem>();
_defaultLifeTime = defaultLifeTime;
_rollingExpiry = rollingExpiry;
}
public void Set(string key, T value, TimeSpan? lifetime = null)
public void Set(string key, T value, TimeSpan? lifeTime = null)
{
Ensure.That(key, () => key).IsNotNullOrWhiteSpace();
_store[key] = new CacheItem(value, lifetime);
_store[key] = new CacheItem(value, lifeTime ?? _defaultLifeTime);
}
public T Find(string key)
{
CacheItem value;
_store.TryGetValue(key, out value);
if (value == null)
CacheItem cacheItem;
if (!_store.TryGetValue(key, out cacheItem))
{
return default(T);
}
if (value.IsExpired())
if (cacheItem.IsExpired())
{
_store.TryRemove(key, out value);
return default(T);
if (TryRemove(key, cacheItem))
{
return default(T);
}
if (!_store.TryGetValue(key, out cacheItem))
{
return default(T);
}
}
if (_rollingExpiry && _defaultLifeTime.HasValue)
{
_store.TryUpdate(key, new CacheItem(cacheItem.Object, _defaultLifeTime.Value), cacheItem);
}
return value.Object;
return cacheItem.Object;
}
public void Remove(string key)
@ -73,20 +88,31 @@ namespace NzbDrone.Common.Cache
{
Ensure.That(key, () => key).IsNotNullOrWhiteSpace();
lifeTime = lifeTime ?? _defaultLifeTime;
CacheItem cacheItem;
T value;
if (!_store.TryGetValue(key, out cacheItem) || cacheItem.IsExpired())
if (_store.TryGetValue(key, out cacheItem) && !cacheItem.IsExpired())
{
value = function();
Set(key, value, lifeTime);
if (_rollingExpiry && lifeTime.HasValue)
{
_store.TryUpdate(key, new CacheItem(cacheItem.Object, lifeTime), cacheItem);
}
}
else
{
value = cacheItem.Object;
var newCacheItem = new CacheItem(function(), lifeTime);
if (cacheItem != null && _store.TryUpdate(key, newCacheItem, cacheItem))
{
cacheItem = newCacheItem;
}
else
{
cacheItem = _store.GetOrAdd(key, newCacheItem);
}
}
return value;
return cacheItem.Object;
}
public void Clear()
@ -96,9 +122,11 @@ namespace NzbDrone.Common.Cache
public void ClearExpired()
{
foreach (var cached in _store.Where(c => c.Value.IsExpired()))
var collection = (ICollection<KeyValuePair<string, CacheItem>>)_store;
foreach (var cached in _store.Where(c => c.Value.IsExpired()).ToList())
{
Remove(cached.Key);
collection.Remove(cached);
}
}
@ -110,5 +138,12 @@ namespace NzbDrone.Common.Cache
}
}
private bool TryRemove(string key, CacheItem value)
{
var collection = (ICollection<KeyValuePair<string, CacheItem>>)_store;
return collection.Remove(new KeyValuePair<string, CacheItem>(key, value));
}
}
}

@ -0,0 +1,9 @@
using NzbDrone.Common.Messaging;
namespace NzbDrone.Core.MediaFiles.Events
{
public class RenameCompletedEvent : IEvent
{
}
}

@ -154,6 +154,8 @@ namespace NzbDrone.Core.MediaFiles
_logger.ProgressInfo("Renaming {0} files for {1}", episodeFiles.Count, series.Title);
RenameFiles(episodeFiles, series);
_logger.ProgressInfo("Selected episode files renamed for {0}", series.Title);
_eventAggregator.PublishEvent(new RenameCompletedEvent());
}
public void Execute(RenameSeriesCommand message)
@ -168,6 +170,8 @@ namespace NzbDrone.Core.MediaFiles
RenameFiles(episodeFiles, series);
_logger.ProgressInfo("All episode files renamed for {0}", series.Title);
}
_eventAggregator.PublishEvent(new RenameCompletedEvent());
}
}
}

@ -11,6 +11,7 @@ namespace NzbDrone.Core.Notifications
void OnDownload(DownloadMessage message);
void OnRename(Series series);
void OnHealthIssue(HealthCheck.HealthCheck healthCheck);
void ProcessQueue();
bool SupportsOnGrab { get; }
bool SupportsOnDownload { get; }
bool SupportsOnUpgrade { get; }

@ -49,6 +49,11 @@ namespace NzbDrone.Core.Notifications
}
public virtual void ProcessQueue()
{
}
public bool SupportsOnGrab => HasConcreteImplementation("OnGrab");
public bool SupportsOnRename => HasConcreteImplementation("OnRename");
public bool SupportsOnDownload => HasConcreteImplementation("OnDownload");
@ -76,6 +81,5 @@ namespace NzbDrone.Core.Notifications
return !method.DeclaringType.IsAbstract;
}
}
}

@ -17,7 +17,10 @@ namespace NzbDrone.Core.Notifications
: IHandle<EpisodeGrabbedEvent>,
IHandle<EpisodeImportedEvent>,
IHandle<SeriesRenamedEvent>,
IHandle<HealthCheckFailedEvent>
IHandle<HealthCheckFailedEvent>,
IHandleAsync<DownloadsProcessedEvent>,
IHandleAsync<RenameCompletedEvent>,
IHandleAsync<HealthCheckCompleteEvent>
{
private readonly INotificationFactory _notificationFactory;
private readonly Logger _logger;
@ -204,5 +207,35 @@ namespace NzbDrone.Core.Notifications
}
}
}
public void HandleAsync(DownloadsProcessedEvent message)
{
ProcessQueue();
}
public void HandleAsync(RenameCompletedEvent message)
{
ProcessQueue();
}
public void HandleAsync(HealthCheckCompleteEvent message)
{
ProcessQueue();
}
private void ProcessQueue()
{
foreach (var notification in _notificationFactory.GetAvailableProviders())
{
try
{
notification.ProcessQueue();
}
catch (Exception ex)
{
_logger.Warn(ex, "Unable to process notification queue for " + notification.Definition.Name);
}
}
}
}
}

@ -1,6 +1,9 @@
using System;
using System.Collections.Generic;
using System.Linq;
using FluentValidation.Results;
using NLog;
using NzbDrone.Common.Cache;
using NzbDrone.Common.Extensions;
using NzbDrone.Core.Exceptions;
using NzbDrone.Core.Notifications.Plex.PlexTv;
@ -13,11 +16,23 @@ namespace NzbDrone.Core.Notifications.Plex.Server
{
private readonly IPlexServerService _plexServerService;
private readonly IPlexTvService _plexTvService;
private readonly Logger _logger;
public PlexServer(IPlexServerService plexServerService, IPlexTvService plexTvService)
class PlexUpdateQueue
{
public Dictionary<int, Series> Pending { get; } = new Dictionary<int, Series>();
public bool Refreshing { get; set; }
}
private readonly ICached<PlexUpdateQueue> _pendingSeriesCache;
public PlexServer(IPlexServerService plexServerService, IPlexTvService plexTvService, ICacheManager cacheManager, Logger logger)
{
_plexServerService = plexServerService;
_plexTvService = plexTvService;
_logger = logger;
_pendingSeriesCache = cacheManager.GetRollingCache<PlexUpdateQueue>(GetType(), "pendingSeries", TimeSpan.FromDays(1));
}
public override string Link => "https://www.plex.tv/";
@ -37,7 +52,63 @@ namespace NzbDrone.Core.Notifications.Plex.Server
{
if (Settings.UpdateLibrary)
{
_plexServerService.UpdateLibrary(series, Settings);
_logger.Debug("Scheduling library update for series {0} {1}", series.Id, series.Title);
var queue = _pendingSeriesCache.Get(Settings.Host, () => new PlexUpdateQueue());
lock (queue)
{
queue.Pending[series.Id] = series;
}
}
}
public override void ProcessQueue()
{
PlexUpdateQueue queue = _pendingSeriesCache.Find(Settings.Host);
if (queue == null)
{
return;
}
lock (queue)
{
if (queue.Refreshing)
{
return;
}
queue.Refreshing = true;
}
try
{
while (true)
{
List<Series> refreshingSeries;
lock (queue)
{
if (queue.Pending.Empty())
{
queue.Refreshing = false;
return;
}
refreshingSeries = queue.Pending.Values.ToList();
queue.Pending.Clear();
}
if (Settings.UpdateLibrary)
{
_logger.Debug("Performing library update for {0} series", refreshingSeries.Count);
_plexServerService.UpdateLibrary(refreshingSeries, Settings);
}
}
}
catch
{
lock (queue)
{
queue.Refreshing = false;
}
throw;
}
}

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text.RegularExpressions;
using FluentValidation.Results;
@ -13,6 +14,7 @@ namespace NzbDrone.Core.Notifications.Plex.Server
public interface IPlexServerService
{
void UpdateLibrary(Series series, PlexServerSettings settings);
void UpdateLibrary(IEnumerable<Series> series, PlexServerSettings settings);
ValidationFailure Test(PlexServerSettings settings);
}
@ -32,10 +34,16 @@ namespace NzbDrone.Core.Notifications.Plex.Server
}
public void UpdateLibrary(Series series, PlexServerSettings settings)
{
UpdateLibrary(new[] { series }, settings);
}
public void UpdateLibrary(IEnumerable<Series> multipleSeries, PlexServerSettings settings)
{
try
{
_logger.Debug("Sending Update Request to Plex Server");
var watch = Stopwatch.StartNew();
var version = _versionCache.Get(settings.Host, () => GetVersion(settings), TimeSpan.FromHours(2));
ValidateVersion(version);
@ -45,16 +53,34 @@ namespace NzbDrone.Core.Notifications.Plex.Server
if (partialUpdates)
{
UpdatePartialSection(series, sections, settings);
}
var partiallyUpdated = true;
foreach (var series in multipleSeries)
{
partiallyUpdated &= UpdatePartialSection(series, sections, settings);
if (!partiallyUpdated)
{
break;
}
}
// Only update complete sections if all partial updates failed
if (!partiallyUpdated)
{
_logger.Debug("Unable to update partial section, updating all TV sections");
sections.ForEach(s => UpdateSection(s.Id, settings));
}
}
else
{
sections.ForEach(s => UpdateSection(s.Id, settings));
}
_logger.Debug("Finished sending Update Request to Plex Server (took {0} ms)", watch.ElapsedMilliseconds);
}
catch(Exception ex)
catch (Exception ex)
{
_logger.Warn(ex, "Failed to Update Plex host: " + settings.Host);
throw;
@ -125,7 +151,7 @@ namespace NzbDrone.Core.Notifications.Plex.Server
_plexServerProxy.Update(sectionId, settings);
}
private void UpdatePartialSection(Series series, List<PlexSection> sections, PlexServerSettings settings)
private bool UpdatePartialSection(Series series, List<PlexSection> sections, PlexServerSettings settings)
{
var partiallyUpdated = false;
@ -142,12 +168,7 @@ namespace NzbDrone.Core.Notifications.Plex.Server
}
}
// Only update complete sections if all partial updates failed
if (!partiallyUpdated)
{
_logger.Debug("Unable to update partial section, updating all TV sections");
sections.ForEach(s => UpdateSection(s.Id, settings));
}
return partiallyUpdated;
}
private int? GetMetadataId(int sectionId, Series series, string language, PlexServerSettings settings)
@ -161,6 +182,8 @@ namespace NzbDrone.Core.Notifications.Plex.Server
{
try
{
_versionCache.Remove(settings.Host);
_partialUpdateCache.Remove(settings.Host);
var sections = GetSections(settings);
if (sections.Empty())

Loading…
Cancel
Save