Improved the way we sync the plex content and then get the metadata. #2243

pull/2258/head
Jamie Rees 7 years ago
parent 3ca72c7e63
commit 83f871da86

@ -1,9 +1,11 @@
using System.Threading.Tasks; using System.Collections.Generic;
using System.Threading.Tasks;
namespace Ombi.Schedule.Jobs.Ombi namespace Ombi.Schedule.Jobs.Ombi
{ {
public interface IRefreshMetadata : IBaseJob public interface IRefreshMetadata : IBaseJob
{ {
Task Start(); Task Start();
Task ProcessPlexServerContent(IEnumerable<int> contentIds);
} }
} }

@ -1,4 +1,5 @@
using System; using System;
using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -17,7 +18,7 @@ namespace Ombi.Schedule.Jobs.Ombi
{ {
public RefreshMetadata(IPlexContentRepository plexRepo, IEmbyContentRepository embyRepo, public RefreshMetadata(IPlexContentRepository plexRepo, IEmbyContentRepository embyRepo,
ILogger<RefreshMetadata> log, ITvMazeApi tvApi, ISettingsService<PlexSettings> plexSettings, ILogger<RefreshMetadata> log, ITvMazeApi tvApi, ISettingsService<PlexSettings> plexSettings,
IMovieDbApi movieApi) IMovieDbApi movieApi, ISettingsService<EmbySettings> embySettings)
{ {
_plexRepo = plexRepo; _plexRepo = plexRepo;
_embyRepo = embyRepo; _embyRepo = embyRepo;
@ -25,6 +26,7 @@ namespace Ombi.Schedule.Jobs.Ombi
_movieApi = movieApi; _movieApi = movieApi;
_tvApi = tvApi; _tvApi = tvApi;
_plexSettings = plexSettings; _plexSettings = plexSettings;
_embySettings = embySettings;
} }
private readonly IPlexContentRepository _plexRepo; private readonly IPlexContentRepository _plexRepo;
@ -33,6 +35,7 @@ namespace Ombi.Schedule.Jobs.Ombi
private readonly IMovieDbApi _movieApi; private readonly IMovieDbApi _movieApi;
private readonly ITvMazeApi _tvApi; private readonly ITvMazeApi _tvApi;
private readonly ISettingsService<PlexSettings> _plexSettings; private readonly ISettingsService<PlexSettings> _plexSettings;
private readonly ISettingsService<EmbySettings> _embySettings;
public async Task Start() public async Task Start()
{ {
@ -43,6 +46,11 @@ namespace Ombi.Schedule.Jobs.Ombi
if (settings.Enable) if (settings.Enable)
{ {
await StartPlex(); await StartPlex();
}
var embySettings = await _embySettings.GetSettingsAsync();
if (embySettings.Enable)
{
await StartEmby(); await StartEmby();
} }
} }
@ -53,12 +61,45 @@ namespace Ombi.Schedule.Jobs.Ombi
} }
} }
public async Task ProcessPlexServerContent(IEnumerable<int> contentIds)
{
_log.LogInformation("Starting the Metadata refresh from RecentlyAddedSync");
try
{
var settings = await _plexSettings.GetSettingsAsync();
if (settings.Enable)
{
await StartPlexWithKnownContent(contentIds);
}
}
catch (Exception e)
{
_log.LogError(e, "Exception when refreshing the Plex Metadata");
throw;
}
}
private async Task StartPlexWithKnownContent(IEnumerable<int> contentids)
{
var everything = _plexRepo.GetAll().Where(x => contentids.Contains(x.Id));
var allMovies = everything.Where(x => x.Type == PlexMediaTypeEntity.Movie);
await StartPlexMovies(allMovies);
// Now Tv
var allTv = everything.Where(x => x.Type == PlexMediaTypeEntity.Show);
await StartPlexTv(allTv);
}
private async Task StartPlex() private async Task StartPlex()
{ {
await StartPlexMovies(); var allMovies = _plexRepo.GetAll().Where(x =>
x.Type == PlexMediaTypeEntity.Movie && (!x.TheMovieDbId.HasValue() || !x.ImdbId.HasValue()));
await StartPlexMovies(allMovies);
// Now Tv // Now Tv
await StartPlexTv(); var allTv = _plexRepo.GetAll().Where(x =>
x.Type == PlexMediaTypeEntity.Show && (!x.TheMovieDbId.HasValue() || !x.ImdbId.HasValue() || !x.TvDbId.HasValue()));
await StartPlexTv(allTv);
} }
private async Task StartEmby() private async Task StartEmby()
@ -67,10 +108,8 @@ namespace Ombi.Schedule.Jobs.Ombi
await StartEmbyTv(); await StartEmbyTv();
} }
private async Task StartPlexTv() private async Task StartPlexTv(IQueryable<PlexServerContent> allTv)
{ {
var allTv = _plexRepo.GetAll().Where(x =>
x.Type == PlexMediaTypeEntity.Show && (!x.TheMovieDbId.HasValue() || !x.ImdbId.HasValue() || !x.TvDbId.HasValue()));
var tvCount = 0; var tvCount = 0;
foreach (var show in allTv) foreach (var show in allTv)
{ {
@ -147,10 +186,8 @@ namespace Ombi.Schedule.Jobs.Ombi
await _embyRepo.SaveChangesAsync(); await _embyRepo.SaveChangesAsync();
} }
private async Task StartPlexMovies() private async Task StartPlexMovies(IQueryable<PlexServerContent> allMovies)
{ {
var allMovies = _plexRepo.GetAll().Where(x =>
x.Type == PlexMediaTypeEntity.Movie && (!x.TheMovieDbId.HasValue() || !x.ImdbId.HasValue()));
int movieCount = 0; int movieCount = 0;
foreach (var movie in allMovies) foreach (var movie in allMovies)
{ {

@ -1,10 +1,14 @@
using System; using System;
using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
using Ombi.Api.Plex.Models;
using Ombi.Store.Entities;
namespace Ombi.Schedule.Jobs.Plex.Interfaces namespace Ombi.Schedule.Jobs.Plex.Interfaces
{ {
public interface IPlexEpisodeSync : IBaseJob public interface IPlexEpisodeSync : IBaseJob
{ {
Task Start(); Task Start();
Task ProcessEpsiodes(Metadata[] episodes, IQueryable<PlexEpisode> currentEpisodes);
} }
} }

@ -77,11 +77,22 @@ namespace Ombi.Schedule.Jobs.Plex
Logger.LogError("Plex Settings are not valid"); Logger.LogError("Plex Settings are not valid");
return; return;
} }
var processedContent = new HashSet<int>();
Logger.LogInformation("Starting Plex Content Cacher"); Logger.LogInformation("Starting Plex Content Cacher");
try try
{ {
await StartTheCache(plexSettings, recentlyAddedSearch); if (recentlyAddedSearch)
{
var result = await StartTheCache(plexSettings, true);
foreach (var r in result)
{
processedContent.Add(r);
}
}
else
{
await StartTheCache(plexSettings, false);
}
} }
catch (Exception e) catch (Exception e)
{ {
@ -94,33 +105,58 @@ namespace Ombi.Schedule.Jobs.Plex
BackgroundJob.Enqueue(() => EpisodeSync.Start()); BackgroundJob.Enqueue(() => EpisodeSync.Start());
} }
BackgroundJob.Enqueue(() => Metadata.Start()); if (processedContent.Any() && recentlyAddedSearch)
{
// Just check what we send it
BackgroundJob.Enqueue(() => Metadata.ProcessPlexServerContent(processedContent));
}
} }
private async Task StartTheCache(PlexSettings plexSettings, bool recentlyAddedSearch) private async Task<IEnumerable<int>> StartTheCache(PlexSettings plexSettings, bool recentlyAddedSearch)
{ {
var processedContent = new HashSet<int>();
foreach (var servers in plexSettings.Servers ?? new List<PlexServers>()) foreach (var servers in plexSettings.Servers ?? new List<PlexServers>())
{ {
try try
{ {
Logger.LogInformation("Starting to cache the content on server {0}", servers.Name); Logger.LogInformation("Starting to cache the content on server {0}", servers.Name);
await ProcessServer(servers, recentlyAddedSearch);
if (recentlyAddedSearch)
{
// If it's recently added search then we want the results to pass to the metadata job
// This way the metadata job is smaller in size to process, it only need to look at newly added shit
var result = await ProcessServer(servers, true);
foreach (var plexServerContent in result)
{
processedContent.Add(plexServerContent);
}
}
else
{
await ProcessServer(servers, false);
}
} }
catch (Exception e) catch (Exception e)
{ {
Logger.LogWarning(LoggingEvents.PlexContentCacher, e, "Exception thrown when attempting to cache the Plex Content in server {0}", servers.Name); Logger.LogWarning(LoggingEvents.PlexContentCacher, e, "Exception thrown when attempting to cache the Plex Content in server {0}", servers.Name);
} }
} }
return processedContent;
} }
private async Task ProcessServer(PlexServers servers, bool recentlyAddedSearch) private async Task<IEnumerable<int>> ProcessServer(PlexServers servers, bool recentlyAddedSearch)
{ {
var processedContent = new HashSet<int>();
Logger.LogInformation("Getting all content from server {0}", servers.Name); Logger.LogInformation("Getting all content from server {0}", servers.Name);
var allContent = await GetAllContent(servers, recentlyAddedSearch); var allContent = await GetAllContent(servers, recentlyAddedSearch);
Logger.LogInformation("We found {0} items", allContent.Count); Logger.LogInformation("We found {0} items", allContent.Count);
// Let's now process this. // Let's now process this.
var contentToAdd = new HashSet<PlexServerContent>(); var contentToAdd = new HashSet<PlexServerContent>();
var allEps = Repo.GetAllEpisodes();
foreach (var content in allContent) foreach (var content in allContent)
{ {
if (content.viewGroup.Equals(PlexMediaType.Episode.ToString(), StringComparison.CurrentCultureIgnoreCase)) if (content.viewGroup.Equals(PlexMediaType.Episode.ToString(), StringComparison.CurrentCultureIgnoreCase))
@ -137,8 +173,10 @@ namespace Ombi.Schedule.Jobs.Plex
continue; continue;
} }
await ProcessTvShow(servers, show, contentToAdd, recentlyAddedSearch); await ProcessTvShow(servers, show, contentToAdd, recentlyAddedSearch, processedContent);
} }
await EpisodeSync.ProcessEpsiodes(content.Metadata, allEps);
} }
if (content.viewGroup.Equals(PlexMediaType.Show.ToString(), StringComparison.CurrentCultureIgnoreCase)) if (content.viewGroup.Equals(PlexMediaType.Show.ToString(), StringComparison.CurrentCultureIgnoreCase))
{ {
@ -146,7 +184,7 @@ namespace Ombi.Schedule.Jobs.Plex
Logger.LogInformation("Processing TV Shows"); Logger.LogInformation("Processing TV Shows");
foreach (var show in content.Metadata ?? new Metadata[] { }) foreach (var show in content.Metadata ?? new Metadata[] { })
{ {
await ProcessTvShow(servers, show, contentToAdd, recentlyAddedSearch); await ProcessTvShow(servers, show, contentToAdd, recentlyAddedSearch, processedContent);
} }
} }
if (content.viewGroup.Equals(PlexMediaType.Movie.ToString(), StringComparison.CurrentCultureIgnoreCase)) if (content.viewGroup.Equals(PlexMediaType.Movie.ToString(), StringComparison.CurrentCultureIgnoreCase))
@ -216,6 +254,10 @@ namespace Ombi.Schedule.Jobs.Plex
if (contentToAdd.Count > 500) if (contentToAdd.Count > 500)
{ {
await Repo.AddRange(contentToAdd); await Repo.AddRange(contentToAdd);
foreach (var c in contentToAdd)
{
processedContent.Add(c.Id);
}
contentToAdd.Clear(); contentToAdd.Clear();
} }
} }
@ -223,6 +265,10 @@ namespace Ombi.Schedule.Jobs.Plex
if (contentToAdd.Count > 500) if (contentToAdd.Count > 500)
{ {
await Repo.AddRange(contentToAdd); await Repo.AddRange(contentToAdd);
foreach (var c in contentToAdd)
{
processedContent.Add(c.Id);
}
contentToAdd.Clear(); contentToAdd.Clear();
} }
} }
@ -230,10 +276,16 @@ namespace Ombi.Schedule.Jobs.Plex
if (contentToAdd.Any()) if (contentToAdd.Any())
{ {
await Repo.AddRange(contentToAdd); await Repo.AddRange(contentToAdd);
foreach (var c in contentToAdd)
{
processedContent.Add(c.Id);
}
} }
return processedContent;
} }
private async Task ProcessTvShow(PlexServers servers, Metadata show, HashSet<PlexServerContent> contentToAdd, bool recentlyAdded) private async Task ProcessTvShow(PlexServers servers, Metadata show, HashSet<PlexServerContent> contentToAdd, bool recentlyAdded, HashSet<int> contentProcessed)
{ {
var seasonList = await PlexApi.GetSeasons(servers.PlexAuthToken, servers.FullUri, var seasonList = await PlexApi.GetSeasons(servers.PlexAuthToken, servers.FullUri,
show.ratingKey); show.ratingKey);
@ -416,8 +468,17 @@ namespace Ombi.Schedule.Jobs.Plex
if (contentToAdd.Count > 500 || recentlyAdded) if (contentToAdd.Count > 500 || recentlyAdded)
{ {
await Repo.AddRange(contentToAdd); await Repo.AddRange(contentToAdd);
foreach (var plexServerContent in contentToAdd)
{
contentProcessed.Add(plexServerContent.Id);
}
contentToAdd.Clear(); contentToAdd.Clear();
} }
if (contentToAdd.Any())
{
await Repo.AddRange(contentToAdd);
}
} }
/// <summary> /// <summary>

