You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
393 lines
11 KiB
393 lines
11 KiB
"""Synchronization functions.
|
|
|
|
File- and mutex-based mutual exclusion synchronizers are provided,
|
|
as well as a name-based mutex which locks within an application
|
|
based on a string name.
|
|
|
|
"""
|
|
import errno
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
|
|
try:
|
|
import threading as _threading
|
|
except ImportError:
|
|
import dummy_threading as _threading
|
|
|
|
# check for fcntl module
|
|
try:
|
|
sys.getwindowsversion()
|
|
has_flock = False
|
|
except:
|
|
try:
|
|
import fcntl
|
|
has_flock = True
|
|
except ImportError:
|
|
has_flock = False
|
|
|
|
from beaker import util
|
|
from beaker.exceptions import LockError
|
|
|
|
__all__ = ["file_synchronizer", "mutex_synchronizer", "null_synchronizer",
|
|
"NameLock", "_threading"]
|
|
|
|
|
|
class NameLock(object):
|
|
"""a proxy for an RLock object that is stored in a name based
|
|
registry.
|
|
|
|
Multiple threads can get a reference to the same RLock based on the
|
|
name alone, and synchronize operations related to that name.
|
|
|
|
"""
|
|
locks = util.WeakValuedRegistry()
|
|
|
|
class NLContainer(object):
|
|
def __init__(self, reentrant):
|
|
if reentrant:
|
|
self.lock = _threading.RLock()
|
|
else:
|
|
self.lock = _threading.Lock()
|
|
|
|
def __call__(self):
|
|
return self.lock
|
|
|
|
def __init__(self, identifier=None, reentrant=False):
|
|
if identifier is None:
|
|
self._lock = NameLock.NLContainer(reentrant)
|
|
else:
|
|
self._lock = NameLock.locks.get(identifier, NameLock.NLContainer,
|
|
reentrant)
|
|
|
|
def acquire(self, wait=True):
|
|
return self._lock().acquire(wait)
|
|
|
|
def release(self):
|
|
self._lock().release()
|
|
|
|
|
|
_synchronizers = util.WeakValuedRegistry()
|
|
|
|
|
|
def _synchronizer(identifier, cls, **kwargs):
|
|
return _synchronizers.sync_get((identifier, cls), cls, identifier, **kwargs)
|
|
|
|
|
|
def file_synchronizer(identifier, **kwargs):
|
|
if not has_flock or 'lock_dir' not in kwargs:
|
|
return mutex_synchronizer(identifier)
|
|
else:
|
|
return _synchronizer(identifier, FileSynchronizer, **kwargs)
|
|
|
|
|
|
def mutex_synchronizer(identifier, **kwargs):
|
|
return _synchronizer(identifier, ConditionSynchronizer, **kwargs)
|
|
|
|
|
|
class null_synchronizer(object):
|
|
"""A 'null' synchronizer, which provides the :class:`.SynchronizerImpl` interface
|
|
without any locking.
|
|
|
|
"""
|
|
def acquire_write_lock(self, wait=True):
|
|
return True
|
|
|
|
def acquire_read_lock(self):
|
|
pass
|
|
|
|
def release_write_lock(self):
|
|
pass
|
|
|
|
def release_read_lock(self):
|
|
pass
|
|
acquire = acquire_write_lock
|
|
release = release_write_lock
|
|
|
|
|
|
class SynchronizerImpl(object):
|
|
"""Base class for a synchronization object that allows
|
|
multiple readers, single writers.
|
|
|
|
"""
|
|
def __init__(self):
|
|
self._state = util.ThreadLocal()
|
|
|
|
class SyncState(object):
|
|
__slots__ = 'reentrantcount', 'writing', 'reading'
|
|
|
|
def __init__(self):
|
|
self.reentrantcount = 0
|
|
self.writing = False
|
|
self.reading = False
|
|
|
|
def state(self):
|
|
if not self._state.has():
|
|
state = SynchronizerImpl.SyncState()
|
|
self._state.put(state)
|
|
return state
|
|
else:
|
|
return self._state.get()
|
|
state = property(state)
|
|
|
|
def release_read_lock(self):
|
|
state = self.state
|
|
|
|
if state.writing:
|
|
raise LockError("lock is in writing state")
|
|
if not state.reading:
|
|
raise LockError("lock is not in reading state")
|
|
|
|
if state.reentrantcount == 1:
|
|
self.do_release_read_lock()
|
|
state.reading = False
|
|
|
|
state.reentrantcount -= 1
|
|
|
|
def acquire_read_lock(self, wait=True):
|
|
state = self.state
|
|
|
|
if state.writing:
|
|
raise LockError("lock is in writing state")
|
|
|
|
if state.reentrantcount == 0:
|
|
x = self.do_acquire_read_lock(wait)
|
|
if (wait or x):
|
|
state.reentrantcount += 1
|
|
state.reading = True
|
|
return x
|
|
elif state.reading:
|
|
state.reentrantcount += 1
|
|
return True
|
|
|
|
def release_write_lock(self):
|
|
state = self.state
|
|
|
|
if state.reading:
|
|
raise LockError("lock is in reading state")
|
|
if not state.writing:
|
|
raise LockError("lock is not in writing state")
|
|
|
|
if state.reentrantcount == 1:
|
|
self.do_release_write_lock()
|
|
state.writing = False
|
|
|
|
state.reentrantcount -= 1
|
|
|
|
release = release_write_lock
|
|
|
|
def acquire_write_lock(self, wait=True):
|
|
state = self.state
|
|
|
|
if state.reading:
|
|
raise LockError("lock is in reading state")
|
|
|
|
if state.reentrantcount == 0:
|
|
x = self.do_acquire_write_lock(wait)
|
|
if (wait or x):
|
|
state.reentrantcount += 1
|
|
state.writing = True
|
|
return x
|
|
elif state.writing:
|
|
state.reentrantcount += 1
|
|
return True
|
|
|
|
acquire = acquire_write_lock
|
|
|
|
def do_release_read_lock(self):
|
|
raise NotImplementedError()
|
|
|
|
def do_acquire_read_lock(self, wait):
|
|
raise NotImplementedError()
|
|
|
|
def do_release_write_lock(self):
|
|
raise NotImplementedError()
|
|
|
|
def do_acquire_write_lock(self, wait):
|
|
raise NotImplementedError()
|
|
|
|
|
|
class FileSynchronizer(SynchronizerImpl):
|
|
"""A synchronizer which locks using flock().
|
|
|
|
"""
|
|
def __init__(self, identifier, lock_dir):
|
|
super(FileSynchronizer, self).__init__()
|
|
self._filedescriptor = util.ThreadLocal()
|
|
|
|
if lock_dir is None:
|
|
lock_dir = tempfile.gettempdir()
|
|
else:
|
|
lock_dir = lock_dir
|
|
|
|
self.filename = util.encoded_path(
|
|
lock_dir,
|
|
[identifier],
|
|
extension='.lock'
|
|
)
|
|
self.lock_dir = os.path.dirname(self.filename)
|
|
|
|
def _filedesc(self):
|
|
return self._filedescriptor.get()
|
|
_filedesc = property(_filedesc)
|
|
|
|
def _ensuredir(self):
|
|
if not os.path.exists(self.lock_dir):
|
|
util.verify_directory(self.lock_dir)
|
|
|
|
def _open(self, mode):
|
|
filedescriptor = self._filedesc
|
|
if filedescriptor is None:
|
|
self._ensuredir()
|
|
filedescriptor = os.open(self.filename, mode)
|
|
self._filedescriptor.put(filedescriptor)
|
|
return filedescriptor
|
|
|
|
def do_acquire_read_lock(self, wait):
|
|
filedescriptor = self._open(os.O_CREAT | os.O_RDONLY)
|
|
if not wait:
|
|
try:
|
|
fcntl.flock(filedescriptor, fcntl.LOCK_SH | fcntl.LOCK_NB)
|
|
return True
|
|
except IOError:
|
|
os.close(filedescriptor)
|
|
self._filedescriptor.remove()
|
|
return False
|
|
else:
|
|
fcntl.flock(filedescriptor, fcntl.LOCK_SH)
|
|
return True
|
|
|
|
def do_acquire_write_lock(self, wait):
|
|
filedescriptor = self._open(os.O_CREAT | os.O_WRONLY)
|
|
if not wait:
|
|
try:
|
|
fcntl.flock(filedescriptor, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
|
return True
|
|
except IOError:
|
|
os.close(filedescriptor)
|
|
self._filedescriptor.remove()
|
|
return False
|
|
else:
|
|
fcntl.flock(filedescriptor, fcntl.LOCK_EX)
|
|
return True
|
|
|
|
def do_release_read_lock(self):
|
|
self._release_all_locks()
|
|
|
|
def do_release_write_lock(self):
|
|
self._release_all_locks()
|
|
|
|
def _release_all_locks(self):
|
|
filedescriptor = self._filedesc
|
|
if filedescriptor is not None:
|
|
fcntl.flock(filedescriptor, fcntl.LOCK_UN)
|
|
os.close(filedescriptor)
|
|
self._filedescriptor.remove()
|
|
|
|
|
|
class ConditionSynchronizer(SynchronizerImpl):
|
|
"""a synchronizer using a Condition."""
|
|
|
|
def __init__(self, identifier):
|
|
super(ConditionSynchronizer, self).__init__()
|
|
|
|
# counts how many asynchronous methods are executing
|
|
self.asynch = 0
|
|
|
|
# pointer to thread that is the current sync operation
|
|
self.current_sync_operation = None
|
|
|
|
# condition object to lock on
|
|
self.condition = _threading.Condition(_threading.Lock())
|
|
|
|
def do_acquire_read_lock(self, wait=True):
|
|
self.condition.acquire()
|
|
try:
|
|
# see if a synchronous operation is waiting to start
|
|
# or is already running, in which case we wait (or just
|
|
# give up and return)
|
|
if wait:
|
|
while self.current_sync_operation is not None:
|
|
self.condition.wait()
|
|
else:
|
|
if self.current_sync_operation is not None:
|
|
return False
|
|
|
|
self.asynch += 1
|
|
finally:
|
|
self.condition.release()
|
|
|
|
if not wait:
|
|
return True
|
|
|
|
def do_release_read_lock(self):
|
|
self.condition.acquire()
|
|
try:
|
|
self.asynch -= 1
|
|
|
|
# check if we are the last asynchronous reader thread
|
|
# out the door.
|
|
if self.asynch == 0:
|
|
# yes. so if a sync operation is waiting, notifyAll to wake
|
|
# it up
|
|
if self.current_sync_operation is not None:
|
|
self.condition.notifyAll()
|
|
elif self.asynch < 0:
|
|
raise LockError("Synchronizer error - too many "
|
|
"release_read_locks called")
|
|
finally:
|
|
self.condition.release()
|
|
|
|
def do_acquire_write_lock(self, wait=True):
|
|
self.condition.acquire()
|
|
try:
|
|
# here, we are not a synchronous reader, and after returning,
|
|
# assuming waiting or immediate availability, we will be.
|
|
|
|
if wait:
|
|
# if another sync is working, wait
|
|
while self.current_sync_operation is not None:
|
|
self.condition.wait()
|
|
else:
|
|
# if another sync is working,
|
|
# we dont want to wait, so forget it
|
|
if self.current_sync_operation is not None:
|
|
return False
|
|
|
|
# establish ourselves as the current sync
|
|
# this indicates to other read/write operations
|
|
# that they should wait until this is None again
|
|
self.current_sync_operation = _threading.currentThread()
|
|
|
|
# now wait again for asyncs to finish
|
|
if self.asynch > 0:
|
|
if wait:
|
|
# wait
|
|
self.condition.wait()
|
|
else:
|
|
# we dont want to wait, so forget it
|
|
self.current_sync_operation = None
|
|
return False
|
|
finally:
|
|
self.condition.release()
|
|
|
|
if not wait:
|
|
return True
|
|
|
|
def do_release_write_lock(self):
|
|
self.condition.acquire()
|
|
try:
|
|
if self.current_sync_operation is not _threading.currentThread():
|
|
raise LockError("Synchronizer error - current thread doesnt "
|
|
"have the write lock")
|
|
|
|
# reset the current sync operation so
|
|
# another can get it
|
|
self.current_sync_operation = None
|
|
|
|
# tell everyone to get ready
|
|
self.condition.notifyAll()
|
|
finally:
|
|
# everyone go !!
|
|
self.condition.release()
|