304 lines
12 KiB
Python
304 lines
12 KiB
Python
# cython: auto_pickle=False,embedsignature=True,always_allow_keywords=False
|
|
from __future__ import print_function, absolute_import, division
|
|
import sys
|
|
|
|
from gevent.timeout import Timeout
|
|
|
|
|
|
__all__ = [
|
|
'Semaphore',
|
|
'BoundedSemaphore',
|
|
]
|
|
|
|
# In Cython, we define these as 'cdef [inline]' functions. The
|
|
# compilation unit cannot have a direct assignment to them (import
|
|
# is assignment) without generating a 'lvalue is not valid target'
|
|
# error.
|
|
locals()['getcurrent'] = __import__('greenlet').getcurrent
|
|
locals()['greenlet_init'] = lambda: None
|
|
locals()['get_hub'] = __import__('gevent').get_hub
|
|
|
|
class Semaphore(object):
|
|
"""
|
|
Semaphore(value=1) -> Semaphore
|
|
|
|
A semaphore manages a counter representing the number of release()
|
|
calls minus the number of acquire() calls, plus an initial value.
|
|
The acquire() method blocks if necessary until it can return
|
|
without making the counter negative.
|
|
|
|
If not given, ``value`` defaults to 1.
|
|
|
|
The semaphore is a context manager and can be used in ``with`` statements.
|
|
|
|
This Semaphore's ``__exit__`` method does not call the trace function
|
|
on CPython, but does under PyPy.
|
|
|
|
.. seealso:: :class:`BoundedSemaphore` for a safer version that prevents
|
|
some classes of bugs.
|
|
"""
|
|
|
|
def __init__(self, value=1):
|
|
if value < 0:
|
|
raise ValueError("semaphore initial value must be >= 0")
|
|
self.counter = value
|
|
self._dirty = False
|
|
# In PyPy 2.6.1 with Cython 0.23, `cdef public` or `cdef
|
|
# readonly` or simply `cdef` attributes of type `object` can appear to leak if
|
|
# a Python subclass is used (this is visible simply
|
|
# instantiating this subclass if _links=[]). Our _links and
|
|
# _notifier are such attributes, and gevent.thread subclasses
|
|
# this class. Thus, we carefully manage the lifetime of the
|
|
# objects we put in these attributes so that, in the normal
|
|
# case of a semaphore used correctly (deallocated when it's not
|
|
# locked and no one is waiting), the leak goes away (because
|
|
# these objects are back to None). This can also be solved on PyPy
|
|
# by simply not declaring these objects in the pxd file, but that doesn't work for
|
|
# CPython ("No attribute...")
|
|
# See https://github.com/gevent/gevent/issues/660
|
|
self._links = None
|
|
self._notifier = None
|
|
# we don't want to do get_hub() here to allow defining module-level locks
|
|
# without initializing the hub
|
|
|
|
def __str__(self):
|
|
params = (self.__class__.__name__, self.counter, len(self._links) if self._links else 0)
|
|
return '<%s counter=%s _links[%s]>' % params
|
|
|
|
def locked(self):
|
|
"""Return a boolean indicating whether the semaphore can be acquired.
|
|
Most useful with binary semaphores."""
|
|
return self.counter <= 0
|
|
|
|
def release(self):
|
|
"""
|
|
Release the semaphore, notifying any waiters if needed.
|
|
"""
|
|
self.counter += 1
|
|
self._start_notify()
|
|
return self.counter
|
|
|
|
def _start_notify(self):
|
|
if self._links and self.counter > 0 and not self._notifier:
|
|
# We create a new self._notifier each time through the loop,
|
|
# if needed. (it has a __bool__ method that tells whether it has
|
|
# been run; once it's run once---at the end of the loop---it becomes
|
|
# false.)
|
|
# NOTE: Passing the bound method will cause a memory leak on PyPy
|
|
# with Cython <= 0.23.3. You must use >= 0.23.4.
|
|
# See https://bitbucket.org/pypy/pypy/issues/2149/memory-leak-for-python-subclass-of-cpyext#comment-22371546
|
|
hub = get_hub() # pylint:disable=undefined-variable
|
|
self._notifier = hub.loop.run_callback(self._notify_links)
|
|
|
|
def _notify_links(self):
|
|
# Subclasses CANNOT override. This is a cdef method.
|
|
|
|
# We release self._notifier here. We are called by it
|
|
# at the end of the loop, and it is now false in a boolean way (as soon
|
|
# as this method returns).
|
|
# If we get acquired/released again, we will create a new one, but there's
|
|
# no need to keep it around until that point (making it potentially climb
|
|
# into older GC generations, notably on PyPy)
|
|
notifier = self._notifier
|
|
try:
|
|
while True:
|
|
self._dirty = False
|
|
if not self._links:
|
|
# In case we were manually unlinked before
|
|
# the callback. Which shouldn't happen
|
|
return
|
|
for link in self._links:
|
|
if self.counter <= 0:
|
|
return
|
|
try:
|
|
link(self) # Must use Cython >= 0.23.4 on PyPy else this leaks memory
|
|
except: # pylint:disable=bare-except
|
|
getcurrent().handle_error((link, self), *sys.exc_info()) # pylint:disable=undefined-variable
|
|
if self._dirty:
|
|
# We mutated self._links so we need to start over
|
|
break
|
|
if not self._dirty:
|
|
return
|
|
finally:
|
|
# We should not have created a new notifier even if callbacks
|
|
# released us because we loop through *all* of our links on the
|
|
# same callback while self._notifier is still true.
|
|
assert self._notifier is notifier
|
|
self._notifier = None
|
|
|
|
def rawlink(self, callback):
|
|
"""
|
|
rawlink(callback) -> None
|
|
|
|
Register a callback to call when a counter is more than zero.
|
|
|
|
*callback* will be called in the :class:`Hub <gevent.hub.Hub>`, so it must not use blocking gevent API.
|
|
*callback* will be passed one argument: this instance.
|
|
|
|
This method is normally called automatically by :meth:`acquire` and :meth:`wait`; most code
|
|
will not need to use it.
|
|
"""
|
|
if not callable(callback):
|
|
raise TypeError('Expected callable:', callback)
|
|
if self._links is None:
|
|
self._links = [callback]
|
|
else:
|
|
self._links.append(callback)
|
|
self._dirty = True
|
|
|
|
def unlink(self, callback):
|
|
"""
|
|
unlink(callback) -> None
|
|
|
|
Remove the callback set by :meth:`rawlink`.
|
|
|
|
This method is normally called automatically by :meth:`acquire` and :meth:`wait`; most
|
|
code will not need to use it.
|
|
"""
|
|
try:
|
|
self._links.remove(callback)
|
|
self._dirty = True
|
|
except (ValueError, AttributeError):
|
|
pass
|
|
if not self._links:
|
|
self._links = None
|
|
# TODO: Cancel a notifier if there are no links?
|
|
|
|
def _do_wait(self, timeout):
|
|
"""
|
|
Wait for up to *timeout* seconds to expire. If timeout
|
|
elapses, return the exception. Otherwise, return None.
|
|
Raises timeout if a different timer expires.
|
|
"""
|
|
switch = getcurrent().switch # pylint:disable=undefined-variable
|
|
self.rawlink(switch)
|
|
try:
|
|
timer = Timeout._start_new_or_dummy(timeout)
|
|
try:
|
|
try:
|
|
result = get_hub().switch() # pylint:disable=undefined-variable
|
|
assert result is self, 'Invalid switch into Semaphore.wait/acquire(): %r' % (result, )
|
|
except Timeout as ex:
|
|
if ex is not timer:
|
|
raise
|
|
return ex
|
|
finally:
|
|
timer.cancel()
|
|
finally:
|
|
self.unlink(switch)
|
|
|
|
def wait(self, timeout=None):
|
|
"""
|
|
wait(timeout=None) -> int
|
|
|
|
Wait until it is possible to acquire this semaphore, or until the optional
|
|
*timeout* elapses.
|
|
|
|
.. caution:: If this semaphore was initialized with a size of 0,
|
|
this method will block forever if no timeout is given.
|
|
|
|
:keyword float timeout: If given, specifies the maximum amount of seconds
|
|
this method will block.
|
|
:return: A number indicating how many times the semaphore can be acquired
|
|
before blocking.
|
|
"""
|
|
if self.counter > 0:
|
|
return self.counter
|
|
|
|
self._do_wait(timeout) # return value irrelevant, whether we got it or got a timeout
|
|
return self.counter
|
|
|
|
def acquire(self, blocking=True, timeout=None):
|
|
"""
|
|
acquire(blocking=True, timeout=None) -> bool
|
|
|
|
Acquire the semaphore.
|
|
|
|
.. caution:: If this semaphore was initialized with a size of 0,
|
|
this method will block forever (unless a timeout is given or blocking is
|
|
set to false).
|
|
|
|
:keyword bool blocking: If True (the default), this function will block
|
|
until the semaphore is acquired.
|
|
:keyword float timeout: If given, specifies the maximum amount of seconds
|
|
this method will block.
|
|
:return: A boolean indicating whether the semaphore was acquired.
|
|
If ``blocking`` is True and ``timeout`` is None (the default), then
|
|
(so long as this semaphore was initialized with a size greater than 0)
|
|
this will always return True. If a timeout was given, and it expired before
|
|
the semaphore was acquired, False will be returned. (Note that this can still
|
|
raise a ``Timeout`` exception, if some other caller had already started a timer.)
|
|
"""
|
|
if self.counter > 0:
|
|
self.counter -= 1
|
|
return True
|
|
|
|
if not blocking:
|
|
return False
|
|
|
|
timeout = self._do_wait(timeout)
|
|
if timeout is not None:
|
|
# Our timer expired.
|
|
return False
|
|
|
|
# Neither our timer no another one expired, so we blocked until
|
|
# awoke. Therefore, the counter is ours
|
|
self.counter -= 1
|
|
assert self.counter >= 0
|
|
return True
|
|
|
|
_py3k_acquire = acquire # PyPy needs this; it must be static for Cython
|
|
|
|
def __enter__(self):
|
|
self.acquire()
|
|
|
|
def __exit__(self, t, v, tb):
|
|
self.release()
|
|
|
|
|
|
class BoundedSemaphore(Semaphore):
|
|
"""
|
|
BoundedSemaphore(value=1) -> BoundedSemaphore
|
|
|
|
A bounded semaphore checks to make sure its current value doesn't
|
|
exceed its initial value. If it does, :class:`ValueError` is
|
|
raised. In most situations semaphores are used to guard resources
|
|
with limited capacity. If the semaphore is released too many times
|
|
it's a sign of a bug.
|
|
|
|
If not given, *value* defaults to 1.
|
|
"""
|
|
|
|
#: For monkey-patching, allow changing the class of error we raise
|
|
_OVER_RELEASE_ERROR = ValueError
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
Semaphore.__init__(self, *args, **kwargs)
|
|
self._initial_value = self.counter
|
|
|
|
def release(self):
|
|
if self.counter >= self._initial_value:
|
|
raise self._OVER_RELEASE_ERROR("Semaphore released too many times")
|
|
Semaphore.release(self)
|
|
|
|
|
|
def _init():
|
|
greenlet_init() # pylint:disable=undefined-variable
|
|
|
|
_init()
|
|
|
|
# By building the semaphore with Cython under PyPy, we get
|
|
# atomic operations (specifically, exiting/releasing), at the
|
|
# cost of some speed (one trivial semaphore micro-benchmark put the pure-python version
|
|
# at around 1s and the compiled version at around 4s). Some clever subclassing
|
|
# and having only the bare minimum be in cython might help reduce that penalty.
|
|
# NOTE: You must use version 0.23.4 or later to avoid a memory leak.
|
|
# https://mail.python.org/pipermail/cython-devel/2015-October/004571.html
|
|
# However, that's all for naught on up to and including PyPy 4.0.1 which
|
|
# have some serious crashing bugs with GC interacting with cython.
|
|
# It hasn't been tested since then, and PURE_PYTHON is assumed to be true
|
|
# for PyPy in all cases anyway, so this does nothing.
|
|
|
|
from gevent._util import import_c_accel
|
|
import_c_accel(globals(), 'gevent.__semaphore')
|