using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace NzbDrone.Core.Messaging.Commands
{
    /// <summary>
    /// In-memory list of <see cref="CommandModel"/> instances guarded by a private monitor.
    /// Every mutation pulses the monitor so that consumers blocked in
    /// <see cref="GetConsumingEnumerable(CancellationToken)"/> re-check for runnable work.
    /// </summary>
    public class CommandQueue : IEnumerable<CommandModel>
    {
        private readonly object _mutex = new object();
        private readonly List<CommandModel> _items;

        public CommandQueue()
        {
            _items = new List<CommandModel>();
        }

        /// <summary>Number of commands currently held, regardless of their status.</summary>
        public int Count
        {
            get
            {
                lock (_mutex)
                {
                    return _items.Count;
                }
            }
        }

        /// <summary>Appends a command and wakes every waiting consumer.</summary>
        /// <param name="item">The command to append.</param>
        public void Add(CommandModel item)
        {
            lock (_mutex)
            {
                _items.Add(item);
                Monitor.PulseAll(_mutex);
            }
        }

        /// <summary>
        /// Enumerates a snapshot of the queue taken under the lock, so concurrent
        /// adds/removes never invalidate the returned enumerator.
        /// </summary>
        public IEnumerator<CommandModel> GetEnumerator()
        {
            return All().GetEnumerator();
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }

        /// <summary>Returns a snapshot of every command currently held.</summary>
        /// <returns>
        /// A new list; modifying it does not affect the queue, and later queue changes
        /// are not reflected in it.
        /// </returns>
        public List<CommandModel> All()
        {
            lock (_mutex)
            {
                // Copy rather than hand out the backing list: callers enumerate the
                // result outside the lock, which would race with Add/Remove.
                return new List<CommandModel>(_items);
            }
        }

        /// <summary>Looks up a command by id.</summary>
        /// <param name="id">The id to match against each command's <c>Id</c>.</param>
        /// <returns>The first command with a matching id, or <c>null</c> when none matches.</returns>
        public CommandModel Find(int id)
        {
            lock (_mutex)
            {
                return _items.FirstOrDefault(q => q.Id == id);
            }
        }

        /// <summary>Removes each of the given commands (if present) and wakes every waiting consumer.</summary>
        /// <param name="commands">The commands to remove.</param>
        public void RemoveMany(IEnumerable<CommandModel> commands)
        {
            lock (_mutex)
            {
                foreach (var command in commands)
                {
                    _items.Remove(command);
                }

                Monitor.PulseAll(_mutex);
            }
        }

        /// <summary>
        /// Removes the command with the given id, but only while its status is still
        /// <see cref="CommandStatus.Queued"/>.
        /// </summary>
        /// <param name="id">The id of the command to remove.</param>
        /// <returns><c>true</c> if a queued command was found and removed; otherwise <c>false</c>.</returns>
        public bool RemoveIfQueued(int id)
        {
            lock (_mutex)
            {
                var command = _items.FirstOrDefault(q => q.Id == id);

                if (command?.Status == CommandStatus.Queued)
                {
                    _items.Remove(command);
                    Monitor.PulseAll(_mutex);

                    return true;
                }

                return false;
            }
        }

        /// <summary>Returns a snapshot of the commands whose status is queued or started.</summary>
        public List<CommandModel> QueuedOrStarted()
        {
            lock (_mutex)
            {
                return _items.Where(q => q.Status == CommandStatus.Queued || q.Status == CommandStatus.Started)
                             .ToList();
            }
        }

        /// <summary>
        /// Blocking enumeration of runnable commands that never observes cancellation.
        /// </summary>
        public IEnumerable<CommandModel> GetConsumingEnumerable()
        {
            return GetConsumingEnumerable(CancellationToken.None);
        }

        /// <summary>
        /// Yields commands as <see cref="TryGet"/> hands them out, blocking on the monitor
        /// while nothing is runnable. The sequence ends once cancellation is requested.
        /// </summary>
        /// <param name="cancellationToken">Token that ends the enumeration when cancelled.</param>
        public IEnumerable<CommandModel> GetConsumingEnumerable(CancellationToken cancellationToken)
        {
            // Cancellation must wake consumers parked in Monitor.Wait. The registration is
            // disposed when the enumeration finishes (or the enumerator is disposed) so that
            // a long-lived token does not accumulate one callback per enumeration.
            using (cancellationToken.Register(PulseAllConsumers))
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    CommandModel command = null;

                    lock (_mutex)
                    {
                        // Re-check under the lock: the cancellation pulse also takes the lock,
                        // so it cannot slip in between this check and the Wait below.
                        if (cancellationToken.IsCancellationRequested)
                        {
                            break;
                        }

                        if (!TryGet(out command))
                        {
                            Monitor.Wait(_mutex);
                            continue;
                        }
                    }

                    if (command != null)
                    {
                        yield return command;
                    }
                }
            }
        }

        /// <summary>Wakes every thread waiting on the queue's monitor.</summary>
        public void PulseAllConsumers()
        {
            // Signal all consumers to reevaluate cancellation token
            lock (_mutex)
            {
                Monitor.PulseAll(_mutex);
            }
        }

        /// <summary>
        /// Picks the next queued command that may run alongside the commands that are
        /// already started, and marks it as started.
        /// </summary>
        /// <param name="item">
        /// The selected command, with <c>StartedAt</c> set to the current UTC time and
        /// <c>Status</c> set to <see cref="CommandStatus.Started"/>; <c>null</c> when
        /// nothing can be started right now.
        /// </param>
        /// <returns><c>true</c> if a command was selected; otherwise <c>false</c>.</returns>
        public bool TryGet(out CommandModel item)
        {
            item = null;

            lock (_mutex)
            {
                if (_items.Count == 0)
                {
                    return false;
                }

                var startedCommands = _items.Where(c => c.Status == CommandStatus.Started)
                                            .ToList();

                var exclusiveTypes = startedCommands.Where(x => x.Body.IsTypeExclusive)
                                                    .Select(x => x.Body.Name)
                                                    .ToList();

                var queuedCommands = _items.Where(c => c.Status == CommandStatus.Queued);

                // A started command needing disk access blocks other disk-access commands.
                if (startedCommands.Any(x => x.Body.RequiresDiskAccess))
                {
                    queuedCommands = queuedCommands.Where(c => !c.Body.RequiresDiskAccess);
                }

                // A started type-exclusive command blocks queued commands with the same name.
                if (exclusiveTypes.Count > 0)
                {
                    queuedCommands = queuedCommands.Where(c => !exclusiveTypes.Contains(c.Body.Name));
                }

                // While a long-running command is started, exclusive commands are skipped
                // rather than allowed to hold up the rest of the queue.
                if (startedCommands.Any(x => x.Body.IsLongRunning))
                {
                    queuedCommands = queuedCommands.Where(c => !c.Body.IsExclusive);
                }

                // Highest priority first, oldest first within the same priority.
                var localItem = queuedCommands.OrderByDescending(c => c.Priority)
                                              .ThenBy(c => c.QueuedAt)
                                              .FirstOrDefault();

                // Nothing queued that meets the requirements
                if (localItem == null)
                {
                    return false;
                }

                // If any executing command is exclusive don't want return another command until it completes.
                if (startedCommands.Any(c => c.Body.IsExclusive))
                {
                    return false;
                }

                // If the next command to execute is exclusive wait for executing commands to complete.
                // This will prevent other tasks from starting so the exclusive task executes in the order it should.
                if (localItem.Body.IsExclusive && startedCommands.Count > 0)
                {
                    return false;
                }

                // A command ready to execute
                localItem.StartedAt = DateTime.UtcNow;
                localItem.Status = CommandStatus.Started;
                item = localItem;

                return true;
            }
        }
    }
}