Fixed the exception being thown when reading from multiple IQuerables

pull/3598/head
Jamie Rees 4 years ago
parent 77f8d76590
commit bbf9c4ba51

@ -11,7 +11,7 @@ namespace Ombi.Helpers
public class OmbiQuartz
{
protected IScheduler _scheduler { get; set; }
public static IScheduler Scheduler => Instance._scheduler;
// Singleton
@ -31,14 +31,14 @@ namespace Ombi.Helpers
{
_scheduler = await new StdSchedulerFactory().GetScheduler();
}
public IScheduler UseJobFactory(IJobFactory jobFactory)
{
Scheduler.JobFactory = jobFactory;
return Scheduler;
}
public static async Task<bool> IsJobRunnung(string jobName)
public static async Task<bool> IsJobRunning(string jobName)
{
var running = await Scheduler.GetCurrentlyExecutingJobs();
return running.Any(x => x.JobDetail.Key.Name.Equals(jobName, StringComparison.InvariantCultureIgnoreCase));
@ -57,7 +57,7 @@ namespace Ombi.Helpers
}
}
if(!cronExpression.HasValue())
if (!cronExpression.HasValue())
{
jobBuilder.StoreDurably(true);
}
@ -67,23 +67,26 @@ namespace Ombi.Helpers
{
ITrigger jobTrigger = TriggerBuilder.Create()
.WithIdentity(name + "Trigger", group)
.WithCronSchedule(cronExpression,
.WithCronSchedule(cronExpression,
x => x.WithMisfireHandlingInstructionFireAndProceed())
.ForJob(name, group)
.StartNow()
.Build();
await Scheduler.ScheduleJob(job, jobTrigger);
}
}
else
{
await Scheduler.AddJob(job, true);
}
}
public static async Task TriggerJob(string jobName, string group)
{
await Scheduler.TriggerJob(new JobKey(jobName, group));
if (!(await IsJobRunning(jobName)))
{
await Scheduler.TriggerJob(new JobKey(jobName, group));
}
}
public static async Task TriggerJob(string jobName, string group, IDictionary<string, object> data)

@ -39,6 +39,7 @@ using Ombi.Helpers;
using Ombi.Store.Entities;
using Ombi.Store.Repository;
using Quartz;
using Ombi.Schedule.Jobs.Ombi;
namespace Ombi.Schedule.Jobs.Emby
{
@ -77,7 +78,7 @@ namespace Ombi.Schedule.Jobs.Emby
await _notification.Clients.Clients(NotificationHub.AdminConnectionIds)
.SendAsync(NotificationHub.NotificationEvent, "Emby Episode Sync Finished");
await OmbiQuartz.TriggerJob(nameof(IEmbyAvaliabilityChecker), "Emby");
await OmbiQuartz.TriggerJob(nameof(IRefreshMetadata), "System");
}
private async Task CacheEpisodes(EmbyServers server)