@ -111,7 +111,7 @@ namespace Ombi.Schedule.Jobs.Plex
// 12.03.2017 - I think we should be able to match them now // 12.03.2017 - I think we should be able to match them now
//await _repo.ExecuteSql("DELETE FROM PlexEpisode"); //await _repo.ExecuteSql("DELETE FROM PlexEpisode");
await ProcessEpsiodes(episodes, currentEpisodes); await ProcessEpsiodes(episodes?.MediaContainer?.Metadata ?? new Metadata[] { }, currentEpisodes);
currentPosition += resultCount; currentPosition += resultCount;
while (currentPosition < episodes.MediaContainer.totalSize) while (currentPosition < episodes.MediaContainer.totalSize)
@ -119,7 +119,7 @@ namespace Ombi.Schedule.Jobs.Plex
var ep = await _api.GetAllEpisodes(settings.PlexAuthToken, settings.FullUri, section.key, currentPosition, var ep = await _api.GetAllEpisodes(settings.PlexAuthToken, settings.FullUri, section.key, currentPosition,
resultCount); resultCount);
await ProcessEpsiodes(ep, currentEpisodes); await ProcessEpsiodes(ep?.MediaContainer?.Metadata ?? new Metadata[] { }, currentEpisodes);
_log.LogInformation(LoggingEvents.PlexEpisodeCacher, $"Processed {resultCount} more episodes. Total Remaining {episodes.MediaContainer.totalSize - currentPosition}"); _log.LogInformation(LoggingEvents.PlexEpisodeCacher, $"Processed {resultCount} more episodes. Total Remaining {episodes.MediaContainer.totalSize - currentPosition}");
currentPosition += resultCount; currentPosition += resultCount;
} }
@ -129,12 +129,12 @@ namespace Ombi.Schedule.Jobs.Plex
await _repo.SaveChangesAsync(); await _repo.SaveChangesAsync();
} }
private async Task ProcessEpsiodes(PlexContainer episodes, IQueryable<PlexEpisode> currentEpisodes) public async Task ProcessEpsiodes(Metadata[] episodes, IQueryable<PlexEpisode> currentEpisodes)
{ {
var ep = new HashSet<PlexEpisode>(); var ep = new HashSet<PlexEpisode>();
try try
{ {
foreach (var episode in episodes?.MediaContainer?.Metadata ?? new Metadata[] { }) foreach (var episode in episodes)
{ {
// I don't think we need to get the metadata, we only need to get the metadata if we need the provider id (TheTvDbid). Why do we need it for episodes? // I don't think we need to get the metadata, we only need to get the metadata if we need the provider id (TheTvDbid). Why do we need it for episodes?
// We have the parent and grandparent rating keys to link up to the season and series // We have the parent and grandparent rating keys to link up to the season and series

@ -25,7 +25,7 @@ namespace Ombi.Settings.Settings.Models
} }
public static string PlexRecentlyAdded(JobSettings s) public static string PlexRecentlyAdded(JobSettings s)
{ {
return Get(s.PlexRecentlyAddedSync, Cron.Hourly(0)); return Get(s.PlexRecentlyAddedSync, Cron.MinuteInterval(30));
} }
public static string CouchPotato(JobSettings s) public static string CouchPotato(JobSettings s)
{ {
@ -50,7 +50,7 @@ namespace Ombi.Settings.Settings.Models
} }
public static string RefreshMetadata(JobSettings s) public static string RefreshMetadata(JobSettings s)
{ {
return Get(s.RefreshMetadata, Cron.DayInterval(2)); return Get(s.RefreshMetadata, Cron.DayInterval(3));
} }
private static string Get(string settings, string defaultCron) private static string Get(string settings, string defaultCron)

@ -191,11 +191,15 @@ namespace Ombi
} }
app.UseHangfireServer(new BackgroundJobServerOptions { WorkerCount = 1, ServerTimeout = TimeSpan.FromDays(1), ShutdownTimeout = TimeSpan.FromDays(1)}); app.UseHangfireServer(new BackgroundJobServerOptions { WorkerCount = 1, ServerTimeout = TimeSpan.FromDays(1), ShutdownTimeout = TimeSpan.FromDays(1)});
app.UseHangfireDashboard(settings.BaseUrl.HasValue() ? $"{settings.BaseUrl}/hangfire" : "/hangfire", if (env.IsDevelopment())
new DashboardOptions {
{ app.UseHangfireDashboard(settings.BaseUrl.HasValue() ? $"{settings.BaseUrl}/hangfire" : "/hangfire",
Authorization = new[] { new HangfireAuthorizationFilter() } new DashboardOptions
}); {
Authorization = new[] {new HangfireAuthorizationFilter()}
});
}
GlobalJobFilters.Filters.Add(new AutomaticRetryAttribute { Attempts = 3 }); GlobalJobFilters.Filters.Add(new AutomaticRetryAttribute { Attempts = 3 });
// Setup the scheduler // Setup the scheduler

Loading…
Cancel
Save