🐛 Added rate limiter for Steam

This commit is contained in:
GeoffreyCoulaud
2023-06-03 16:31:15 +02:00
parent 6d6e830cc9
commit 58054f1c26
3 changed files with 141 additions and 95 deletions

View File

@@ -106,5 +106,11 @@
</choices>
<default>"a-z"</default>
</key>
<key name="steam-api-tokens" type="i">
<default>200</default>
</key>
<key name="steam-api-tokens-timestamp" type="i">
<default>0</default>
</key>
</schema>
</schemalist>

View File

@@ -1,114 +1,112 @@
from threading import Lock, Thread
from time import time_ns, sleep
from typing import Optional
from threading import Lock, Thread, BoundedSemaphore
from time import sleep
from collections import deque
from contextlib import AbstractContextManager
class RateLimiter(AbstractContextManager):
"""
Thread-safe and blocking rate limiter.
class TokenBucketRateLimiter(AbstractContextManager):
"""Rate limiter implementing the token bucket algorithm"""
There are at most X tokens available in the limiter, acquiring removes one
and releasing gives back one.
Acquire will block until those conditions are met:
- There is a token available
- At least Y nanoseconds have passed since the last token was acquired
The order in which tokens are requested is the order in which they will be given.
Works on a FIFO model.
Can be used in a `with` statement like `threading.Lock`
[Using locks, conditions, and semaphores in the with statement](https://docs.python.org/3/library/threading.html#using-locks-conditions-and-semaphores-in-the-with-statement)
"""
# Number of tokens available in the limiter
# = Max number of cuncurrent operations allowed
REFILL_SPACING_SECONDS: int
MAX_TOKENS: int
available_tokens: int = 0
tokens_lock: Lock = None
# Minimum time elapsed between two token being distributed
# = Rate limit
PICK_SPACING_NS: int
last_pick_time: int = 0
last_pick_time_lock: Lock = None
# Queue containing locks unlocked when a token can be acquired
# Doesn't need a thread lock, deques have thread-safe append and pop on both ends
bucket: BoundedSemaphore = None
queue: deque[Lock] = None
queue_lock: Lock = None
def __init__(self, pick_spacing_ns: int, max_tokens: int) -> None:
self.PICK_SPACING_NS = pick_spacing_ns
self.MAX_TOKENS = max_tokens
self.last_pick_time = 0
self.last_pick_time_lock = Lock()
self.queue = deque()
self.available_tokens = max_tokens
self.tokens_lock = Lock()
# Protect the number of tokens behind a lock
__n_tokens_lock: Lock = None
__n_tokens = 0
@property
def n_tokens(self):
with self.__n_tokens_lock:
return self.__n_tokens
@n_tokens.setter
def n_tokens(self, value: int):
with self.__n_tokens_lock:
self.n_tokens = value
def __init__(
self,
refill_spacing_seconds: Optional[int] = None,
max_tokens: Optional[int] = None,
initial_tokens: Optional[int] = None,
) -> None:
# Initialize default values
self.queue_lock = Lock()
if max_tokens is not None:
self.MAX_TOKENS = max_tokens
if refill_spacing_seconds is not None:
self.REFILL_SPACING_SECONDS = refill_spacing_seconds
# Initialize the bucket
self.bucket = BoundedSemaphore(self.MAX_TOKENS)
missing = 0 if initial_tokens is None else self.MAX_TOKENS - initial_tokens
missing = max(0, min(missing, max_tokens))
for _ in range(missing):
self.bucket.acquire()
# Initialize the counter
self.__n_tokens_lock = Lock()
self.n_tokens = self.MAX_TOKENS - missing
# Spawn daemon thread that refills the bucket
refill_thread = Thread(target=self.refill_thread_func, daemon=True)
refill_thread.start()
def refill(self):
"""Method used by the refill thread"""
sleep(self.REFILL_SPACING_SECONDS)
try:
self.bucket.release()
except ValueError:
# Bucket was full
pass
else:
self.n_tokens += 1
self.update_queue()
def refill_thread_func(self):
"""Entry point for the daemon thread that is refilling the bucket"""
while True:
self.refill()
def update_queue(self) -> None:
"""
Move the queue forward if possible.
Non-blocking, logic runs in a daemon thread.
"""
thread = Thread(target=self.queue_update_thread_func, daemon=True)
thread.start()
"""Update the queue, moving it forward if possible. Non-blocking."""
update_thread = Thread(target=self.queue_update_thread_func, daemon=True)
update_thread.start()
def queue_update_thread_func(self) -> None:
"""Queue-updating thread's entry point"""
# Consume a token, if none is available do nothing
with self.tokens_lock:
if self.available_tokens == 0:
with self.queue_lock:
if len(self.queue) == 0:
return
self.available_tokens -= 1
# Get the next lock in queue, if none is available do nothing
try:
self.bucket.acquire()
self.n_tokens -= 1
lock = self.queue.pop()
except IndexError:
return
# Satisfy the minimum pick spacing
with self.last_pick_time_lock:
elapsed = time_ns() - self.last_pick_time
if (ns_to_sleep := self.PICK_SPACING_NS - elapsed) > 0:
sleep(ns_to_sleep / 10**9)
self.last_pick_time = time_ns()
# Finally unlock the acquire call linked to that lock
lock.release()
lock.release()
def add_to_queue(self) -> Lock:
"""Create a lock, add it to the queue and return it"""
lock = Lock()
lock.acquire()
self.queue.appendleft(lock)
with self.queue_lock:
self.queue.appendleft(lock)
return lock
def acquire(self) -> None:
"""Pick a token from the limiter"""
# Wait our turn in queue
def acquire(self):
"""Acquires a token from the bucket when it's your turn in queue"""
lock = self.add_to_queue()
self.update_queue()
# Block until lock is released (= its turn in queue)
# Single-use (this call to acquire), so no need to release it
lock.acquire()
del lock
def release(self) -> None:
"""Return a token to the limiter"""
with self.tokens_lock:
self.available_tokens += 1
self.update_queue()
# --- Support for use in with statements
def __enter__(self):
self.acquire()
def __exit__(self):
self.release()
def __exit__(self, *_args):
pass

