From 58054f1c26f6c424cfad5ebbabe6dc185185dca9 Mon Sep 17 00:00:00 2001 From: GeoffreyCoulaud Date: Sat, 3 Jun 2023 16:31:15 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Added=20rate=20limiter=20for=20S?= =?UTF-8?q?team?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- data/hu.kramo.Cartridges.gschema.xml.in | 6 + src/utils/rate_limiter.py | 162 ++++++++++++------------ src/utils/steam.py | 68 ++++++++-- 3 files changed, 141 insertions(+), 95 deletions(-) diff --git a/data/hu.kramo.Cartridges.gschema.xml.in b/data/hu.kramo.Cartridges.gschema.xml.in index 0ae63b7..326bdf9 100644 --- a/data/hu.kramo.Cartridges.gschema.xml.in +++ b/data/hu.kramo.Cartridges.gschema.xml.in @@ -106,5 +106,11 @@ "a-z" + + 200 + + + 0 + \ No newline at end of file diff --git a/src/utils/rate_limiter.py b/src/utils/rate_limiter.py index 280846b..b90f93d 100644 --- a/src/utils/rate_limiter.py +++ b/src/utils/rate_limiter.py @@ -1,114 +1,112 @@ -from threading import Lock, Thread -from time import time_ns, sleep +from typing import Optional +from threading import Lock, Thread, BoundedSemaphore +from time import sleep from collections import deque from contextlib import AbstractContextManager -class RateLimiter(AbstractContextManager): - """ - Thread-safe and blocking rate limiter. +class TokenBucketRateLimiter(AbstractContextManager): + """Rate limiter implementing the token bucket algorithm""" - There are at most X tokens available in the limiter, acquiring removes one - and releasing gives back one. - - Acquire will block until those conditions are met: - - There is a token available - - At least Y nanoseconds have passed since the last token was acquired - - The order in which tokens are requested is the order in which they will be given. - Works on a FIFO model. 
-
-    Can be used in a `with` statement like `threading.Lock`
-    [Using locks, conditions, and semaphores in the with statement](https://docs.python.org/3/library/threading.html#using-locks-conditions-and-semaphores-in-the-with-statement)
-    """
-
-    # Number of tokens available in the limiter
-    # = Max number of cuncurrent operations allowed
+    REFILL_SPACING_SECONDS: int
     MAX_TOKENS: int
-    available_tokens: int = 0
-    tokens_lock: Lock = None
-
-    # Minimum time elapsed between two token being distributed
-    # = Rate limit
-    PICK_SPACING_NS: int
-    last_pick_time: int = 0
-    last_pick_time_lock: Lock = None
-
-    # Queue containing locks unlocked when a token can be acquired
-    # Doesn't need a thread lock, deques have thread-safe append and pop on both ends
+    bucket: BoundedSemaphore = None
     queue: deque[Lock] = None
+    queue_lock: Lock = None
 
-    def __init__(self, pick_spacing_ns: int, max_tokens: int) -> None:
-        self.PICK_SPACING_NS = pick_spacing_ns
-        self.MAX_TOKENS = max_tokens
-        self.last_pick_time = 0
-        self.last_pick_time_lock = Lock()
-        self.queue = deque()
-        self.available_tokens = max_tokens
-        self.tokens_lock = Lock()
+    # Protect the number of tokens behind a lock
+    __n_tokens_lock: Lock = None
+    __n_tokens = 0
+
+    @property
+    def n_tokens(self):
+        with self.__n_tokens_lock:
+            return self.__n_tokens
+
+    @n_tokens.setter
+    def n_tokens(self, value: int):
+        with self.__n_tokens_lock:
+            self.__n_tokens = value
+
+    def __init__(
+        self,
+        refill_spacing_seconds: Optional[int] = None,
+        max_tokens: Optional[int] = None,
+        initial_tokens: Optional[int] = None,
+    ) -> None:
+        # Initialize default values
+        self.queue_lock = Lock()
+        if max_tokens is not None:
+            self.MAX_TOKENS = max_tokens
+        if refill_spacing_seconds is not None:
+            self.REFILL_SPACING_SECONDS = refill_spacing_seconds
+
+        # Initialize the bucket
+        self.bucket = BoundedSemaphore(self.MAX_TOKENS)
+        missing = 0 if initial_tokens is None else self.MAX_TOKENS - initial_tokens
+        missing = max(0, min(missing, self.MAX_TOKENS))
+        for _ in range(missing):
+            self.bucket.acquire()
+
+        # Initialize the counter
+        self.__n_tokens_lock = Lock()
+        self.n_tokens = self.MAX_TOKENS - missing
+
+        # Spawn daemon thread that refills the bucket
+        refill_thread = Thread(target=self.refill_thread_func, daemon=True)
+        refill_thread.start()
+
+    def refill(self):
+        """Method used by the refill thread"""
+        sleep(self.REFILL_SPACING_SECONDS)
+        try:
+            self.bucket.release()
+        except ValueError:
+            # Bucket was full
+            pass
+        else:
+            self.n_tokens += 1
+            self.update_queue()
+
+    def refill_thread_func(self):
+        """Entry point for the daemon thread that is refilling the bucket"""
+        while True:
+            self.refill()
 
     def update_queue(self) -> None:
-        """
-        Move the queue forward if possible.
-        Non-blocking, logic runs in a daemon thread.
-        """
-        thread = Thread(target=self.queue_update_thread_func, daemon=True)
-        thread.start()
+        """Update the queue, moving it forward if possible. Non-blocking."""
+        update_thread = Thread(target=self.queue_update_thread_func, daemon=True)
+        update_thread.start()
 
     def queue_update_thread_func(self) -> None:
         """Queue-updating thread's entry point"""
