pull/4991/merge
sephrat 1 month ago committed by GitHub
commit 289db693ad
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -24,6 +24,7 @@ namespace Ombi.Api.Plex
Task<PlexUsers> GetUsers(string authToken);
Task<PlexAccount> GetAccount(string authToken);
Task<PlexMetadata> GetRecentlyAdded(string authToken, string uri, string sectionId);
Task<PlexMetadata> GetPlayed(string authToken, string uri, string sectionId, int maxNumberOfItems = 0);
Task<OAuthContainer> GetPin(int pinId);
Task<Uri> GetOAuthUrl(string code, string applicationUrl);
Task<PlexAddWrapper> AddUser(string emailAddress, string serverId, string authToken, int[] libs);

@ -0,0 +1,9 @@
namespace Ombi.Api.Plex.Models
{
public enum PlexMediaFilterType
{
Movie = 1,
Show = 2,
Episode = 4,
}
}

@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Threading;
@ -212,6 +213,29 @@ namespace Ombi.Api.Plex
return await Api.Request<PlexMetadata>(request);
}
public async Task<PlexMetadata> GetPlayed(string authToken, string uri, string sectionId, int maxNumberOfItems)
{
var request = new Request($"library/sections/{sectionId}/all", uri, HttpMethod.Get);
await AddHeaders(request, authToken);
request.AddQueryString("unwatched", "0");
request.AddQueryString("sort", "lastViewedAt:desc");
// for some reason, we need to explicitely include episodes for them to be returned by the API (movies are fine)
// also the order seems of importance: "4,1" doesn't work but "1,4" does work
var types = new List<int> { (int) PlexMediaFilterType.Movie, (int) PlexMediaFilterType.Episode };
var typeFilter = string.Join(",", types);
request.AddQueryString("type", typeFilter);
if (maxNumberOfItems != 0)
{
AddLimitHeaders(request, 0, maxNumberOfItems);
}
return await Api.Request<PlexMetadata>(request);
}
public async Task<OAuthContainer> GetPin(int pinId)
{
var request = new Request($"api/v2/pins/{pinId}", "https://plex.tv/", HttpMethod.Get);

@ -30,7 +30,7 @@ namespace Ombi.Core.Tests.Authentication
AuthenticationSettings.Setup(x => x.GetSettingsAsync())
.ReturnsAsync(new AuthenticationSettings());
_um = new OmbiUserManager(UserStore.Object, null, null, null, null, null, null, null, null,
PlexApi.Object, null, null, null, null, AuthenticationSettings.Object);
PlexApi.Object, null, null, null, null, AuthenticationSettings.Object, null);
}
public OmbiUserManager _um { get; set; }

@ -27,6 +27,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Identity;
using Microsoft.EntityFrameworkCore;
@ -41,6 +42,7 @@ using Ombi.Core.Settings.Models.External;
using Ombi.Helpers;
using Ombi.Settings.Settings.Models;
using Ombi.Store.Entities;
using Ombi.Store.Repository;
namespace Ombi.Core.Authentication
{
@ -52,7 +54,7 @@ namespace Ombi.Core.Authentication
IdentityErrorDescriber errors, IServiceProvider services, ILogger<UserManager<OmbiUser>> logger, IPlexApi plexApi,
IEmbyApiFactory embyApi, ISettingsService<EmbySettings> embySettings,
IJellyfinApiFactory jellyfinApi, ISettingsService<JellyfinSettings> jellyfinSettings,
ISettingsService<AuthenticationSettings> auth)
ISettingsService<AuthenticationSettings> auth, IRepository<PlexWatchlistUserError> userError)
: base(store, optionsAccessor, passwordHasher, userValidators, passwordValidators, keyNormalizer, errors, services, logger)
{
_plexApi = plexApi;
@ -61,6 +63,8 @@ namespace Ombi.Core.Authentication
_embySettings = embySettings;
_jellyfinSettings = jellyfinSettings;
_authSettings = auth;
_userError = userError;
_logger = logger;
}
private readonly IPlexApi _plexApi;
@ -69,6 +73,8 @@ namespace Ombi.Core.Authentication
private readonly ISettingsService<EmbySettings> _embySettings;
private readonly ISettingsService<JellyfinSettings> _jellyfinSettings;
private readonly ISettingsService<AuthenticationSettings> _authSettings;
private readonly IRepository<PlexWatchlistUserError> _userError;
private readonly Microsoft.Extensions.Logging.ILogger _logger;
private string _clientIpAddress;
public string ClientIpAddress { get => _clientIpAddress; set => _clientIpAddress = value; }
@ -139,6 +145,38 @@ namespace Ombi.Core.Authentication
}
public async Task<ICollection<OmbiUser>> GetPlexUsersWithValidTokens()
{
var plexUsersWithTokens = Users.Where(x => x.UserType == UserType.PlexUser && x.MediaServerToken != null).ToList();
_logger.LogDebug($"Found {plexUsersWithTokens.Count} users with tokens");
var result = new List<OmbiUser>();
foreach (var user in plexUsersWithTokens)
{
// Check if the user has errors and the token is the same (not refreshed)
var failedUser = await _userError.GetAll().Where(x => x.UserId == user.Id).FirstOrDefaultAsync();
if (failedUser != null)
{
if (failedUser.MediaServerToken.Equals(user.MediaServerToken))
{
_logger.LogWarning($"Skipping user '{user.UserName}' as they failed previously and the token has not yet been refreshed. They need to re-authenticate with Ombi");
continue;
}
else
{
// remove that guy
await _userError.Delete(failedUser);
failedUser = null;
}
}
if (failedUser == null)
{
result.Add(user);
}
}
return result;
}
/// <summary>
/// Sign the user into plex and make sure we can get the authentication token.

