Add type hints to utils

This commit is contained in:
kramo
2023-08-16 20:16:30 +02:00
parent eeb18eb017
commit 0d32414f1e
8 changed files with 75 additions and 49 deletions

View File

@@ -17,10 +17,18 @@
#
# SPDX-License-Identifier: GPL-3.0-or-later
from gi.repository import Adw
from typing import Optional
from gi.repository import Adw, Gtk
def create_dialog(win, heading, body, extra_option=None, extra_label=None):
def create_dialog(
win: Gtk.Window,
heading: str,
body: str,
extra_option: Optional[str] = None,
extra_label: Optional[str] = None,
) -> Adw.MessageDialog:
dialog = Adw.MessageDialog.new(win, heading, body)
dialog.add_response("dismiss", _("Dismiss"))

View File

@@ -30,7 +30,7 @@ old_games_dir = old_cartridges_data_dir / "games"
old_covers_dir = old_cartridges_data_dir / "covers"
def migrate_game_covers(game_path: Path):
def migrate_game_covers(game_path: Path) -> None:
"""Migrate a game covers from a source game path to the current dir"""
for suffix in (".tiff", ".gif"):
cover_path = old_covers_dir / game_path.with_suffix(suffix).name
@@ -41,7 +41,7 @@ def migrate_game_covers(game_path: Path):
cover_path.rename(destination_cover_path)
def migrate_files_v1_to_v2():
def migrate_files_v1_to_v2() -> None:
"""
Migrate user data from the v1.X locations to the latest location.

View File

@@ -17,11 +17,11 @@
#
# SPDX-License-Identifier: GPL-3.0-or-later
from typing import Optional, Sized
from threading import Lock, Thread, BoundedSemaphore
from time import sleep, time
from collections import deque
from contextlib import AbstractContextManager
from threading import BoundedSemaphore, Lock, Thread
from time import sleep, time
from typing import Any, Optional, Sized
class PickHistory(Sized):
@@ -30,22 +30,22 @@ class PickHistory(Sized):
period: int
timestamps: list[int] = None
timestamps_lock: Lock = None
timestamps: list[float]
timestamps_lock: Lock
def __init__(self, period: int) -> None:
self.period = period
self.timestamps = []
self.timestamps_lock = Lock()
def remove_old_entries(self):
def remove_old_entries(self) -> None:
"""Remove history entries older than the period"""
now = time()
cutoff = now - self.period
with self.timestamps_lock:
self.timestamps = [entry for entry in self.timestamps if entry > cutoff]
def add(self, *new_timestamps: Optional[int]):
def add(self, *new_timestamps: float) -> None:
"""Add timestamps to the history.
If none given, will add the current timestamp"""
if len(new_timestamps) == 0:
@@ -60,7 +60,7 @@ class PickHistory(Sized):
return len(self.timestamps)
@property
def start(self) -> int:
def start(self) -> float:
"""Get the time at which the history started"""
self.remove_old_entries()
with self.timestamps_lock:
@@ -70,7 +70,7 @@ class PickHistory(Sized):
entry = time()
return entry
def copy_timestamps(self) -> str:
def copy_timestamps(self) -> list[float]:
"""Get a copy of the timestamps history"""
self.remove_old_entries()
with self.timestamps_lock:
@@ -88,22 +88,22 @@ class RateLimiter(AbstractContextManager):
# Max number of tokens that can be consumed instantly
burst_tokens: int
pick_history: PickHistory = None
bucket: BoundedSemaphore = None
queue: deque[Lock] = None
queue_lock: Lock = None
pick_history: Optional[PickHistory] = None # TODO: Geoff: make this required
bucket: BoundedSemaphore
queue: deque[Lock]
queue_lock: Lock
# Protect the number of tokens behind a lock
__n_tokens_lock: Lock = None
__n_tokens_lock: Lock
__n_tokens = 0
@property
def n_tokens(self):
def n_tokens(self) -> int:
with self.__n_tokens_lock:
return self.__n_tokens
@n_tokens.setter
def n_tokens(self, value: int):
def n_tokens(self, value: int) -> None:
with self.__n_tokens_lock:
self.__n_tokens = value
@@ -147,8 +147,8 @@ class RateLimiter(AbstractContextManager):
"""
# Compute ideal spacing
tokens_left = self.refill_period_tokens - len(self.pick_history)
seconds_left = self.pick_history.start + self.refill_period_seconds - time()
tokens_left = self.refill_period_tokens - len(self.pick_history) # type: ignore
seconds_left = self.pick_history.start + self.refill_period_seconds - time() # type: ignore
try:
spacing_seconds = seconds_left / tokens_left
except ZeroDivisionError:
@@ -159,7 +159,7 @@ class RateLimiter(AbstractContextManager):
natural_spacing = self.refill_period_seconds / self.refill_period_tokens
return max(natural_spacing, spacing_seconds)
def refill(self):
def refill(self) -> None:
"""Add a token back in the bucket"""
sleep(self.refill_spacing)
try:
@@ -170,7 +170,7 @@ class RateLimiter(AbstractContextManager):
else:
self.n_tokens += 1
def refill_thread_func(self):
def refill_thread_func(self) -> None:
"""Entry point for the daemon thread that is refilling the bucket"""
while True:
self.refill()
@@ -200,18 +200,18 @@ class RateLimiter(AbstractContextManager):
self.queue.appendleft(lock)
return lock
def acquire(self):
def acquire(self) -> None:
"""Acquires a token from the bucket when it's your turn in queue"""
lock = self.add_to_queue()
self.update_queue()
# Wait until our turn in queue
lock.acquire() # pylint: disable=consider-using-with
self.pick_history.add()
self.pick_history.add() # type: ignore
# --- Support for use in with statements
def __enter__(self):
def __enter__(self) -> None:
self.acquire()
def __exit__(self, *_args):
def __exit__(self, *_args: Any) -> None:
pass

