diff --git a/src/utils/rate_limiter.py b/src/utils/rate_limiter.py index 43e1150..280846b 100644 --- a/src/utils/rate_limiter.py +++ b/src/utils/rate_limiter.py @@ -1,35 +1,51 @@ from threading import Lock, Thread from time import time_ns, sleep +from collections import deque +from contextlib import AbstractContextManager -class RateLimiter: +class RateLimiter(AbstractContextManager): """ Thread-safe and blocking rate limiter. - * There are at most X tokens available in the limiter - * Tokens can't be picked faster than every Y nanoseconds - * Acquire will block until those conditions are met - * The first to request a token will also be the first to acquire one + + There are at most X tokens available in the limiter, acquiring removes one + and releasing gives back one. + + Acquire will block until those conditions are met: + - There is a token available + - At least Y nanoseconds have passed since the last token was acquired + + The order in which tokens are requested is the order in which they will be given. + Works on a FIFO model. + + Can be used in a `with` statement like `threading.Lock` + [Using locks, conditions, and semaphores in the with statement](https://docs.python.org/3/library/threading.html#using-locks-conditions-and-semaphores-in-the-with-statement) """ - PICK_SPACING_NS: float + # Number of tokens available in the limiter + # = Max number of cuncurrent operations allowed MAX_TOKENS: int + available_tokens: int = 0 + tokens_lock: Lock = None - _last_pick_time: int = 0 - _n_tokens: int = 0 - _queue: list[Lock] = None - _queue_lock: Lock = None - _tokens_lock: Lock = None - _last_pick_time_lock: Lock = None + # Minimum time elapsed between two token being distributed + # = Rate limit + PICK_SPACING_NS: int + last_pick_time: int = 0 + last_pick_time_lock: Lock = None - def __init__(self, pick_spacing_ns: float, max_tokens: int) -> None: + # Queue containing locks unlocked when a token can be acquired + # Doesn't need a thread lock, deques have thread-safe append and pop on both ends + queue: deque[Lock] = None + + def __init__(self, pick_spacing_ns: int, max_tokens: int) -> None: self.PICK_SPACING_NS = pick_spacing_ns self.MAX_TOKENS = max_tokens - self._last_pick_time = 0 - self._last_pick_time_lock = Lock() - self._queue = [] - self._queue_lock = Lock() - self._n_tokens = max_tokens - self._tokens_lock = Lock() + self.last_pick_time = 0 + self.last_pick_time_lock = Lock() + self.queue = deque() + self.available_tokens = max_tokens + self.tokens_lock = Lock() def update_queue(self) -> None: """ @@ -41,50 +57,58 @@ class RateLimiter: def queue_update_thread_func(self) -> None: """Queue-updating thread's entry point""" - with self._queue_lock, self._tokens_lock: - # Unlock as many locks in the queue as there are tokens available - n_unlocked = min(len(self._queue), self._n_tokens) - for _ in range(n_unlocked): - lock = self._queue.pop(0) - lock.release() - # Consume the tokens used - self._n_tokens -= n_unlocked + + # Consume a token, if none is available do nothing + with self.tokens_lock: + if self.available_tokens == 0: + return + self.available_tokens -= 1 + + # Get the next lock in queue, if none is available do nothing + try: + lock = self.queue.pop() + except IndexError: + return + + # Satisfy the minimum pick spacing + with self.last_pick_time_lock: + elapsed = time_ns() - self.last_pick_time + if (ns_to_sleep := self.PICK_SPACING_NS - elapsed) > 0: + sleep(ns_to_sleep / 10**9) + self.last_pick_time = time_ns() + + # Finally unlock the acquire call linked to that lock + lock.release() def add_to_queue(self) -> Lock: """Create a lock, add it to the queue and return it""" lock = Lock() lock.acquire() - with self._queue_lock: - self._queue.append(lock) + self.queue.appendleft(lock) return lock def acquire(self) -> None: - """ - Pick a token from the limiter. - Will block: - * Until your turn in queue - * Until the minimum pick spacing is satified - """ + """Pick a token from the limiter""" # Wait our turn in queue - # (no need for with since queue locks are unique, will be destroyed after that) lock = self.add_to_queue() self.update_queue() - lock.acquire() - # TODO move to queue unlock (else order is not ensured) - # Satisfy the minimum pick spacing - now = time_ns() - with self._last_pick_time_lock: - elapsed = now - self._last_pick_time - ns_to_sleep = self.PICK_SPACING_NS - elapsed - self._last_pick_time = now - if ns_to_sleep > 0: - sleep(ns_to_sleep / 10**9) - self._last_pick_time += ns_to_sleep + # Block until lock is released (= its turn in queue) + # Single-use (this call to acquire), so no need to release it + lock.acquire() + del lock def release(self) -> None: - """Return a token to the bucket""" - with self._tokens_lock: - self._n_tokens += 1 + """Return a token to the limiter""" + with self.tokens_lock: + self.available_tokens += 1 self.update_queue() + + # --- Support for use in with statements + + def __enter__(self): + self.acquire() + + def __exit__(self): + self.release()