🚧 Intial work on a generic rate limiter

This commit is contained in:
GeoffreyCoulaud
2023-06-03 14:18:26 +02:00
parent 10a635fc78
commit 6d6e830cc9

View File

@@ -1,35 +1,51 @@
from threading import Lock, Thread from threading import Lock, Thread
from time import time_ns, sleep from time import time_ns, sleep
from collections import deque
from contextlib import AbstractContextManager
class RateLimiter: class RateLimiter(AbstractContextManager):
""" """
Thread-safe and blocking rate limiter. Thread-safe and blocking rate limiter.
* There are at most X tokens available in the limiter
* Tokens can't be picked faster than every Y nanoseconds There are at most X tokens available in the limiter, acquiring removes one
* Acquire will block until those conditions are met and releasing gives back one.
* The first to request a token will also be the first to acquire one
Acquire will block until those conditions are met:
- There is a token available
- At least Y nanoseconds have passed since the last token was acquired
The order in which tokens are requested is the order in which they will be given.
Works on a FIFO model.
Can be used in a `with` statement like `threading.Lock`
[Using locks, conditions, and semaphores in the with statement](https://docs.python.org/3/library/threading.html#using-locks-conditions-and-semaphores-in-the-with-statement)
""" """
PICK_SPACING_NS: float # Number of tokens available in the limiter
# = Max number of cuncurrent operations allowed
MAX_TOKENS: int MAX_TOKENS: int
available_tokens: int = 0
tokens_lock: Lock = None
_last_pick_time: int = 0 # Minimum time elapsed between two token being distributed
_n_tokens: int = 0 # = Rate limit
_queue: list[Lock] = None PICK_SPACING_NS: int
_queue_lock: Lock = None last_pick_time: int = 0
_tokens_lock: Lock = None last_pick_time_lock: Lock = None
_last_pick_time_lock: Lock = None
def __init__(self, pick_spacing_ns: float, max_tokens: int) -> None: # Queue containing locks unlocked when a token can be acquired
# Doesn't need a thread lock, deques have thread-safe append and pop on both ends
queue: deque[Lock] = None
def __init__(self, pick_spacing_ns: int, max_tokens: int) -> None:
self.PICK_SPACING_NS = pick_spacing_ns self.PICK_SPACING_NS = pick_spacing_ns
self.MAX_TOKENS = max_tokens self.MAX_TOKENS = max_tokens
self._last_pick_time = 0 self.last_pick_time = 0
self._last_pick_time_lock = Lock() self.last_pick_time_lock = Lock()
self._queue = [] self.queue = deque()
self._queue_lock = Lock() self.available_tokens = max_tokens
self._n_tokens = max_tokens self.tokens_lock = Lock()
self._tokens_lock = Lock()
def update_queue(self) -> None: def update_queue(self) -> None:
""" """
@@ -41,50 +57,58 @@ class RateLimiter:
def queue_update_thread_func(self) -> None: def queue_update_thread_func(self) -> None:
"""Queue-updating thread's entry point""" """Queue-updating thread's entry point"""
with self._queue_lock, self._tokens_lock:
# Unlock as many locks in the queue as there are tokens available # Consume a token, if none is available do nothing
n_unlocked = min(len(self._queue), self._n_tokens) with self.tokens_lock:
for _ in range(n_unlocked): if self.available_tokens == 0:
lock = self._queue.pop(0) return
self.available_tokens -= 1
# Get the next lock in queue, if none is available do nothing
try:
lock = self.queue.pop()
except IndexError:
return
# Satisfy the minimum pick spacing
with self.last_pick_time_lock:
elapsed = time_ns() - self.last_pick_time
if (ns_to_sleep := self.PICK_SPACING_NS - elapsed) > 0:
sleep(ns_to_sleep / 10**9)
self.last_pick_time = time_ns()
# Finally unlock the acquire call linked to that lock
lock.release() lock.release()
# Consume the tokens used
self._n_tokens -= n_unlocked
def add_to_queue(self) -> Lock: def add_to_queue(self) -> Lock:
"""Create a lock, add it to the queue and return it""" """Create a lock, add it to the queue and return it"""
lock = Lock() lock = Lock()
lock.acquire() lock.acquire()
with self._queue_lock: self.queue.appendleft(lock)
self._queue.append(lock)
return lock return lock
def acquire(self) -> None: def acquire(self) -> None:
""" """Pick a token from the limiter"""
Pick a token from the limiter.
Will block:
* Until your turn in queue
* Until the minimum pick spacing is satified
"""
# Wait our turn in queue # Wait our turn in queue
# (no need for with since queue locks are unique, will be destroyed after that)
lock = self.add_to_queue() lock = self.add_to_queue()
self.update_queue() self.update_queue()
lock.acquire()
# TODO move to queue unlock (else order is not ensured) # Block until lock is released (= its turn in queue)
# Satisfy the minimum pick spacing # Single-use (this call to acquire), so no need to release it
now = time_ns() lock.acquire()
with self._last_pick_time_lock: del lock
elapsed = now - self._last_pick_time
ns_to_sleep = self.PICK_SPACING_NS - elapsed
self._last_pick_time = now
if ns_to_sleep > 0:
sleep(ns_to_sleep / 10**9)
self._last_pick_time += ns_to_sleep
def release(self) -> None: def release(self) -> None:
"""Return a token to the bucket""" """Return a token to the limiter"""
with self._tokens_lock: with self.tokens_lock:
self._n_tokens += 1 self.available_tokens += 1
self.update_queue() self.update_queue()
# --- Support for use in with statements
def __enter__(self):
self.acquire()
def __exit__(self):
self.release()