View File

@@ -18,11 +18,12 @@
# SPDX-License-Identifier: GPL-3.0-or-later
from datetime import datetime
from typing import Any
from gi.repository import GLib
def relative_date(timestamp): # pylint: disable=too-many-return-statements
def relative_date(timestamp: int) -> Any: # pylint: disable=too-many-return-statements
days_no = ((today := datetime.today()) - datetime.fromtimestamp(timestamp)).days
if days_no == 0:

View File

@@ -20,14 +20,17 @@
from pathlib import Path
from shutil import copyfile
from typing import Optional
from gi.repository import Gdk, Gio, GLib
from gi.repository import Gdk, GdkPixbuf, Gio, GLib
from PIL import Image, ImageSequence, UnidentifiedImageError
from src import shared
def resize_cover(cover_path=None, pixbuf=None):
def resize_cover(
cover_path: Optional[Path] = None, pixbuf: Optional[GdkPixbuf.Pixbuf] = None
) -> Optional[Path]:
if not cover_path and not pixbuf:
return None
@@ -74,7 +77,7 @@ def resize_cover(cover_path=None, pixbuf=None):
return tmp_path
def save_cover(game_id, cover_path):
def save_cover(game_id: str, cover_path: Path) -> None:
shared.covers_dir.mkdir(parents=True, exist_ok=True)
animated_path = shared.covers_dir / f"{game_id}.gif"

View File

@@ -21,6 +21,7 @@
import json
import logging
import re
from pathlib import Path
from typing import TypedDict
import requests
@@ -80,7 +81,7 @@ class SteamRateLimiter(RateLimiter):
self.pick_history.remove_old_entries()
super().__init__()
def acquire(self):
def acquire(self) -> None:
"""Get a token from the bucket and store the pick history in the schema"""
super().acquire()
timestamps_str = json.dumps(self.pick_history.copy_timestamps())
@@ -90,7 +91,9 @@ class SteamRateLimiter(RateLimiter):
class SteamFileHelper:
"""Helper for steam file formats"""
def get_manifest_data(self, manifest_path) -> SteamManifestData:
def get_manifest_data(
self, manifest_path: Path
) -> SteamManifestData: # TODO: Geoff: fix typing issue
"""Get local data for a game from its manifest"""
with open(manifest_path, "r", encoding="utf-8") as file:
@@ -116,7 +119,7 @@ class SteamAPIHelper:
def __init__(self, rate_limiter: RateLimiter) -> None:
self.rate_limiter = rate_limiter
def get_api_data(self, appid) -> SteamAPIData:
def get_api_data(self, appid: str) -> SteamAPIData:
"""
Get online data for a game from its appid.
May block to satisfy the Steam web API limitations.

View File

@@ -20,12 +20,14 @@
import logging
from pathlib import Path
from typing import Any
import requests
from gi.repository import Gio
from requests.exceptions import HTTPError
from src import shared
from src.game import Game
from src.utils.save_cover import resize_cover, save_cover
@@ -55,12 +57,12 @@ class SGDBHelper:
base_url = "https://www.steamgriddb.com/api/v2/"
@property
def auth_headers(self):
def auth_headers(self) -> dict[str, str]:
key = shared.schema.get_string("sgdb-key")
headers = {"Authorization": f"Bearer {key}"}
return headers
def get_game_id(self, game):
def get_game_id(self, game: Game) -> Any:
"""Get grid results for a game. Can raise an exception."""
uri = f"{self.base_url}search/autocomplete/{game.name}"
res = requests.get(uri, headers=self.auth_headers, timeout=5)
@@ -74,7 +76,7 @@ class SGDBHelper:
case _:
res.raise_for_status()
def get_image_uri(self, game_id, animated=False):
def get_image_uri(self, game_id: str, animated: bool = False) -> Any:
"""Get the image for a SGDB game id"""
uri = f"{self.base_url}grids/game/{game_id}?dimensions=600x900"
if animated:
@@ -93,7 +95,7 @@ class SGDBHelper:
case _:
res.raise_for_status()
def conditionaly_update_cover(self, game):
def conditionaly_update_cover(self, game: Game) -> None:
"""Update the game's cover if appropriate"""
# Obvious skips

View File

@@ -18,25 +18,28 @@
# SPDX-License-Identifier: GPL-3.0-or-later
from functools import wraps
from typing import Any, Callable
from gi.repository import Gio
def create_task_thread_func_closure(func, data):
def create_task_thread_func_closure(func: Callable, data: Any) -> Callable:
"""Wrap a Gio.TaskThreadFunc with the given data in a closure"""
def closure(task, source_object, _data, cancellable):
def closure(
task: Gio.Task, source_object: object, _data: Any, cancellable: Gio.Cancellable
) -> Any:
func(task, source_object, data, cancellable)
return closure
def decorate_set_task_data(task):
def decorate_set_task_data(task: Gio.Task) -> Callable:
"""Decorate Gio.Task.set_task_data to replace it"""
def decorator(original_method):
def decorator(original_method: Callable) -> Callable:
@wraps(original_method)
def new_method(task_data):
def new_method(task_data: Any) -> None:
task.task_data = task_data
return new_method
@@ -44,13 +47,13 @@ def decorate_set_task_data(task):
return decorator
def decorate_run_in_thread(task):
def decorate_run_in_thread(task: Gio.Task) -> Callable:
"""Decorate Gio.Task.run_in_thread to pass the task data correctly
Creates a closure around task_thread_func with the task data available."""
def decorator(original_method):
def decorator(original_method: Callable) -> Callable:
@wraps(original_method)
def new_method(task_thread_func):
def new_method(task_thread_func: Callable) -> None:
closure = create_task_thread_func_closure(task_thread_func, task.task_data)
original_method(closure)
@@ -64,11 +67,17 @@ class Task:
"""Wrapper around Gio.Task to patch task data not being passed"""
@classmethod
def new(cls, source_object, cancellable, callback, callback_data):
def new(
cls,
source_object: object,
cancellable: Gio.Cancellable,
callback: Callable,
callback_data: Any,
) -> Gio.Task:
"""Create a new, monkey-patched Gio.Task.
The `set_task_data` and `run_in_thread` methods are decorated.
As of 2023-05-19, pygobject does not work well with Gio.Task, so to pass data
As of 2023-05-19, PyGObject does not work well with Gio.Task, so to pass data
the only viable way it to create a closure with the thread function and its data.
This class is supposed to make Gio.Task comply with its expected behaviour
per the docs: