from __future__ import absolute_import
from apscheduler . jobstores . base import BaseJobStore , JobLookupError , ConflictingIdError
from apscheduler . util import maybe_ref , datetime_to_utc_timestamp , utc_timestamp_to_datetime
from apscheduler . job import Job
try :
import cPickle as pickle
except ImportError : # pragma: nocover
import pickle
try :
from rethinkdb import RethinkDB
except ImportError : # pragma: nocover
raise ImportError ( ' RethinkDBJobStore requires rethinkdb installed ' )
class RethinkDBJobStore ( BaseJobStore ) :
"""
Stores jobs in a RethinkDB database . Any leftover keyword arguments are directly passed to
rethinkdb ' s `RethinkdbClient <http://www.rethinkdb.com/api/#connect>`_.
Plugin alias : ` ` rethinkdb ` `
: param str database : database to store jobs in
: param str collection : collection to store jobs in
: param client : a : class : ` rethinkdb . net . Connection ` instance to use instead of providing
connection arguments
: param int pickle_protocol : pickle protocol level to use ( for serialization ) , defaults to the
highest available
"""
def __init__ ( self , database = ' apscheduler ' , table = ' jobs ' , client = None ,
pickle_protocol = pickle . HIGHEST_PROTOCOL , * * connect_args ) :
super ( RethinkDBJobStore , self ) . __init__ ( )
if not database :
raise ValueError ( ' The " database " parameter must not be empty ' )
if not table :
raise ValueError ( ' The " table " parameter must not be empty ' )
self . database = database
self . table_name = table
self . table = None
self . client = client
self . pickle_protocol = pickle_protocol
self . connect_args = connect_args
self . r = RethinkDB ( )
self . conn = None
def start ( self , scheduler , alias ) :
super ( RethinkDBJobStore , self ) . start ( scheduler , alias )
if self . client :
self . conn = maybe_ref ( self . client )
else :
self . conn = self . r . connect ( db = self . database , * * self . connect_args )
if self . database not in self . r . db_list ( ) . run ( self . conn ) :
self . r . db_create ( self . database ) . run ( self . conn )
if self . table_name not in self . r . table_list ( ) . run ( self . conn ) :
self . r . table_create ( self . table_name ) . run ( self . conn )
if ' next_run_time ' not in self . r . table ( self . table_name ) . index_list ( ) . run ( self . conn ) :
self . r . table ( self . table_name ) . index_create ( ' next_run_time ' ) . run ( self . conn )
self . table = self . r . db ( self . database ) . table ( self . table_name )
def lookup_job ( self , job_id ) :
results = list ( self . table . get_all ( job_id ) . pluck ( ' job_state ' ) . run ( self . conn ) )
return self . _reconstitute_job ( results [ 0 ] [ ' job_state ' ] ) if results else None
def get_due_jobs ( self , now ) :
return self . _get_jobs ( self . r . row [ ' next_run_time ' ] < = datetime_to_utc_timestamp ( now ) )
def get_next_run_time ( self ) :
results = list (
self . table
. filter ( self . r . row [ ' next_run_time ' ] != None ) # noqa
. order_by ( self . r . asc ( ' next_run_time ' ) )
. map ( lambda x : x [ ' next_run_time ' ] )
. limit ( 1 )
. run ( self . conn )
)
return utc_timestamp_to_datetime ( results [ 0 ] ) if results else None
def get_all_jobs ( self ) :
jobs = self . _get_jobs ( )
self . _fix_paused_jobs_sorting ( jobs )
return jobs
def add_job ( self , job ) :
job_dict = {
' id ' : job . id ,
' next_run_time ' : datetime_to_utc_timestamp ( job . next_run_time ) ,
' job_state ' : self . r . binary ( pickle . dumps ( job . __getstate__ ( ) , self . pickle_protocol ) )
}
results = self . table . insert ( job_dict ) . run ( self . conn )
if results [ ' errors ' ] > 0 :
raise ConflictingIdError ( job . id )
def update_job ( self , job ) :
changes = {
' next_run_time ' : datetime_to_utc_timestamp ( job . next_run_time ) ,
' job_state ' : self . r . binary ( pickle . dumps ( job . __getstate__ ( ) , self . pickle_protocol ) )
}
results = self . table . get_all ( job . id ) . update ( changes ) . run ( self . conn )
skipped = False in map ( lambda x : results [ x ] == 0 , results . keys ( ) )
if results [ ' skipped ' ] > 0 or results [ ' errors ' ] > 0 or not skipped :
raise JobLookupError ( job . id )
def remove_job ( self , job_id ) :
results = self . table . get_all ( job_id ) . delete ( ) . run ( self . conn )
if results [ ' deleted ' ] + results [ ' skipped ' ] != 1 :
raise JobLookupError ( job_id )
def remove_all_jobs ( self ) :
self . table . delete ( ) . run ( self . conn )
def shutdown ( self ) :
self . conn . close ( )
def _reconstitute_job ( self , job_state ) :
job_state = pickle . loads ( job_state )
job = Job . __new__ ( Job )
job . __setstate__ ( job_state )
job . _scheduler = self . _scheduler
job . _jobstore_alias = self . _alias
return job
def _get_jobs ( self , predicate = None ) :
jobs = [ ]
failed_job_ids = [ ]
query = ( self . table . filter ( self . r . row [ ' next_run_time ' ] != None ) . filter ( predicate ) # noqa
if predicate else self . table )
query = query . order_by ( ' next_run_time ' , ' id ' ) . pluck ( ' id ' , ' job_state ' )
for document in query . run ( self . conn ) :
try :
jobs . append ( self . _reconstitute_job ( document [ ' job_state ' ] ) )
except Exception :
self . _logger . exception ( ' Unable to restore job " %s " -- removing it ' , document [ ' id ' ] )
failed_job_ids . append ( document [ ' id ' ] )
# Remove all the jobs we failed to restore
if failed_job_ids :
self . r . expr ( failed_job_ids ) . for_each (
lambda job_id : self . table . get_all ( job_id ) . delete ( ) ) . run ( self . conn )
return jobs
def __repr__ ( self ) :
connection = self . conn
return ' < %s (connection= %s )> ' % ( self . __class__ . __name__ , connection )