@ -61,12 +61,16 @@ namespace Ombi.Schedule.Jobs.Ombi
if (settings.Enable)
{
await StartPlex();
await OmbiQuartz.TriggerJob(nameof(IPlexAvailabilityChecker), "Plex");
}
var embySettings = await _embySettings.GetSettingsAsync();
if (embySettings.Enable)
{
await StartEmby(embySettings);
await OmbiQuartz.TriggerJob(nameof(IEmbyAvaliabilityChecker), "Emby");
}
}
catch (Exception e)
@ -85,8 +89,8 @@ namespace Ombi.Schedule.Jobs.Ombi
private async Task StartPlex()
{
// Ensure we check that we have not linked this item to a request
var allMovies = await _plexRepo.GetAll().Where(x =>
x.Type == PlexMediaTypeEntity.Movie && x.RequestId == null && (x.TheMovieDbId == null || x.ImdbId == null)).ToListAsync();
var allMovies = await _plexRepo.GetAll().Where(x =>
x.Type == PlexMediaTypeEntity.Movie && x.RequestId == null && (x.TheMovieDbId == null || x.ImdbId == null)).ToListAsync();
await StartPlexMovies(allMovies);
// Now Tv
@ -105,7 +109,7 @@ namespace Ombi.Schedule.Jobs.Ombi
private async Task StartPlexTv(List<PlexServerContent> allTv)
{
foreach (var show in allTv)
{
{
// Just double check there is no associated request id
if (show.RequestId.HasValue)
{

@ -60,15 +60,15 @@ namespace Ombi.Schedule.Jobs.Plex
.SendAsync(NotificationHub.NotificationEvent, "Plex Availability Check Finished");
}
private Task ProcessTv()
private async Task ProcessTv()
{
var tv = _tvRepo.GetChild().Where(x => !x.Available).AsNoTracking();
return ProcessTv(tv);
var tv = await _tvRepo.GetChild().Where(x => !x.Available).ToListAsync();
await ProcessTv(tv);
}
private async Task ProcessTv(IQueryable<ChildRequests> tv)
private async Task ProcessTv(List<ChildRequests> tv)
{
var plexEpisodes = _repo.GetAllEpisodes().Include(x => x.Series).AsNoTracking();
var plexEpisodes = _repo.GetAllEpisodes().Include(x => x.Series);
foreach (var child in tv)
{
@ -119,11 +119,11 @@ namespace Ombi.Schedule.Jobs.Plex
{
continue;
}
var foundEp = await seriesEpisodes.FirstOrDefaultAsync(
var foundEp = await seriesEpisodes.AnyAsync(
x => x.EpisodeNumber == episode.EpisodeNumber &&
x.SeasonNumber == episode.Season.SeasonNumber);
if (foundEp != null)
if (foundEp)
{
availableEpisode.Add(new AvailabilityModel
{
@ -135,18 +135,24 @@ namespace Ombi.Schedule.Jobs.Plex
}
//TODO Partial avilability notifications here
foreach(var c in availableEpisode)
if (availableEpisode.Any())
{
await _tvRepo.MarkEpisodeAsAvailable(c.Id);
await _tvRepo.Save();
}
//foreach(var c in availableEpisode)
//{
// await _tvRepo.MarkEpisodeAsAvailable(c.Id);
//}
// Check to see if all of the episodes in all seasons are available for this request
var allAvailable = child.SeasonRequests.All(x => x.Episodes.All(c => c.Available));
if (allAvailable)
{
child.Available = true;
child.MarkedAsAvailable = DateTime.UtcNow;
_log.LogInformation("[PAC] - Child request {0} is now available, sending notification", $"{child.Title} - {child.Id}");
// We have ful-fulled this request!
await _tvRepo.MarkChildAsAvailable(child.Id);
await _tvRepo.Save();
await _notificationService.Notify(new NotificationOptions
{
DateTime = DateTime.Now,
@ -164,7 +170,7 @@ namespace Ombi.Schedule.Jobs.Plex
private async Task ProcessMovies()
{
// Get all non available
var movies = _movieRepo.GetAll().Include(x => x.RequestedUser).Where(x => !x.Available).AsNoTracking();
var movies = _movieRepo.GetAll().Include(x => x.RequestedUser).Where(x => !x.Available);
var itemsForAvailbility = new List<AvailabilityModel>();
foreach (var movie in movies)
@ -191,8 +197,10 @@ namespace Ombi.Schedule.Jobs.Plex
// We don't yet have this
continue;
}
_log.LogInformation("[PAC] - Movie request {0} is now available, sending notification", $"{movie.Title} - {movie.Id}");
movie.Available = true;
movie.MarkedAsAvailable = DateTime.UtcNow;
itemsForAvailbility.Add(new AvailabilityModel
{
Id = movie.Id,
@ -200,9 +208,12 @@ namespace Ombi.Schedule.Jobs.Plex
});
}
if (itemsForAvailbility.Any())
{
await _movieRepo.SaveChangesAsync();
}
foreach (var i in itemsForAvailbility)
{
await _movieRepo.MarkAsAvailable(i.Id);
await _notificationService.Notify(new NotificationOptions
{
@ -214,7 +225,7 @@ namespace Ombi.Schedule.Jobs.Plex
});
}
await _repo.SaveChangesAsync();
//await _repo.SaveChangesAsync();
}
private bool _disposed;

@ -116,7 +116,7 @@ namespace Ombi.Schedule.Jobs.Plex
if ((processedContent?.HasProcessedContent ?? false) && recentlyAddedSearch)
{
// Ensure it's not already running
if (await OmbiQuartz.IsJobRunnung(nameof(IPlexAvailabilityChecker)))
if (await OmbiQuartz.IsJobRunning(nameof(IPlexAvailabilityChecker)))
{
Logger.LogInformation("Availability checker already running");
}
@ -131,7 +131,7 @@ namespace Ombi.Schedule.Jobs.Plex
if ((processedContent?.HasProcessedContent ?? false) && recentlyAddedSearch)
{
// Ensure it's not already running
if (await OmbiQuartz.IsJobRunnung(nameof(IPlexAvailabilityChecker)))
if (await OmbiQuartz.IsJobRunning(nameof(IPlexAvailabilityChecker)))
{
Logger.LogInformation("Availability checker already running");
}

@ -63,16 +63,9 @@ namespace Ombi.Schedule.Jobs.Plex
_log.LogError(LoggingEvents.Cacher, e, "Caching Episodes Failed");
}
await OmbiQuartz.TriggerJob(nameof(IRefreshMetadata), "System");
// Ensure it's not already running
if (await OmbiQuartz.IsJobRunnung(nameof(IPlexAvailabilityChecker)))
{
_log.LogInformation("Availability checker already running");
}
else
{
await OmbiQuartz.TriggerJob(nameof(IPlexAvailabilityChecker), "Plex");
}
await _notification.Clients.Clients(NotificationHub.AdminConnectionIds)
.SendAsync(NotificationHub.NotificationEvent, "Plex Episode Sync Finished");
}

@ -85,25 +85,8 @@ namespace Ombi.Store.Repository
protected async Task<int> InternalSaveChanges()
{
var policy = Policy
.Handle<SqliteException>()
.WaitAndRetryAsync(new[]
{
TimeSpan.FromSeconds(1),
TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(10)
});
var result = await policy.ExecuteAndCaptureAsync(async () =>
{
//using (var tran = await _ctx.Database.BeginTransactionAsync())
{
var r = await _ctx.SaveChangesAsync();
//tran.Commit();
return r;
}
});
return result.Result;
var r = await _ctx.SaveChangesAsync();
return r;
}

Loading…
Cancel
Save