View File

@@ -1,9 +1,14 @@
import re
import logging
import re
from time import time
from typing import TypedDict
from math import floor, ceil
import requests
from requests import HTTPError, JSONDecodeError
from requests import HTTPError
from src import shared
from src.utils.rate_limiter import TokenBucketRateLimiter
class SteamError(Exception):
@@ -36,11 +41,40 @@ class SteamAPIData(TypedDict):
developers: str
class SteamRateLimiter(TokenBucketRateLimiter):
"""Rate limiter for the Steam web API"""
# Steam web API limit
# 200 requests per 5 min seems to be the limit
# https://stackoverflow.com/questions/76047820/how-am-i-exceeding-steam-apis-rate-limit
# https://stackoverflow.com/questions/51795457/avoiding-error-429-too-many-requests-steam-web-api
REFILL_SPACING_SECONDS = 1.5
MAX_TOKENS = 200
def __init__(self) -> None:
# Load initial tokens from schema
# (Remember API limits through restarts of Cartridges)
last_tokens = shared.state_schema.get_int("steam-api-tokens")
last_time = shared.state_schema.get_int("steam-api-tokens-timestamp")
produced = floor((time() - last_time) / self.REFILL_SPACING_SECONDS)
inital_tokens = last_tokens + produced
super().__init__(initial_tokens=inital_tokens)
def refill(self):
"""Refill the bucket and store its number of tokens in the schema"""
super().refill()
shared.state_schema.set_int("steam-api-tokens-timestamp", ceil(time()))
shared.state_schema.set_int("steam-api-tokens", self.n_tokens)
class SteamHelper:
"""Helper around the Steam API"""
base_url = "https://store.steampowered.com/api"
# Shared across instances
rate_limiter: SteamRateLimiter = SteamRateLimiter()
def get_manifest_data(self, manifest_path) -> SteamManifestData:
"""Get local data for a game from its manifest"""
@@ -58,22 +92,30 @@ class SteamHelper:
return SteamManifestData(**data)
def get_api_data(self, appid) -> SteamAPIData:
"""Get online data for a game from its appid"""
# TODO throttle to not get access denied
"""
Get online data for a game from its appid.
try:
with requests.get(
f"{self.base_url}/appdetails?appids={appid}", timeout=5
) as response:
response.raise_for_status()
data = response.json()[appid]
except HTTPError as error:
logging.warning("Steam API HTTP error for %s", appid, exc_info=error)
raise error
May block to satisfy the Steam web API limitations.
"""
# Get data from the API (way block to satisfy its limits)
with self.rate_limiter:
try:
with requests.get(
f"{self.base_url}/appdetails?appids={appid}", timeout=5
) as response:
response.raise_for_status()
data = response.json()[appid]
except HTTPError as error:
logging.warning("Steam API HTTP error for %s", appid, exc_info=error)
raise error
# Handle not found
if not data["success"]:
logging.debug("Appid %s not found", appid)
raise SteamGameNotFoundError()
# Handle appid is not a game
game_types = ("game", "demo")
if data["data"]["type"] not in game_types:
logging.debug("Appid %s is not a game", appid)