Merge pull request #169 from kra-mo/typing-improvements

Typing improvements
This commit is contained in:
Geoffrey Coulaud
2023-08-17 01:29:39 +02:00
committed by GitHub
2 changed files with 39 additions and 31 deletions

View File

@@ -21,7 +21,7 @@ from collections import deque
from contextlib import AbstractContextManager from contextlib import AbstractContextManager
from threading import BoundedSemaphore, Lock, Thread from threading import BoundedSemaphore, Lock, Thread
from time import sleep, time from time import sleep, time
from typing import Any, Optional, Sized from typing import Any, Sized
class PickHistory(Sized): class PickHistory(Sized):
@@ -79,16 +79,23 @@ class PickHistory(Sized):
# pylint: disable=too-many-instance-attributes # pylint: disable=too-many-instance-attributes
class RateLimiter(AbstractContextManager): class RateLimiter(AbstractContextManager):
"""Rate limiter implementing the token bucket algorithm""" """
Base rate limiter implementing the token bucket algorithm.
Do not use directly, create a child class to tailor the rate limiting to the
underlying service's limits.
Subclasses must provide values to the following attributes:
* refill_period_seconds - Period in which we have a max amount of tokens
* refill_period_tokens - Number of tokens allowed in this period
* burst_tokens - Max number of tokens that can be consumed instantly
"""
# Period in which we have a max amount of tokens
refill_period_seconds: int refill_period_seconds: int
# Number of tokens allowed in this period
refill_period_tokens: int refill_period_tokens: int
# Max number of tokens that can be consumed instantly
burst_tokens: int burst_tokens: int
pick_history: Optional[PickHistory] = None # TODO: Geoff: make this required pick_history: PickHistory
bucket: BoundedSemaphore bucket: BoundedSemaphore
queue: deque[Lock] queue: deque[Lock]
queue_lock: Lock queue_lock: Lock
@@ -107,23 +114,20 @@ class RateLimiter(AbstractContextManager):
with self.__n_tokens_lock: with self.__n_tokens_lock:
self.__n_tokens = value self.__n_tokens = value
def __init__( def _init_pick_history(self) -> None:
self, """
refill_period_seconds: Optional[int] = None, Initialize the tocken pick history
refill_period_tokens: Optional[int] = None, (only for use in this class and its children)
burst_tokens: Optional[int] = None,
) -> None: By default, creates an empty pick history.
Should be overriden or extended by subclasses.
"""
self.pick_history = PickHistory(self.refill_period_seconds)
def __init__(self) -> None:
"""Initialize the limiter""" """Initialize the limiter"""
# Initialize default values self._init_pick_history()
if refill_period_seconds is not None:
self.refill_period_seconds = refill_period_seconds
if refill_period_tokens is not None:
self.refill_period_tokens = refill_period_tokens
if burst_tokens is not None:
self.burst_tokens = burst_tokens
if self.pick_history is None:
self.pick_history = PickHistory(self.refill_period_seconds)
# Create synchronization data # Create synchronization data
self.__n_tokens_lock = Lock() self.__n_tokens_lock = Lock()

View File

@@ -28,7 +28,7 @@ import requests
from requests.exceptions import HTTPError from requests.exceptions import HTTPError
from src import shared from src import shared
from src.utils.rate_limiter import PickHistory, RateLimiter from src.utils.rate_limiter import RateLimiter
class SteamError(Exception): class SteamError(Exception):
@@ -72,14 +72,16 @@ class SteamRateLimiter(RateLimiter):
refill_period_tokens = 200 refill_period_tokens = 200
burst_tokens = 100 burst_tokens = 100
def __init__(self) -> None: def _init_pick_history(self) -> None:
# Load pick history from schema """
# (Remember API limits through restarts of Cartridges) Load the pick history from schema.
Allows remembering API limits through restarts of Cartridges.
"""
super()._init_pick_history()
timestamps_str = shared.state_schema.get_string("steam-limiter-tokens-history") timestamps_str = shared.state_schema.get_string("steam-limiter-tokens-history")
self.pick_history = PickHistory(self.refill_period_seconds)
self.pick_history.add(*json.loads(timestamps_str)) self.pick_history.add(*json.loads(timestamps_str))
self.pick_history.remove_old_entries() self.pick_history.remove_old_entries()
super().__init__()
def acquire(self) -> None: def acquire(self) -> None:
"""Get a token from the bucket and store the pick history in the schema""" """Get a token from the bucket and store the pick history in the schema"""
@@ -91,9 +93,7 @@ class SteamRateLimiter(RateLimiter):
class SteamFileHelper: class SteamFileHelper:
"""Helper for steam file formats""" """Helper for steam file formats"""
def get_manifest_data( def get_manifest_data(self, manifest_path: Path) -> SteamManifestData:
self, manifest_path: Path
) -> SteamManifestData: # TODO: Geoff: fix typing issue
"""Get local data for a game from its manifest""" """Get local data for a game from its manifest"""
with open(manifest_path, "r", encoding="utf-8") as file: with open(manifest_path, "r", encoding="utf-8") as file:
@@ -107,7 +107,11 @@ class SteamFileHelper:
raise SteamInvalidManifestError() raise SteamInvalidManifestError()
data[key] = match.group(1) data[key] = match.group(1)
return SteamManifestData(**data) return SteamManifestData(
name=data["name"],
appid=data["appid"],
stateflags=data["stateflags"],
)
class SteamAPIHelper: class SteamAPIHelper: