#pragma warning disable CS1591

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using MediaBrowser.Common.Configuration;
using MediaBrowser.Model.Events;
using MediaBrowser.Model.IO;
using MediaBrowser.Model.Serialization;
using MediaBrowser.Model.Tasks;
using Microsoft.Extensions.Logging;

namespace Emby.Server.Implementations.ScheduledTasks
{
    /// <summary>
    /// Class TaskManager.
    /// Keeps the registered scheduled task workers, starts them on request and
    /// queues requests for workers that are not idle until a task completes.
    /// </summary>
    public class TaskManager : ITaskManager
    {
        /// <summary>
        /// Raised from <see cref="OnTaskExecuting" />; the sender is this manager.
        /// </summary>
        public event EventHandler<GenericEventArgs<IScheduledTaskWorker>> TaskExecuting;

        /// <summary>
        /// Raised from <see cref="OnTaskCompleted" />; the sender is the worker that completed.
        /// </summary>
        public event EventHandler<TaskCompletionEventArgs> TaskCompleted;

        /// <summary>
        /// Gets the list of Scheduled Tasks.
        /// </summary>
        /// <value>The scheduled tasks.</value>
        public IScheduledTaskWorker[] ScheduledTasks { get; private set; }

        /// <summary>
        /// The _task queue. Holds (task type, options) pairs for requests that arrived
        /// while the target worker was not idle. Also used as the lock object that
        /// serializes the "check state, then execute or enqueue" decisions below.
        /// </summary>
        private readonly ConcurrentQueue<Tuple<Type, TaskOptions>> _taskQueue =
            new ConcurrentQueue<Tuple<Type, TaskOptions>>();

        private readonly IJsonSerializer _jsonSerializer;
        private readonly IApplicationPaths _applicationPaths;
        private readonly ILogger _logger;
        private readonly IFileSystem _fileSystem;

        /// <summary>
        /// Initializes a new instance of the <see cref="TaskManager" /> class.
        /// </summary>
        /// <param name="applicationPaths">The application paths.</param>
        /// <param name="jsonSerializer">The json serializer.</param>
        /// <param name="logger">The logger.</param>
        /// <param name="fileSystem">The filesystem manager.</param>
        public TaskManager(
            IApplicationPaths applicationPaths,
            IJsonSerializer jsonSerializer,
            ILogger<TaskManager> logger,
            IFileSystem fileSystem)
        {
            _applicationPaths = applicationPaths;
            _jsonSerializer = jsonSerializer;
            _logger = logger;
            _fileSystem = fileSystem;

            ScheduledTasks = Array.Empty<IScheduledTaskWorker>();
        }

        /// <summary>
        /// Cancels if running and queue.
        /// </summary>
        /// <typeparam name="T">The type of the scheduled task.</typeparam>
        /// <param name="options">Task options.</param>
        /// <exception cref="InvalidOperationException">No registered task is of type <typeparamref name="T" />.</exception>
        public void CancelIfRunningAndQueue<T>(TaskOptions options)
            where T : IScheduledTask
        {
            var task = ScheduledTasks.First(t => t.ScheduledTask.GetType() == typeof(T));
            ((ScheduledTaskWorker)task).CancelIfRunning();

            QueueScheduledTask<T>(options);
        }

        /// <summary>
        /// Cancels if running and queue, using default <see cref="TaskOptions" />.
        /// </summary>
        /// <typeparam name="T">The type of the scheduled task.</typeparam>
        public void CancelIfRunningAndQueue<T>()
            where T : IScheduledTask
        {
            CancelIfRunningAndQueue<T>(new TaskOptions());
        }

        /// <summary>
        /// Cancels if running.
        /// </summary>
        /// <typeparam name="T">The type of the scheduled task.</typeparam>
        /// <exception cref="InvalidOperationException">No registered task is of type <typeparamref name="T" />.</exception>
        public void CancelIfRunning<T>()
            where T : IScheduledTask
        {
            var task = ScheduledTasks.First(t => t.ScheduledTask.GetType() == typeof(T));
            ((ScheduledTaskWorker)task).CancelIfRunning();
        }

        /// <summary>
        /// Queues the scheduled task. Logs an error and does nothing when no task of
        /// type <typeparamref name="T" /> is registered.
        /// </summary>
        /// <typeparam name="T">The type of the scheduled task.</typeparam>
        /// <param name="options">Task options.</param>
        public void QueueScheduledTask<T>(TaskOptions options)
            where T : IScheduledTask
        {
            var scheduledTask = ScheduledTasks.FirstOrDefault(t => t.ScheduledTask.GetType() == typeof(T));

            if (scheduledTask == null)
            {
                _logger.LogError("Unable to find scheduled task of type {0} in QueueScheduledTask.", typeof(T).Name);
            }
            else
            {
                QueueScheduledTask(scheduledTask, options);
            }
        }

        /// <summary>
        /// Queues the scheduled task, using default <see cref="TaskOptions" />.
        /// </summary>
        /// <typeparam name="T">The type of the scheduled task.</typeparam>
        public void QueueScheduledTask<T>()
            where T : IScheduledTask
        {
            QueueScheduledTask<T>(new TaskOptions());
        }

        /// <summary>
        /// Queues the task with default options unless its worker currently reports
        /// <see cref="TaskState.Running" />.
        /// </summary>
        /// <typeparam name="T">The type of the scheduled task.</typeparam>
        /// <exception cref="InvalidOperationException">No registered task is of type <typeparamref name="T" />.</exception>
        public void QueueIfNotRunning<T>()
            where T : IScheduledTask
        {
            var task = ScheduledTasks.First(t => t.ScheduledTask.GetType() == typeof(T));

            if (task.State != TaskState.Running)
            {
                QueueScheduledTask<T>(new TaskOptions());
            }
        }

        /// <summary>
        /// Starts the task of type <typeparamref name="T" /> with default options if its
        /// worker is idle. Unlike the queueing methods, a request for a non-idle worker
        /// is dropped rather than enqueued. Logs an error when the type is not registered.
        /// </summary>
        /// <typeparam name="T">The type of the scheduled task.</typeparam>
        public void Execute<T>()
            where T : IScheduledTask
        {
            var scheduledTask = ScheduledTasks.FirstOrDefault(t => t.ScheduledTask.GetType() == typeof(T));

            if (scheduledTask == null)
            {
                _logger.LogError("Unable to find scheduled task of type {0} in Execute.", typeof(T).Name);
            }
            else
            {
                var type = scheduledTask.ScheduledTask.GetType();

                _logger.LogInformation("Queueing task {0}", type.Name);

                lock (_taskQueue)
                {
                    if (scheduledTask.State == TaskState.Idle)
                    {
                        // The returned Task is intentionally not awaited here.
                        Execute(scheduledTask, new TaskOptions());
                    }
                }
            }
        }

        /// <summary>
        /// Queues the scheduled task. The worker is looked up by the runtime type of
        /// <paramref name="task" />; an error is logged when none is registered.
        /// </summary>
        /// <param name="task">The task.</param>
        /// <param name="options">The task options.</param>
        public void QueueScheduledTask(IScheduledTask task, TaskOptions options)
        {
            var scheduledTask = ScheduledTasks.FirstOrDefault(t => t.ScheduledTask.GetType() == task.GetType());

            if (scheduledTask == null)
            {
                _logger.LogError("Unable to find scheduled task of type {0} in QueueScheduledTask.", task.GetType().Name);
            }
            else
            {
                QueueScheduledTask(scheduledTask, options);
            }
        }

        /// <summary>
        /// Queues the scheduled task: executes it immediately when the worker is idle,
        /// otherwise records the request in the queue for <see cref="ExecuteQueuedTasks" />.
        /// </summary>
        /// <param name="task">The task.</param>
        /// <param name="options">The task options.</param>
        private void QueueScheduledTask(IScheduledTaskWorker task, TaskOptions options)
        {
            var type = task.ScheduledTask.GetType();

            _logger.LogInformation("Queueing task {0}", type.Name);

            // Same lock as ExecuteQueuedTasks, so the state check and the
            // execute-or-enqueue decision are made as one step.
            lock (_taskQueue)
            {
                if (task.State == TaskState.Idle)
                {
                    Execute(task, options);
                    return;
                }

                _taskQueue.Enqueue(new Tuple<Type, TaskOptions>(type, options));
            }
        }

        /// <summary>
        /// Adds the tasks. Each task is wrapped in a new <see cref="ScheduledTaskWorker" />
        /// and appended to <see cref="ScheduledTasks" />.
        /// </summary>
        /// <param name="tasks">The tasks.</param>
        public void AddTasks(IEnumerable<IScheduledTask> tasks)
        {
            var list = tasks.Select(t => new ScheduledTaskWorker(t, _applicationPaths, this, _jsonSerializer, _logger));

            ScheduledTasks = ScheduledTasks.Concat(list).ToArray();
        }

        /// <summary>
        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);

            // The class is unsealed; a derived type may add a finalizer, which must
            // not run again once disposal has already happened.
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases unmanaged and - optionally - managed resources.
        /// </summary>
        /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
        protected virtual void Dispose(bool dispose)
        {
            // The workers are managed objects; only touch them on an explicit Dispose().
            if (!dispose)
            {
                return;
            }

            foreach (var task in ScheduledTasks)
            {
                task.Dispose();
            }
        }

        /// <summary>
        /// Cancels the specified worker.
        /// </summary>
        /// <param name="task">The worker; must be a <see cref="ScheduledTaskWorker" />.</param>
        public void Cancel(IScheduledTaskWorker task)
        {
            ((ScheduledTaskWorker)task).Cancel();
        }

        /// <summary>
        /// Executes the specified worker with the given options.
        /// </summary>
        /// <param name="task">The worker; must be a <see cref="ScheduledTaskWorker" />.</param>
        /// <param name="options">The task options.</param>
        /// <returns>The task returned by the worker's <c>Execute</c> call.</returns>
        public Task Execute(IScheduledTaskWorker task, TaskOptions options)
        {
            return ((ScheduledTaskWorker)task).Execute(options);
        }

        /// <summary>
        /// Called when [task executing].
        /// </summary>
        /// <param name="task">The task.</param>
        internal void OnTaskExecuting(IScheduledTaskWorker task)
        {
            TaskExecuting?.Invoke(this, new GenericEventArgs<IScheduledTaskWorker>(task));
        }

        /// <summary>
        /// Called when [task completed]. Raises <see cref="TaskCompleted" /> and then
        /// runs whatever was queued in the meantime.
        /// </summary>
        /// <param name="task">The task.</param>
        /// <param name="result">The result.</param>
        internal void OnTaskCompleted(IScheduledTaskWorker task, TaskResult result)
        {
            TaskCompleted?.Invoke(task, new TaskCompletionEventArgs(task, result));

            ExecuteQueuedTasks();
        }

        /// <summary>
        /// Executes the queued tasks. The queue is drained completely; for each task
        /// type only the first queued request is kept. Kept requests whose worker is
        /// not idle at this point are discarded (they are not re-enqueued).
        /// </summary>
        private void ExecuteQueuedTasks()
        {
            _logger.LogInformation("ExecuteQueuedTasks");

            // Execute queued tasks
            lock (_taskQueue)
            {
                var list = new List<Tuple<Type, TaskOptions>>();
                var seenTypes = new HashSet<Type>();

                while (_taskQueue.TryDequeue(out var item))
                {
                    // Add returns false for a type that was already seen, so the
                    // earliest request per type wins and queue order is preserved.
                    if (seenTypes.Add(item.Item1))
                    {
                        list.Add(item);
                    }
                }

                foreach (var enqueuedType in list)
                {
                    var scheduledTask = ScheduledTasks.First(t => t.ScheduledTask.GetType() == enqueuedType.Item1);

                    if (scheduledTask.State == TaskState.Idle)
                    {
                        Execute(scheduledTask, enqueuedType.Item2);
                    }
                }
            }
        }
    }
}