diff --git a/data/hu.kramo.Cartridges.gschema.xml.in b/data/hu.kramo.Cartridges.gschema.xml.in
index 0ae63b7..326bdf9 100644
--- a/data/hu.kramo.Cartridges.gschema.xml.in
+++ b/data/hu.kramo.Cartridges.gschema.xml.in
@@ -106,5 +106,11 @@
"a-z"
+ <key name="steam-api-tokens" type="i">
+ <default>200</default>
+ </key>
+ <key name="steam-api-tokens-timestamp" type="i">
+ <default>0</default>
+ </key>
\ No newline at end of file
diff --git a/src/utils/rate_limiter.py b/src/utils/rate_limiter.py
index 280846b..b90f93d 100644
--- a/src/utils/rate_limiter.py
+++ b/src/utils/rate_limiter.py
@@ -1,114 +1,112 @@
-from threading import Lock, Thread
-from time import time_ns, sleep
+from typing import Optional
+from threading import Lock, Thread, BoundedSemaphore
+from time import sleep
from collections import deque
from contextlib import AbstractContextManager
-class RateLimiter(AbstractContextManager):
- """
- Thread-safe and blocking rate limiter.
+class TokenBucketRateLimiter(AbstractContextManager):
+ """Rate limiter implementing the token bucket algorithm"""
- There are at most X tokens available in the limiter, acquiring removes one
- and releasing gives back one.
-
- Acquire will block until those conditions are met:
- - There is a token available
- - At least Y nanoseconds have passed since the last token was acquired
-
- The order in which tokens are requested is the order in which they will be given.
- Works on a FIFO model.
-
- Can be used in a `with` statement like `threading.Lock`
- [Using locks, conditions, and semaphores in the with statement](https://docs.python.org/3/library/threading.html#using-locks-conditions-and-semaphores-in-the-with-statement)
- """
-
- # Number of tokens available in the limiter
- # = Max number of cuncurrent operations allowed
+ REFILL_SPACING_SECONDS: float
MAX_TOKENS: int
- available_tokens: int = 0
- tokens_lock: Lock = None
- # Minimum time elapsed between two token being distributed
- # = Rate limit
- PICK_SPACING_NS: int
- last_pick_time: int = 0
- last_pick_time_lock: Lock = None
-
- # Queue containing locks unlocked when a token can be acquired
- # Doesn't need a thread lock, deques have thread-safe append and pop on both ends
+ bucket: BoundedSemaphore = None
queue: deque[Lock] = None
+ queue_lock: Lock = None
- def __init__(self, pick_spacing_ns: int, max_tokens: int) -> None:
- self.PICK_SPACING_NS = pick_spacing_ns
- self.MAX_TOKENS = max_tokens
- self.last_pick_time = 0
- self.last_pick_time_lock = Lock()
- self.queue = deque()
- self.available_tokens = max_tokens
- self.tokens_lock = Lock()
+ # Protect the number of tokens behind a lock
+ __n_tokens_lock: Lock = None
+ __n_tokens = 0
+
+ @property
+ def n_tokens(self):
+ with self.__n_tokens_lock:
+ return self.__n_tokens
+
+ @n_tokens.setter
+ def n_tokens(self, value: int):
+ with self.__n_tokens_lock:
+ self.__n_tokens = value
+
+ def __init__(
+ self,
+ refill_spacing_seconds: Optional[int] = None,
+ max_tokens: Optional[int] = None,
+ initial_tokens: Optional[int] = None,
+ ) -> None:
+ # Initialize default values
+ self.queue_lock = Lock()
+ if max_tokens is not None:
+ self.MAX_TOKENS = max_tokens
+ if refill_spacing_seconds is not None:
+ self.REFILL_SPACING_SECONDS = refill_spacing_seconds
+
+ # Initialize the bucket
+ self.bucket = BoundedSemaphore(self.MAX_TOKENS)
+ missing = 0 if initial_tokens is None else self.MAX_TOKENS - initial_tokens
+ missing = max(0, min(missing, self.MAX_TOKENS))
+ for _ in range(missing):
+ self.bucket.acquire()
+
+ # Initialize the counter
+ self.__n_tokens_lock = Lock()
+ self.n_tokens = self.MAX_TOKENS - missing
+
+ # Spawn daemon thread that refills the bucket
+ refill_thread = Thread(target=self.refill_thread_func, daemon=True)
+ refill_thread.start()
+
+ def refill(self):
+ """Method used by the refill thread"""
+ sleep(self.REFILL_SPACING_SECONDS)
+ try:
+ self.bucket.release()
+ except ValueError:
+ # Bucket was full
+ pass
+ else:
+ self.n_tokens += 1
+ self.update_queue()
+
+ def refill_thread_func(self):
+ """Entry point for the daemon thread that is refilling the bucket"""
+ while True:
+ self.refill()
def update_queue(self) -> None:
- """
- Move the queue forward if possible.
- Non-blocking, logic runs in a daemon thread.
- """
- thread = Thread(target=self.queue_update_thread_func, daemon=True)
- thread.start()
+ """Update the queue, moving it forward if possible. Non-blocking."""
+ update_thread = Thread(target=self.queue_update_thread_func, daemon=True)
+ update_thread.start()
def queue_update_thread_func(self) -> None:
"""Queue-updating thread's entry point"""
-
- # Consume a token, if none is available do nothing
- with self.tokens_lock:
- if self.available_tokens == 0:
+ with self.queue_lock:
+ if len(self.queue) == 0:
return
- self.available_tokens -= 1
-
- # Get the next lock in queue, if none is available do nothing
- try:
+ self.bucket.acquire()
+ self.n_tokens -= 1
lock = self.queue.pop()
- except IndexError:
- return
-
- # Satisfy the minimum pick spacing
- with self.last_pick_time_lock:
- elapsed = time_ns() - self.last_pick_time
- if (ns_to_sleep := self.PICK_SPACING_NS - elapsed) > 0:
- sleep(ns_to_sleep / 10**9)
- self.last_pick_time = time_ns()
-
- # Finally unlock the acquire call linked to that lock
- lock.release()
+ lock.release()
def add_to_queue(self) -> Lock:
"""Create a lock, add it to the queue and return it"""
lock = Lock()
lock.acquire()
- self.queue.appendleft(lock)
+ with self.queue_lock:
+ self.queue.appendleft(lock)
return lock
- def acquire(self) -> None:
- """Pick a token from the limiter"""
-
- # Wait our turn in queue
+ def acquire(self):
+ """Acquires a token from the bucket when it's your turn in queue"""
lock = self.add_to_queue()
self.update_queue()
-
- # Block until lock is released (= its turn in queue)
- # Single-use (this call to acquire), so no need to release it
lock.acquire()
- del lock
-
- def release(self) -> None:
- """Return a token to the limiter"""
- with self.tokens_lock:
- self.available_tokens += 1
- self.update_queue()
# --- Support for use in with statements
def __enter__(self):
self.acquire()
- def __exit__(self):
- self.release()
+ def __exit__(self, *_args):
+ pass
diff --git a/src/utils/steam.py b/src/utils/steam.py
index 4afa874..cdef804 100644
--- a/src/utils/steam.py
+++ b/src/utils/steam.py
@@ -1,9 +1,14 @@
-import re
import logging
+import re
+from time import time
from typing import TypedDict
+from math import floor, ceil
import requests
-from requests import HTTPError, JSONDecodeError
+from requests import HTTPError
+
+from src import shared
+from src.utils.rate_limiter import TokenBucketRateLimiter
class SteamError(Exception):
@@ -36,11 +41,40 @@ class SteamAPIData(TypedDict):
developers: str
+class SteamRateLimiter(TokenBucketRateLimiter):
+ """Rate limiter for the Steam web API"""
+
+ # Steam web API limit
+ # 200 requests per 5 min seems to be the limit
+ # https://stackoverflow.com/questions/76047820/how-am-i-exceeding-steam-apis-rate-limit
+ # https://stackoverflow.com/questions/51795457/avoiding-error-429-too-many-requests-steam-web-api
+ REFILL_SPACING_SECONDS = 1.5
+ MAX_TOKENS = 200
+
+ def __init__(self) -> None:
+ # Load initial tokens from schema
+ # (Remember API limits through restarts of Cartridges)
+ last_tokens = shared.state_schema.get_int("steam-api-tokens")
+ last_time = shared.state_schema.get_int("steam-api-tokens-timestamp")
+ produced = floor((time() - last_time) / self.REFILL_SPACING_SECONDS)
+ initial_tokens = last_tokens + produced
+ super().__init__(initial_tokens=initial_tokens)
+
+ def refill(self):
+ """Refill the bucket and store its number of tokens in the schema"""
+ super().refill()
+ shared.state_schema.set_int("steam-api-tokens-timestamp", ceil(time()))
+ shared.state_schema.set_int("steam-api-tokens", self.n_tokens)
+
+
class SteamHelper:
"""Helper around the Steam API"""
base_url = "https://store.steampowered.com/api"
+ # Shared across instances
+ rate_limiter: SteamRateLimiter = SteamRateLimiter()
+
def get_manifest_data(self, manifest_path) -> SteamManifestData:
"""Get local data for a game from its manifest"""
@@ -58,22 +92,30 @@ class SteamHelper:
return SteamManifestData(**data)
def get_api_data(self, appid) -> SteamAPIData:
- """Get online data for a game from its appid"""
- # TODO throttle to not get access denied
+ """
+ Get online data for a game from its appid.
- try:
- with requests.get(
- f"{self.base_url}/appdetails?appids={appid}", timeout=5
- ) as response:
- response.raise_for_status()
- data = response.json()[appid]
- except HTTPError as error:
- logging.warning("Steam API HTTP error for %s", appid, exc_info=error)
- raise error
+ May block to satisfy the Steam web API limitations.
+ """
+
+ # Get data from the API (may block to satisfy its limits)
+ with self.rate_limiter:
+ try:
+ with requests.get(
+ f"{self.base_url}/appdetails?appids={appid}", timeout=5
+ ) as response:
+ response.raise_for_status()
+ data = response.json()[appid]
+ except HTTPError as error:
+ logging.warning("Steam API HTTP error for %s", appid, exc_info=error)
+ raise error
+
+ # Handle not found
if not data["success"]:
logging.debug("Appid %s not found", appid)
raise SteamGameNotFoundError()
+ # Handle appid is not a game
game_types = ("game", "demo")
if data["data"]["type"] not in game_types:
logging.debug("Appid %s is not a game", appid)