@ -4,6 +4,7 @@ using System.Diagnostics;
using System.Linq ;
using System.Threading ;
using NLog ;
using NzbDrone.Core.Lifecycle ;
using NzbDrone.Core.Model ;
using NzbDrone.Core.Model.Notification ;
using NzbDrone.Core.Providers ;
@ -19,12 +20,13 @@ namespace NzbDrone.Core.Jobs
bool QueueJob ( string jobTypeString ) ;
}
public class JobController : IJobController
public class JobController : IJobController , IInitializable
{
private readonly NotificationProvider _notificationProvider ;
private readonly IEnumerable < IJob > _jobs ;
private readonly IJobRepository _jobRepository ;
private readonly Logger logger ;
private readonly Logger _logger ;
private Timer _timer ;
private Thread _jobThread ;
public Stopwatch StopWatch { get ; private set ; }
@ -41,10 +43,16 @@ namespace NzbDrone.Core.Jobs
_notificationProvider = notificationProvider ;
_jobs = jobs ;
_jobRepository = jobRepository ;
this . logger = logger ;
_ logger = logger ;
ResetThread ( ) ;
}
public void Init ( )
{
_timer = new Timer ( c = > QueueScheduled ( ) ) ;
_timer . Change ( 0 , 60 * 1000 ) ;
}
public List < JobQueueItem > Queue
{
get
@ -62,7 +70,7 @@ namespace NzbDrone.Core.Jobs
if ( _jobThread . IsAlive )
{
logger. Trace ( "Queue is already running. Ignoring scheduler's request." ) ;
_ logger. Trace ( "Queue is already running. Ignoring scheduler's request." ) ;
return ;
}
}
@ -73,7 +81,7 @@ namespace NzbDrone.Core.Jobs
pendingJobs . ForEach ( jobType = > QueueJob ( jobType , source : JobQueueItem . JobSourceType . Scheduler ) ) ;
logger. Trace ( "{0} Scheduled tasks have been added to the queue" , pendingJobs . Count ) ;
_ logger. Trace ( "{0} Scheduled tasks have been added to the queue" , pendingJobs . Count ) ;
}
public virtual void QueueJob ( Type jobType , dynamic options = null , JobQueueItem . JobSourceType source = JobQueueItem . JobSourceType . User )
@ -85,7 +93,7 @@ namespace NzbDrone.Core.Jobs
Source = source
} ;
logger. Debug ( "Attempting to queue {0}" , queueItem ) ;
_ logger. Debug ( "Attempting to queue {0}" , queueItem ) ;
lock ( _executionLock )
{
@ -96,17 +104,17 @@ namespace NzbDrone.Core.Jobs
if ( ! Queue . Contains ( queueItem ) )
{
Queue . Add ( queueItem ) ;
logger. Trace ( "Job {0} added to the queue. current items in queue: {1}" , queueItem , Queue . Count ) ;
_ logger. Trace ( "Job {0} added to the queue. current items in queue: {1}" , queueItem , Queue . Count ) ;
}
else
{
logger. Info ( "{0} already exists in the queue. Skipping. current items in queue: {1}" , queueItem , Queue . Count ) ;
_ logger. Info ( "{0} already exists in the queue. Skipping. current items in queue: {1}" , queueItem , Queue . Count ) ;
}
}
if ( _jobThread . IsAlive )
{
logger. Trace ( "Queue is already running. No need to start it up." ) ;
_ logger. Trace ( "Queue is already running. No need to start it up." ) ;
return ;
}
@ -145,7 +153,7 @@ namespace NzbDrone.Core.Jobs
if ( Queue . Count ! = 0 )
{
job = Queue . OrderBy ( c = > c . Source ) . First ( ) ;
logger. Trace ( "Popping {0} from the queue." , job ) ;
_ logger. Trace ( "Popping {0} from the queue." , job ) ;
Queue . Remove ( job ) ;
}
}
@ -161,7 +169,7 @@ namespace NzbDrone.Core.Jobs
}
catch ( Exception e )
{
logger. FatalException ( "An error has occurred while executing job." , e ) ;
_ logger. FatalException ( "An error has occurred while executing job." , e ) ;
}
}
@ -169,16 +177,16 @@ namespace NzbDrone.Core.Jobs
}
catch ( ThreadAbortException e )
{
logger. Warn ( e . Message ) ;
_ logger. Warn ( e . Message ) ;
}
catch ( Exception e )
{
logger. ErrorException ( "Error has occurred in queue processor thread" , e ) ;
_ logger. ErrorException ( "Error has occurred in queue processor thread" , e ) ;
}
finally
{
StopWatch . Stop ( ) ;
logger. Trace ( "Finished processing jobs in the queue." ) ;
_ logger. Trace ( "Finished processing jobs in the queue." ) ;
}
}
@ -187,7 +195,7 @@ namespace NzbDrone.Core.Jobs
var jobImplementation = _jobs . SingleOrDefault ( t = > t . GetType ( ) = = queueItem . JobType ) ;
if ( jobImplementation = = null )
{
logger. Error ( "Unable to locate implementation for '{0}'. Make sure it is properly registered." , queueItem . JobType ) ;
_ logger. Error ( "Unable to locate implementation for '{0}'. Make sure it is properly registered." , queueItem . JobType ) ;
return ;
}
@ -196,7 +204,7 @@ namespace NzbDrone.Core.Jobs
{
try
{
logger. Debug ( "Starting {0}. Last execution {1}" , queueItem , jobDefinition . LastExecution ) ;
_ logger. Debug ( "Starting {0}. Last execution {1}" , queueItem , jobDefinition . LastExecution ) ;
var sw = Stopwatch . StartNew ( ) ;
@ -208,7 +216,7 @@ namespace NzbDrone.Core.Jobs
jobDefinition . Success = true ;
sw . Stop ( ) ;
logger. Debug ( "Job {0} successfully completed in {1:0}.{2} seconds." , queueItem , sw . Elapsed . TotalSeconds , sw . Elapsed . Milliseconds / 100 ,
_ logger. Debug ( "Job {0} successfully completed in {1:0}.{2} seconds." , queueItem , sw . Elapsed . TotalSeconds , sw . Elapsed . Milliseconds / 100 ,
sw . Elapsed . Seconds ) ;
}
catch ( ThreadAbortException )
@ -217,7 +225,7 @@ namespace NzbDrone.Core.Jobs
}
catch ( Exception e )
{
logger. ErrorException ( "An error has occurred while executing job [" + jobImplementation . Name + "]." , e ) ;
_ logger. ErrorException ( "An error has occurred while executing job [" + jobImplementation . Name + "]." , e ) ;
_notification . Status = ProgressNotificationStatus . Failed ;
_notification . CurrentMessage = jobImplementation . Name + " Failed." ;
@ -237,7 +245,7 @@ namespace NzbDrone.Core.Jobs
{
if ( StopWatch . Elapsed . TotalHours > 1 )
{
logger. Warn ( "Thread job has been running for more than an hour. fuck it!" ) ;
_ logger. Warn ( "Thread job has been running for more than an hour. fuck it!" ) ;
ResetThread ( ) ;
}
}
@ -249,8 +257,10 @@ namespace NzbDrone.Core.Jobs
_jobThread . Abort ( ) ;
}
logger. Trace ( "resetting queue processor thread" ) ;
_ logger. Trace ( "resetting queue processor thread" ) ;
_jobThread = new Thread ( ProcessQueue ) { Name = "JobQueueThread" } ;
}
}
}