-
-        # Consume a token, if none is available do nothing
-        with self.tokens_lock:
-            if self.available_tokens == 0:
+        with self.queue_lock:
+            if len(self.queue) == 0:
                 return
-            self.available_tokens -= 1
-
-        # Get the next lock in queue, if none is available do nothing
-        try:
+            self.bucket.acquire()
+            self.n_tokens -= 1
             lock = self.queue.pop()
-        except IndexError:
-            return
-
-        # Satisfy the minimum pick spacing
-        with self.last_pick_time_lock:
-            elapsed = time_ns() - self.last_pick_time
-            if (ns_to_sleep := self.PICK_SPACING_NS - elapsed) > 0:
-                sleep(ns_to_sleep / 10**9)
-            self.last_pick_time = time_ns()
-
-        # Finally unlock the acquire call linked to that lock
-        lock.release()
+            lock.release()
 
     def add_to_queue(self) -> Lock:
         """Create a lock, add it to the queue and return it"""
         lock = Lock()
         lock.acquire()
-        self.queue.appendleft(lock)
+        
with self.queue_lock: + self.queue.appendleft(lock) return lock - def acquire(self) -> None: - """Pick a token from the limiter""" - - # Wait our turn in queue + def acquire(self): + """Acquires a token from the bucket when it's your turn in queue""" lock = self.add_to_queue() self.update_queue() - - # Block until lock is released (= its turn in queue) - # Single-use (this call to acquire), so no need to release it lock.acquire() - del lock - - def release(self) -> None: - """Return a token to the limiter""" - with self.tokens_lock: - self.available_tokens += 1 - self.update_queue() # --- Support for use in with statements def __enter__(self): self.acquire() - def __exit__(self): - self.release() + def __exit__(self, *_args): + pass diff --git a/src/utils/steam.py b/src/utils/steam.py index 4afa874..cdef804 100644 --- a/src/utils/steam.py +++ b/src/utils/steam.py @@ -1,9 +1,14 @@ -import re import logging +import re +from time import time from typing import TypedDict +from math import floor, ceil import requests -from requests import HTTPError, JSONDecodeError +from requests import HTTPError + +from src import shared +from src.utils.rate_limiter import TokenBucketRateLimiter class SteamError(Exception): @@ -36,11 +41,40 @@ class SteamAPIData(TypedDict): developers: str +class SteamRateLimiter(TokenBucketRateLimiter): + """Rate limiter for the Steam web API""" + + # Steam web API limit + # 200 requests per 5 min seems to be the limit + # https://stackoverflow.com/questions/76047820/how-am-i-exceeding-steam-apis-rate-limit + # https://stackoverflow.com/questions/51795457/avoiding-error-429-too-many-requests-steam-web-api + REFILL_SPACING_SECONDS = 1.5 + MAX_TOKENS = 200 + + def __init__(self) -> None: + # Load initial tokens from schema + # (Remember API limits through restarts of Cartridges) + last_tokens = shared.state_schema.get_int("steam-api-tokens") + last_time = shared.state_schema.get_int("steam-api-tokens-timestamp") + produced = floor((time() - last_time) / 
self.REFILL_SPACING_SECONDS)
+        initial_tokens = last_tokens + produced
+        super().__init__(initial_tokens=initial_tokens)
+
+    def refill(self):
+        """Refill the bucket and store its number of tokens in the schema"""
+        super().refill()
+        shared.state_schema.set_int("steam-api-tokens-timestamp", ceil(time()))
+        shared.state_schema.set_int("steam-api-tokens", self.n_tokens)
+
+
 class SteamHelper:
     """Helper around the Steam API"""
 
     base_url = "https://store.steampowered.com/api"
 
+    # Shared across instances
+    rate_limiter: SteamRateLimiter = SteamRateLimiter()
+
     def get_manifest_data(self, manifest_path) -> SteamManifestData:
         """Get local data for a game from its manifest"""
 
@@ -58,22 +92,30 @@ class SteamHelper:
         return SteamManifestData(**data)
 
     def get_api_data(self, appid) -> SteamAPIData:
-        """Get online data for a game from its appid"""
-        # TODO throttle to not get access denied
+        """
+        Get online data for a game from its appid.
 
-        try:
-            with requests.get(
-                f"{self.base_url}/appdetails?appids={appid}", timeout=5
-            ) as response:
-                response.raise_for_status()
-                data = response.json()[appid]
-        except HTTPError as error:
-            logging.warning("Steam API HTTP error for %s", appid, exc_info=error)
-            raise error
+        May block to satisfy the Steam web API limitations.
+        """
+
+        # Get data from the API (may block to satisfy its limits)
+        with self.rate_limiter:
+            try:
+                with requests.get(
+                    f"{self.base_url}/appdetails?appids={appid}", timeout=5
+                ) as response:
+                    response.raise_for_status()
+                    data = response.json()[appid]
+            except HTTPError as error:
+                logging.warning("Steam API HTTP error for %s", appid, exc_info=error)
+                raise error
+
+        # Handle not found
        if not data["success"]:
            logging.debug("Appid %s not found", appid)
            raise SteamGameNotFoundError()
 
+        # Handle appid is not a game
        game_types = ("game", "demo")
        if data["data"]["type"] not in game_types:
            logging.debug("Appid %s is not a game", appid)