@ -244,6 +244,7 @@ namespace Ombi.DependencyInjection
services.AddSingleton<IJobFactory, IoCJobFactory>();
services.AddTransient<IPlexContentSync, PlexContentSync>();
services.AddTransient<IPlexPlayedSync, PlexPlayedSync>();
services.AddTransient<IPlexWatchlistImport, PlexWatchlistImport>();
services.AddTransient<IEmbyContentSync, EmbyContentSync>();
services.AddTransient<IEmbyPlayedSync, EmbyPlayedSync>();

@ -0,0 +1,8 @@
using Quartz;
namespace Ombi.Schedule.Jobs
{
public interface IPlexPlayedSync : IJob
{
}
}

@ -35,82 +35,56 @@ using Ombi.Api.Plex;
using Ombi.Api.Plex.Models;
using Ombi.Api.TheMovieDb;
using Ombi.Api.TheMovieDb.Models;
using Ombi.Core.Services;
using Ombi.Core.Settings;
using Ombi.Core.Settings.Models.External;
using Ombi.Helpers;
using Ombi.Hubs;
using Ombi.Schedule.Jobs.Plex.Interfaces;
using Ombi.Schedule.Jobs.Plex.Models;
using Ombi.Settings.Settings.Models;
using Ombi.Store.Entities;
using Ombi.Store.Repository;
using Quartz;
namespace Ombi.Schedule.Jobs.Plex
{
public class PlexContentSync : IPlexContentSync
public class PlexContentSync : PlexLibrarySync, IPlexContentSync
{
private readonly IMovieDbApi _movieApi;
private readonly IMediaCacheService _mediaCacheService;
public PlexContentSync(ISettingsService<PlexSettings> plex, IPlexApi plexApi, ILogger<PlexContentSync> logger, IPlexContentRepository repo,
IPlexEpisodeSync epsiodeSync, INotificationHubService notificationHubService, IMovieDbApi movieDbApi, IMediaCacheService mediaCacheService)
private readonly IFeatureService _feature;
private ProcessedContent _processedContent;
public PlexContentSync(
ISettingsService<PlexSettings> plex,
IPlexApi plexApi, ILogger<PlexLibrarySync> logger,
IPlexContentRepository repo,
IPlexEpisodeSync epsiodeSync,
INotificationHubService notificationHubService,
IMovieDbApi movieDbApi,
IMediaCacheService mediaCacheService,
IFeatureService feature):
base(plex, plexApi, logger, notificationHubService)
{
Plex = plex;
PlexApi = plexApi;
Logger = logger;
Repo = repo;
EpisodeSync = epsiodeSync;
Notification = notificationHubService;
_movieApi = movieDbApi;
_mediaCacheService = mediaCacheService;
_feature = feature;
Plex.ClearCache();
}
private ISettingsService<PlexSettings> Plex { get; }
private IPlexApi PlexApi { get; }
private ILogger<PlexContentSync> Logger { get; }
private IPlexContentRepository Repo { get; }
private IPlexEpisodeSync EpisodeSync { get; }
private INotificationHubService Notification { get; set; }
public async Task Execute(IJobExecutionContext context)
public async override Task Execute(IJobExecutionContext context)
{
JobDataMap dataMap = context.JobDetail.JobDataMap;
var recentlyAddedSearch = dataMap.GetBooleanValueFromString(JobDataKeys.RecentlyAddedSearch);
var plexSettings = await Plex.GetSettingsAsync();
if (!plexSettings.Enable)
{
return;
}
await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync Started" : "Plex Content Sync Started");
if (!ValidateSettings(plexSettings))
{
Logger.LogError("Plex Settings are not valid");
await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync, Settings Not Valid" : "Plex Content, Settings Not Valid");
return;
}
var processedContent = new ProcessedContent();
Logger.LogInformation(recentlyAddedSearch
? "Starting Plex Content Cacher Recently Added Scan"
: "Starting Plex Content Cacher");
try
{
if (recentlyAddedSearch)
{
processedContent = await StartTheCache(plexSettings, true);
}
else
{
await StartTheCache(plexSettings, false);
}
}
catch (Exception e)
{
await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync Errored" : "Plex Content Sync Errored");
Logger.LogWarning(LoggingEvents.PlexContentCacher, e, "Exception thrown when attempting to cache the Plex Content");
}
_processedContent = new ProcessedContent();
await base.Execute(context);
if (!recentlyAddedSearch)
{
await NotifyClient("Plex Sync - Starting Episode Sync");
@ -118,53 +92,32 @@ namespace Ombi.Schedule.Jobs.Plex
await OmbiQuartz.TriggerJob(nameof(IPlexEpisodeSync), "Plex");
}
if ((processedContent?.HasProcessedContent ?? false) && recentlyAddedSearch)
if ((_processedContent?.HasProcessedContent ?? false) && recentlyAddedSearch)
{
await NotifyClient("Plex Sync - Checking if any requests are now available");
Logger.LogInformation("Kicking off Plex Availability Checker");
await OmbiQuartz.TriggerJob(nameof(IPlexAvailabilityChecker), "Plex");
}
var processedCont = processedContent?.Content?.Count() ?? 0;
var processedEp = processedContent?.Episodes?.Count() ?? 0;
var processedCont = _processedContent?.Content?.Count() ?? 0;
var processedEp = _processedContent?.Episodes?.Count() ?? 0;
Logger.LogInformation("Finished Plex Content Cacher, with processed content: {0}, episodes: {1}. Recently Added Scan: {2}", processedCont, processedEp, recentlyAddedSearch);
await NotifyClient(recentlyAddedSearch ? $"Plex Recently Added Sync Finished, We processed {processedCont}, and {processedEp} Episodes" : "Plex Content Sync Finished");
await _mediaCacheService.Purge();
}
private async Task<ProcessedContent> StartTheCache(PlexSettings plexSettings, bool recentlyAddedSearch)
{
var processedContent = new ProcessedContent();
foreach (var servers in plexSettings.Servers ?? new List<PlexServers>())
// Played state
var isPlayedSyncEnabled = await _feature.FeatureEnabled(FeatureNames.PlayedSync);
if(isPlayedSyncEnabled)
{
try
{
Logger.LogInformation("Starting to cache the content on server {0}", servers.Name);
if (recentlyAddedSearch)
{
// If it's recently added search then we want the results to pass to the metadata job
// This way the metadata job is smaller in size to process, it only need to look at newly added shit
return await ProcessServer(servers, true);
}
else
{
await ProcessServer(servers, false);
}
}
catch (Exception e)
{
Logger.LogWarning(LoggingEvents.PlexContentCacher, e, "Exception thrown when attempting to cache the Plex Content in server {0}", servers.Name);
}
await OmbiQuartz.Scheduler.TriggerJob(new JobKey(nameof(IPlexPlayedSync), "Plex"), new JobDataMap(new Dictionary<string, string> { { JobDataKeys.RecentlyAddedSearch, recentlyAddedSearch.ToString() } }));
}
return processedContent;
await _mediaCacheService.Purge();
}
private async Task<ProcessedContent> ProcessServer(PlexServers servers, bool recentlyAddedSearch)
protected override async Task ProcessServer(PlexServers servers)
{
var retVal = new ProcessedContent();
var contentProcessed = new Dictionary<int, string>();
var episodesProcessed = new List<int>();
Logger.LogDebug("Getting all content from server {0}", servers.Name);
@ -286,9 +239,8 @@ namespace Ombi.Schedule.Jobs.Plex
}
}
retVal.Content = contentProcessed.Values;
retVal.Episodes = episodesProcessed;
return retVal;
_processedContent.Content = contentProcessed.Values;
_processedContent.Episodes = episodesProcessed;
}
public async Task MovieLoop(PlexServers servers, Mediacontainer content, HashSet<PlexServerContent> contentToAdd,
@ -697,45 +649,27 @@ namespace Ombi.Schedule.Jobs.Plex
/// <returns></returns>
private async Task<List<Mediacontainer>> GetAllContent(PlexServers plexSettings, bool recentlyAddedSearch)
{
var sections = await PlexApi.GetLibrarySections(plexSettings.PlexAuthToken, plexSettings.FullUri);
var libs = new List<Mediacontainer>();
if (sections != null)
var directories = await GetEnabledLibraries(plexSettings);
foreach (var directory in directories)
{
foreach (var dir in sections.MediaContainer.Directory ?? new List<Directory>())
if (recentlyAddedSearch)
{
if (plexSettings.PlexSelectedLibraries.Any())
var container = await PlexApi.GetRecentlyAdded(plexSettings.PlexAuthToken, plexSettings.FullUri,
directory.key);
if (container != null)
{
if (plexSettings.PlexSelectedLibraries.Any(x => x.Enabled))
{
// Only get the enabled libs
var keys = plexSettings.PlexSelectedLibraries.Where(x => x.Enabled)
.Select(x => x.Key.ToString()).ToList();
if (!keys.Contains(dir.key))
{
Logger.LogDebug("Lib {0} is not monitored, so skipping", dir.key);
// We are not monitoring this lib
continue;
}
}
}
if (recentlyAddedSearch)
{
var container = await PlexApi.GetRecentlyAdded(plexSettings.PlexAuthToken, plexSettings.FullUri,
dir.key);
if (container != null)
{
libs.Add(container.MediaContainer);
}
libs.Add(container.MediaContainer);
}
else
}
else
{
var lib = await PlexApi.GetLibrary(plexSettings.PlexAuthToken, plexSettings.FullUri, directory.key);
if (lib != null)
{
var lib = await PlexApi.GetLibrary(plexSettings.PlexAuthToken, plexSettings.FullUri, dir.key);
if (lib != null)
{
libs.Add(lib.MediaContainer);
}
libs.Add(lib.MediaContainer);
}
}
}
@ -743,25 +677,6 @@ namespace Ombi.Schedule.Jobs.Plex
return libs;
}
private async Task NotifyClient(string message)
{
await Notification.SendNotificationToAdmins($"Plex Sync - {message}");
}
private static bool ValidateSettings(PlexSettings plex)
{
if (plex.Enable)
{
foreach (var server in plex.Servers ?? new List<PlexServers>())
{
if (string.IsNullOrEmpty(server?.Ip) || string.IsNullOrEmpty(server?.PlexAuthToken))
{
return false;
}
}
}
return plex.Enable;
}
private bool _disposed;

@ -0,0 +1,163 @@
#region Copyright
// /************************************************************************
// Copyright (c) 2017 Jamie Rees
// File: PlexServerContentCacher.cs
// Created By: Jamie Rees
//
// Permission is hereby granted, free of charge, to any person obtaining
// a copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to
// permit persons to whom the Software is furnished to do so, subject to
// the following conditions:
//
// The above copyright notice and this permission notice shall be
// included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
// ************************************************************************/
#endregion
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Ombi.Api.Plex;
using Ombi.Api.Plex.Models;
using Ombi.Core.Settings;
using Ombi.Core.Settings.Models.External;
using Ombi.Helpers;
using Ombi.Hubs;
using Quartz;
namespace Ombi.Schedule.Jobs.Plex
{
public abstract class PlexLibrarySync
{
public PlexLibrarySync(
ISettingsService<PlexSettings> plex,
IPlexApi plexApi,
ILogger<PlexLibrarySync> logger,
INotificationHubService notificationHubService)
{
PlexApi = plexApi;
Plex = plex;
Logger = logger;
Notification = notificationHubService;
}
protected ILogger<PlexLibrarySync> Logger { get; }
protected IPlexApi PlexApi { get; }
protected ISettingsService<PlexSettings> Plex { get; }
private INotificationHubService Notification { get; set; }
protected bool recentlyAddedSearch;
public virtual async Task Execute(IJobExecutionContext context)
{
JobDataMap dataMap = context.MergedJobDataMap;
recentlyAddedSearch = dataMap.GetBooleanValueFromString(JobDataKeys.RecentlyAddedSearch);
var plexSettings = await Plex.GetSettingsAsync();
if (!plexSettings.Enable)
{
return;
}
await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync Started" : "Plex Content Sync Started");
if (!ValidateSettings(plexSettings))
{
Logger.LogError("Plex Settings are not valid");
await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync, Settings Not Valid" : "Plex Content, Settings Not Valid");
return;
}
Logger.LogInformation(recentlyAddedSearch
? "Starting Plex Content Cacher Recently Added Scan"
: "Starting Plex Content Cacher");
try
{
await StartTheCache(plexSettings);
}
catch (Exception e)
{
await NotifyClient(recentlyAddedSearch ? "Plex Recently Added Sync Errored" : "Plex Content Sync Errored");
Logger.LogWarning(LoggingEvents.PlexContentCacher, e, "Exception thrown when attempting to cache the Plex Content");
}
}
private async Task StartTheCache(PlexSettings plexSettings)
{
foreach (var servers in plexSettings.Servers ?? new List<PlexServers>())
{
try
{
Logger.LogInformation("Starting to cache the content on server {0}", servers.Name);
await ProcessServer(servers);
}
catch (Exception e)
{
Logger.LogWarning(LoggingEvents.PlexContentCacher, e, "Exception thrown when attempting to cache the Plex Content in server {0}", servers.Name);
}
}
}
protected async Task<List<Directory>> GetEnabledLibraries(PlexServers plexSettings)
{
var result = new List<Directory>();
var sections = await PlexApi.GetLibrarySections(plexSettings.PlexAuthToken, plexSettings.FullUri);
if (sections != null)
{
foreach (var dir in sections.MediaContainer.Directory ?? new List<Directory>())
{
if (plexSettings.PlexSelectedLibraries.Any())
{
if (plexSettings.PlexSelectedLibraries.Any(x => x.Enabled))
{
// Only get the enabled libs
var keys = plexSettings.PlexSelectedLibraries.Where(x => x.Enabled)
.Select(x => x.Key.ToString()).ToList();
if (!keys.Contains(dir.key))
{
Logger.LogDebug("Lib {0} is not monitored, so skipping", dir.key);
// We are not monitoring this lib
continue;
}
}
}
result.Add(dir);
}
}
return result;
}
protected abstract Task ProcessServer(PlexServers servers);
protected async Task NotifyClient(string message)
{
await Notification.SendNotificationToAdmins($"Plex Sync - {message}");
}
private static bool ValidateSettings(PlexSettings plex)
{
if (plex.Enable)
{
foreach (var server in plex.Servers ?? new List<PlexServers>())
{
if (string.IsNullOrEmpty(server?.Ip) || string.IsNullOrEmpty(server?.PlexAuthToken))
{
return false;
}
}
}
return plex.Enable;
}
}
}

@ -0,0 +1,199 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Ombi.Api.Plex;
using Ombi.Api.Plex.Models;
using Ombi.Core.Authentication;
using Ombi.Core.Settings;
using Ombi.Core.Settings.Models.External;
using Ombi.Helpers;
using Ombi.Hubs;
using Ombi.Store.Entities;
using Ombi.Store.Repository;
namespace Ombi.Schedule.Jobs.Plex
{
public class PlexPlayedSync : PlexLibrarySync, IPlexPlayedSync
{
public PlexPlayedSync(
ISettingsService<PlexSettings> plex,
IPlexApi plexApi,
ILogger<PlexLibrarySync> logger,
IPlexContentRepository contentRepo,
INotificationHubService notificationHubService,
OmbiUserManager user,
IUserPlayedMovieRepository movieRepo,
IUserPlayedEpisodeRepository episodeRepo):
base(plex, plexApi, logger, notificationHubService)
{
_contentRepo = contentRepo;
_userManager = user;
_movieRepo = movieRepo;
_episodeRepo = episodeRepo;
Plex.ClearCache();
}
private IPlexContentRepository _contentRepo { get; }
private OmbiUserManager _userManager { get; }
private readonly IUserPlayedMovieRepository _movieRepo;
private readonly IUserPlayedEpisodeRepository _episodeRepo;
private const int recentlyAddedAmountToTake = 5;
protected override async Task ProcessServer(PlexServers servers)
{
var allUsers = await _userManager.GetPlexUsersWithValidTokens();
foreach (var user in allUsers)
{
await ProcessUser(servers, recentlyAddedSearch, user);
}
}
private async Task ProcessUser(PlexServers servers, bool recentlyAddedSearch, OmbiUser user)
{
var contentProcessed = new Dictionary<int, string>();
var episodesProcessed = new List<int>();
Logger.LogDebug($"Getting all played content from server {servers.Name} for user {user.Alias}");
var allContent = await GetAllContent(servers, recentlyAddedSearch, user);
Logger.LogDebug("We found {0} items", allContent.Count);
// Let's now process this.
var episodesToAdd = new HashSet<UserPlayedEpisode>();
var moviesToAdd = new HashSet<UserPlayedMovie>();
foreach (var content in allContent.OrderByDescending(x => x.viewGroup))
{
Logger.LogDebug($"Got type '{content.viewGroup}' to process");
if (content.viewGroup.Equals(PlexMediaType.Show.ToString(), StringComparison.InvariantCultureIgnoreCase))
{
foreach (var epInfo in content.Metadata ?? new Metadata[] { })
{
await ProcessEpisode(epInfo, user, episodesToAdd);
}
}
if (content.viewGroup.Equals(PlexMediaType.Movie.ToString(), StringComparison.InvariantCultureIgnoreCase))
{
Logger.LogDebug("Processing Movies");
foreach (var movie in content?.Metadata ?? Array.Empty<Metadata>())
{
await ProcessMovie(movie, user, moviesToAdd);
}
}
}
await _movieRepo.AddRange(moviesToAdd);
await _episodeRepo.AddRange(episodesToAdd);
}
private async Task ProcessEpisode(Metadata epInfo, OmbiUser user, ICollection<UserPlayedEpisode> content)
{
var episode = await _contentRepo.GetEpisodeByKey(epInfo.ratingKey);
if (episode == null || episode.Series == null)
{
Logger.LogInformation($"The episode {epInfo.title} does not relate to a series, so we cannot save this");
return;
}
if (episode.Series.TheMovieDbId.IsNullOrEmpty())
{
Logger.LogWarning($"Episode {epInfo.title} is not linked to a TMDB series. Skipping.");
return;
}
await AddToContent(content, new UserPlayedEpisode()
{
TheMovieDbId = int.Parse(episode.Series.TheMovieDbId),
SeasonNumber = episode.SeasonNumber,
EpisodeNumber = episode.EpisodeNumber,
UserId = user.Id
});
}
private async Task AddToContent(ICollection<UserPlayedEpisode> content, UserPlayedEpisode episode)
{
// Check if it exists
var existingEpisode = await _episodeRepo.Get(episode.TheMovieDbId, episode.SeasonNumber, episode.EpisodeNumber, episode.UserId);
var alreadyGoingToAdd = content.Any(x =>
x.TheMovieDbId == episode.TheMovieDbId
&& x.SeasonNumber == episode.SeasonNumber
&& x.EpisodeNumber == episode.EpisodeNumber
&& x.UserId == episode.UserId);
if (existingEpisode == null && !alreadyGoingToAdd)
{
content.Add(episode);
}
}
public async Task ProcessMovie(Metadata movie, OmbiUser user, ICollection<UserPlayedMovie> content)
{
var cachedMovie = await _contentRepo.GetByKey(movie.ratingKey);
if (cachedMovie == null || cachedMovie.TheMovieDbId.IsNullOrEmpty() )
{
Logger.LogWarning($"Movie {movie.title} has no relevant metadata. Skipping.");
return;
}
var userPlayedMovie = new UserPlayedMovie()
{
TheMovieDbId = int.Parse(cachedMovie.TheMovieDbId),
UserId = user.Id
};
// Check if it exists
var existingMovie = await _movieRepo.Get(userPlayedMovie.TheMovieDbId, userPlayedMovie.UserId);
var alreadyGoingToAdd = content.Any(x => x.TheMovieDbId == userPlayedMovie.TheMovieDbId && x.UserId == userPlayedMovie.UserId);
if (existingMovie == null && !alreadyGoingToAdd)
{
content.Add(userPlayedMovie);
}
}
private async Task<List<Mediacontainer>> GetAllContent(PlexServers plexSettings, bool recentlyAddedSearch, OmbiUser user)
{
var libs = new List<Mediacontainer>();
var directories = await GetEnabledLibraries(plexSettings);
foreach (var directory in directories)
{
var maxNumberOfItems = 0;
if (recentlyAddedSearch)
{
maxNumberOfItems = recentlyAddedAmountToTake;
}
var container = await PlexApi.GetPlayed(user.MediaServerToken, plexSettings.FullUri,
directory.key, maxNumberOfItems);
if (container != null)
{
libs.Add(container.MediaContainer);
}
}
return libs;
}
private bool _disposed;
protected virtual void Dispose(bool disposing)
{
if (_disposed)
return;
_disposed = true;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
}
}

@ -64,30 +64,12 @@ namespace Ombi.Schedule.Jobs.Plex
return;
}
var plexUsersWithTokens = _ombiUserManager.Users.Where(x => x.UserType == UserType.PlexUser && x.MediaServerToken != null).ToList();
_logger.LogInformation($"Found {plexUsersWithTokens.Count} users with tokens");
await NotifyClient("Starting Watchlist Import");
foreach (var user in plexUsersWithTokens)
var plexUsers = await _ombiUserManager.GetPlexUsersWithValidTokens();
foreach (var user in plexUsers)
{
try
{
// Check if the user has errors and the token is the same (not refreshed)
var failedUser = await _userError.GetAll().Where(x => x.UserId == user.Id).FirstOrDefaultAsync();
if (failedUser != null)
{
if (failedUser.MediaServerToken.Equals(user.MediaServerToken))
{
_logger.LogInformation($"Skipping Plex Watchlist Import for user '{user.UserName}' as they failed previously and the token has not yet been refreshed");
continue;
}
else
{
// remove that guy
await _userError.Delete(failedUser);
failedUser = null;
}
}
_logger.LogDebug($"Starting Watchlist Import for {user.UserName} with token {user.MediaServerToken}");
var watchlist = await _plexApi.GetWatchlist(user.MediaServerToken, context?.CancellationToken ?? CancellationToken.None);

@ -90,6 +90,7 @@ namespace Ombi.Schedule
await OmbiQuartz.Instance.AddJob<IPlexContentSync>(nameof(IPlexContentSync) + "RecentlyAdded", "Plex", JobSettingsHelper.PlexRecentlyAdded(s), new Dictionary<string, string> { { JobDataKeys.RecentlyAddedSearch, "true" } });
await OmbiQuartz.Instance.AddJob<IPlexUserImporter>(nameof(IPlexUserImporter), "Plex", JobSettingsHelper.UserImporter(s));
await OmbiQuartz.Instance.AddJob<IPlexEpisodeSync>(nameof(IPlexEpisodeSync), "Plex", null);
await OmbiQuartz.Instance.AddJob<IPlexPlayedSync>(nameof(IPlexPlayedSync), "Plex", null);
await OmbiQuartz.Instance.AddJob<IPlexAvailabilityChecker>(nameof(IPlexAvailabilityChecker), "Plex", null);
await OmbiQuartz.Instance.AddJob<IPlexWatchlistImport>(nameof(IPlexWatchlistImport), "Plex", JobSettingsHelper.PlexWatchlistImport(s));
}

@ -156,7 +156,7 @@ namespace Ombi.Store.Repository
public async Task<PlexEpisode> GetEpisodeByKey(string key)
{
return await Db.PlexEpisode.FirstOrDefaultAsync(x => x.Key == key);
return await Db.PlexEpisode.Include(x => x.Series).FirstOrDefaultAsync(x => x.Key == key);
}
public override async Task AddRange(IEnumerable<IMediaServerEpisode> content)
{

Loading…
Cancel
Save