|
|
@ -10,7 +10,7 @@ except ImportError: # pragma: nocover
|
|
|
|
import pickle
|
|
|
|
import pickle
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
import rethinkdb as r
|
|
|
|
from rethinkdb import RethinkDB
|
|
|
|
except ImportError: # pragma: nocover
|
|
|
|
except ImportError: # pragma: nocover
|
|
|
|
raise ImportError('RethinkDBJobStore requires rethinkdb installed')
|
|
|
|
raise ImportError('RethinkDBJobStore requires rethinkdb installed')
|
|
|
|
|
|
|
|
|
|
|
@ -40,10 +40,12 @@ class RethinkDBJobStore(BaseJobStore):
|
|
|
|
raise ValueError('The "table" parameter must not be empty')
|
|
|
|
raise ValueError('The "table" parameter must not be empty')
|
|
|
|
|
|
|
|
|
|
|
|
self.database = database
|
|
|
|
self.database = database
|
|
|
|
self.table = table
|
|
|
|
self.table_name = table
|
|
|
|
|
|
|
|
self.table = None
|
|
|
|
self.client = client
|
|
|
|
self.client = client
|
|
|
|
self.pickle_protocol = pickle_protocol
|
|
|
|
self.pickle_protocol = pickle_protocol
|
|
|
|
self.connect_args = connect_args
|
|
|
|
self.connect_args = connect_args
|
|
|
|
|
|
|
|
self.r = RethinkDB()
|
|
|
|
self.conn = None
|
|
|
|
self.conn = None
|
|
|
|
|
|
|
|
|
|
|
|
def start(self, scheduler, alias):
|
|
|
|
def start(self, scheduler, alias):
|
|
|
@ -52,31 +54,31 @@ class RethinkDBJobStore(BaseJobStore):
|
|
|
|
if self.client:
|
|
|
|
if self.client:
|
|
|
|
self.conn = maybe_ref(self.client)
|
|
|
|
self.conn = maybe_ref(self.client)
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
self.conn = r.connect(db=self.database, **self.connect_args)
|
|
|
|
self.conn = self.r.connect(db=self.database, **self.connect_args)
|
|
|
|
|
|
|
|
|
|
|
|
if self.database not in r.db_list().run(self.conn):
|
|
|
|
if self.database not in self.r.db_list().run(self.conn):
|
|
|
|
r.db_create(self.database).run(self.conn)
|
|
|
|
self.r.db_create(self.database).run(self.conn)
|
|
|
|
|
|
|
|
|
|
|
|
if self.table not in r.table_list().run(self.conn):
|
|
|
|
if self.table_name not in self.r.table_list().run(self.conn):
|
|
|
|
r.table_create(self.table).run(self.conn)
|
|
|
|
self.r.table_create(self.table_name).run(self.conn)
|
|
|
|
|
|
|
|
|
|
|
|
if 'next_run_time' not in r.table(self.table).index_list().run(self.conn):
|
|
|
|
if 'next_run_time' not in self.r.table(self.table_name).index_list().run(self.conn):
|
|
|
|
r.table(self.table).index_create('next_run_time').run(self.conn)
|
|
|
|
self.r.table(self.table_name).index_create('next_run_time').run(self.conn)
|
|
|
|
|
|
|
|
|
|
|
|
self.table = r.db(self.database).table(self.table)
|
|
|
|
self.table = self.r.db(self.database).table(self.table_name)
|
|
|
|
|
|
|
|
|
|
|
|
def lookup_job(self, job_id):
|
|
|
|
def lookup_job(self, job_id):
|
|
|
|
results = list(self.table.get_all(job_id).pluck('job_state').run(self.conn))
|
|
|
|
results = list(self.table.get_all(job_id).pluck('job_state').run(self.conn))
|
|
|
|
return self._reconstitute_job(results[0]['job_state']) if results else None
|
|
|
|
return self._reconstitute_job(results[0]['job_state']) if results else None
|
|
|
|
|
|
|
|
|
|
|
|
def get_due_jobs(self, now):
|
|
|
|
def get_due_jobs(self, now):
|
|
|
|
return self._get_jobs(r.row['next_run_time'] <= datetime_to_utc_timestamp(now))
|
|
|
|
return self._get_jobs(self.r.row['next_run_time'] <= datetime_to_utc_timestamp(now))
|
|
|
|
|
|
|
|
|
|
|
|
def get_next_run_time(self):
|
|
|
|
def get_next_run_time(self):
|
|
|
|
results = list(
|
|
|
|
results = list(
|
|
|
|
self.table
|
|
|
|
self.table
|
|
|
|
.filter(r.row['next_run_time'] != None) # flake8: noqa
|
|
|
|
.filter(self.r.row['next_run_time'] != None) # noqa
|
|
|
|
.order_by(r.asc('next_run_time'))
|
|
|
|
.order_by(self.r.asc('next_run_time'))
|
|
|
|
.map(lambda x: x['next_run_time'])
|
|
|
|
.map(lambda x: x['next_run_time'])
|
|
|
|
.limit(1)
|
|
|
|
.limit(1)
|
|
|
|
.run(self.conn)
|
|
|
|
.run(self.conn)
|
|
|
@ -92,7 +94,7 @@ class RethinkDBJobStore(BaseJobStore):
|
|
|
|
job_dict = {
|
|
|
|
job_dict = {
|
|
|
|
'id': job.id,
|
|
|
|
'id': job.id,
|
|
|
|
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
|
|
|
|
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
|
|
|
|
'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
|
|
|
|
'job_state': self.r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
results = self.table.insert(job_dict).run(self.conn)
|
|
|
|
results = self.table.insert(job_dict).run(self.conn)
|
|
|
|
if results['errors'] > 0:
|
|
|
|
if results['errors'] > 0:
|
|
|
@ -101,7 +103,7 @@ class RethinkDBJobStore(BaseJobStore):
|
|
|
|
def update_job(self, job):
|
|
|
|
def update_job(self, job):
|
|
|
|
changes = {
|
|
|
|
changes = {
|
|
|
|
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
|
|
|
|
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
|
|
|
|
'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
|
|
|
|
'job_state': self.r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
results = self.table.get_all(job.id).update(changes).run(self.conn)
|
|
|
|
results = self.table.get_all(job.id).update(changes).run(self.conn)
|
|
|
|
skipped = False in map(lambda x: results[x] == 0, results.keys())
|
|
|
|
skipped = False in map(lambda x: results[x] == 0, results.keys())
|
|
|
@ -130,20 +132,20 @@ class RethinkDBJobStore(BaseJobStore):
|
|
|
|
def _get_jobs(self, predicate=None):
|
|
|
|
def _get_jobs(self, predicate=None):
|
|
|
|
jobs = []
|
|
|
|
jobs = []
|
|
|
|
failed_job_ids = []
|
|
|
|
failed_job_ids = []
|
|
|
|
query = (self.table.filter(r.row['next_run_time'] != None).filter(predicate) if
|
|
|
|
query = (self.table.filter(self.r.row['next_run_time'] != None).filter(predicate) # noqa
|
|
|
|
predicate else self.table)
|
|
|
|
if predicate else self.table)
|
|
|
|
query = query.order_by('next_run_time', 'id').pluck('id', 'job_state')
|
|
|
|
query = query.order_by('next_run_time', 'id').pluck('id', 'job_state')
|
|
|
|
|
|
|
|
|
|
|
|
for document in query.run(self.conn):
|
|
|
|
for document in query.run(self.conn):
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
jobs.append(self._reconstitute_job(document['job_state']))
|
|
|
|
jobs.append(self._reconstitute_job(document['job_state']))
|
|
|
|
except:
|
|
|
|
except Exception:
|
|
|
|
self._logger.exception('Unable to restore job "%s" -- removing it', document['id'])
|
|
|
|
self._logger.exception('Unable to restore job "%s" -- removing it', document['id'])
|
|
|
|
failed_job_ids.append(document['id'])
|
|
|
|
failed_job_ids.append(document['id'])
|
|
|
|
|
|
|
|
|
|
|
|
# Remove all the jobs we failed to restore
|
|
|
|
# Remove all the jobs we failed to restore
|
|
|
|
if failed_job_ids:
|
|
|
|
if failed_job_ids:
|
|
|
|
r.expr(failed_job_ids).for_each(
|
|
|
|
self.r.expr(failed_job_ids).for_each(
|
|
|
|
lambda job_id: self.table.get_all(job_id).delete()).run(self.conn)
|
|
|
|
lambda job_id: self.table.get_all(job_id).delete()).run(self.conn)
|
|
|
|
|
|
|
|
|
|
|
|
return jobs
|
|
|
|
return jobs
|
|
|
|