🎨 Fixed some linter warnings

Applied suggested pylint fixes and suppressed unhelpful pylint messages
This commit is contained in:
GeoffreyCoulaud
2023-06-10 03:30:09 +02:00
parent eeb0f3e501
commit c9a96f5eec
8 changed files with 38 additions and 162 deletions

View File

@@ -110,6 +110,7 @@ class DetailsWindow(Adw.Window):
file_path = _("/path/to/{}").format(file_name)
command = "xdg-open"
# pylint: disable=line-too-long
exec_info_text = _(
'To launch the executable "{}", use the command:\n\n<tt>"{}"</tt>\n\nTo open the file "{}" with the default application, use:\n\n<tt>{} "{}"</tt>\n\nIf the path contains spaces, make sure to wrap it in double quotes!'
).format(exe_name, exe_path, file_name, command, file_path)

View File

@@ -31,6 +31,7 @@ from src import shared
from src.game_cover import GameCover
# pylint: disable=too-many-instance-attributes
@Gtk.Template(resource_path=shared.PREFIX + "/gtk/game.ui")
class Game(Gtk.Box):
__gtype_name__ = "Game"
@@ -187,6 +188,7 @@ class Game(Gtk.Box):
)
logging.info("Starting %s: %s", self.name, str(args))
# pylint: disable=consider-using-with
subprocess.Popen(
args,
cwd=Path.home(),

View File

@@ -1,6 +1,6 @@
import logging
from gi.repository import Adw, Gio, Gtk
from gi.repository import Adw, Gtk
from src import shared
from src.game import Game
@@ -31,15 +31,13 @@ class Importer:
@property
def n_games_added(self):
return sum(
[
1 if not (pipeline.game.blacklisted or pipeline.game.removed) else 0
for pipeline in self.game_pipelines
]
1 if not (pipeline.game.blacklisted or pipeline.game.removed) else 0
for pipeline in self.game_pipelines
)
@property
def pipelines_progress(self):
progress = sum([pipeline.progress for pipeline in self.game_pipelines])
progress = sum(pipeline.progress for pipeline in self.game_pipelines)
try:
progress = progress / len(self.game_pipelines)
except ZeroDivisionError:
@@ -126,7 +124,7 @@ class Importer:
else:
# Warn source implementers that an invalid type was produced
# Should not happen on production code
logging.warn(
logging.warning(
"%s produced an invalid iteration return type %s",
source.id,
type(iteration_result),

View File

@@ -11,6 +11,9 @@ from src.store.managers.manager import Manager
from src.utils.save_cover import resize_cover, save_cover
# TODO Remove by generalizing OnlineCoverManager
class ItchCoverManager(Manager):
"""Manager in charge of downloading the game's cover from itch.io"""

View File

@@ -73,7 +73,7 @@ class Manager:
if error in self.continue_on:
# Handle skippable errors (skip silently)
return
elif error in self.retryable_on:
if error in self.retryable_on:
if try_index < self.max_tries:
# Handle retryable errors
logging.error("Retrying %s in %s for %s", *logging_args)

View File

@@ -1,132 +0,0 @@
# importer.py
#
# Copyright 2022-2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
from gi.repository import Adw, GLib, Gtk
from src import shared
from .create_dialog import create_dialog
from .game import Game
from .save_cover import resize_cover, save_cover
from .steamgriddb import SGDBSave
class Importer:
def __init__(self):
self.win = shared.win
self.total_queue = 0
self.queue = 0
self.games_no = 0
self.blocker = False
self.games = set()
self.sgdb_exception = None
self.progressbar = Gtk.ProgressBar(margin_start=12, margin_end=12)
self.import_statuspage = Adw.StatusPage(
title=_("Importing Games…"),
child=self.progressbar,
)
self.import_dialog = Adw.Window(
content=self.import_statuspage,
modal=True,
default_width=350,
default_height=-1,
transient_for=self.win,
deletable=False,
)
self.import_dialog.present()
def save_game(self, values=None, cover_path=None):
if values:
game = Game(values)
if save_cover:
save_cover(game.game_id, resize_cover(cover_path))
self.games.add(game)
self.games_no += 1
if game.blacklisted:
self.games_no -= 1
self.queue -= 1
self.update_progressbar()
if self.queue == 0 and not self.blocker:
if self.games:
self.total_queue = len(self.games)
self.queue = len(self.games)
self.import_statuspage.set_title(_("Importing Covers…"))
self.update_progressbar()
SGDBSave(self.games, self)
else:
self.done()
def done(self):
self.update_progressbar()
if self.queue == 0:
self.import_dialog.close()
toast = Adw.Toast()
toast.set_priority(Adw.ToastPriority.HIGH)
if self.games_no == 0:
toast.set_title(_("No new games found"))
toast.set_button_label(_("Preferences"))
toast.connect(
"button-clicked", self.response, "open_preferences", "import"
)
elif self.games_no == 1:
toast.set_title(_("1 game imported"))
elif self.games_no > 1:
games_no = self.games_no
toast.set_title(
# The variable is the number of games
_("{} games imported").format(games_no)
)
self.win.toast_overlay.add_toast(toast)
# Add timeout to make it the last thing to happen
GLib.timeout_add(0, self.warning, None, None)
def response(self, _widget, response, page_name=None, expander_row=None):
if response == "open_preferences":
self.win.get_application().on_preferences_action(
None, page_name=page_name, expander_row=expander_row
)
def warning(self, *_args):
if self.sgdb_exception:
create_dialog(
self.win,
_("Couldn't Connect to SteamGridDB"),
self.sgdb_exception,
"open_preferences",
_("Preferences"),
).connect("response", self.response, "sgdb")
self.sgdb_exception = None
def update_progressbar(self):
try:
self.progressbar.set_fraction(1 - (self.queue / self.total_queue))
except ZeroDivisionError:
self.progressbar.set_fraction(1)

View File

@@ -9,20 +9,20 @@ class PickHistory(Sized):
"""Utility class used for rate limiters, counting how many picks
happened in a given period"""
PERIOD: int
period: int
timestamps: list[int] = None
timestamps_lock: Lock = None
def __init__(self, period: int) -> None:
self.PERIOD = period
self.period = period
self.timestamps = []
self.timestamps_lock = Lock()
def remove_old_entries(self):
"""Remove history entries older than the period"""
now = time()
cutoff = now - self.PERIOD
cutoff = now - self.period
with self.timestamps_lock:
self.timestamps = [entry for entry in self.timestamps if entry > cutoff]
@@ -58,15 +58,16 @@ class PickHistory(Sized):
return self.timestamps.copy()
# pylint: disable=too-many-instance-attributes
class RateLimiter(AbstractContextManager):
"""Rate limiter implementing the token bucket algorithm"""
# Period in which we have a max amount of tokens
REFILL_PERIOD_SECONDS: int
refill_period_seconds: int
# Number of tokens allowed in this period
REFILL_PERIOD_TOKENS: int
refill_period_tokens: int
# Max number of tokens that can be consumed instantly
BURST_TOKENS: int
burst_tokens: int
pick_history: PickHistory = None
bucket: BoundedSemaphore = None
@@ -97,13 +98,13 @@ class RateLimiter(AbstractContextManager):
# Initialize default values
if refill_period_seconds is not None:
self.REFILL_PERIOD_SECONDS = refill_period_seconds
self.refill_period_seconds = refill_period_seconds
if refill_period_tokens is not None:
self.REFILL_PERIOD_TOKENS = refill_period_tokens
self.refill_period_tokens = refill_period_tokens
if burst_tokens is not None:
self.BURST_TOKENS = burst_tokens
self.burst_tokens = burst_tokens
if self.pick_history is None:
self.pick_history = PickHistory(self.REFILL_PERIOD_SECONDS)
self.pick_history = PickHistory(self.refill_period_seconds)
# Create synchronization data
self.__n_tokens_lock = Lock()
@@ -111,8 +112,8 @@ class RateLimiter(AbstractContextManager):
self.queue = deque()
# Initialize the token bucket
self.bucket = BoundedSemaphore(self.BURST_TOKENS)
self.n_tokens = self.BURST_TOKENS
self.bucket = BoundedSemaphore(self.burst_tokens)
self.n_tokens = self.burst_tokens
# Spawn daemon thread that refills the bucket
refill_thread = Thread(target=self.refill_thread_func, daemon=True)
@@ -127,8 +128,8 @@ class RateLimiter(AbstractContextManager):
"""
# Compute ideal spacing
tokens_left = self.REFILL_PERIOD_TOKENS - len(self.pick_history)
seconds_left = self.pick_history.start + self.REFILL_PERIOD_SECONDS - time()
tokens_left = self.refill_period_tokens - len(self.pick_history)
seconds_left = self.pick_history.start + self.refill_period_seconds - time()
try:
spacing_seconds = seconds_left / tokens_left
except ZeroDivisionError:
@@ -136,7 +137,7 @@ class RateLimiter(AbstractContextManager):
spacing_seconds = seconds_left
# Prevent spacing dropping down lower than the natural spacing
natural_spacing = self.REFILL_PERIOD_SECONDS / self.REFILL_PERIOD_TOKENS
natural_spacing = self.refill_period_seconds / self.refill_period_tokens
return max(natural_spacing, spacing_seconds)
def refill(self):
@@ -165,7 +166,8 @@ class RateLimiter(AbstractContextManager):
with self.queue_lock:
if len(self.queue) == 0:
return
self.bucket.acquire()
# Not using with because we don't want to release to the bucket
self.bucket.acquire() # pylint: disable=consider-using-with
self.n_tokens -= 1
lock = self.queue.pop()
lock.release()
@@ -173,7 +175,8 @@ class RateLimiter(AbstractContextManager):
def add_to_queue(self) -> Lock:
"""Create a lock, add it to the queue and return it"""
lock = Lock()
lock.acquire()
# We want the lock locked until its turn in queue
lock.acquire() # pylint: disable=consider-using-with
with self.queue_lock:
self.queue.appendleft(lock)
return lock
@@ -182,7 +185,8 @@ class RateLimiter(AbstractContextManager):
"""Acquires a token from the bucket when it's your turn in queue"""
lock = self.add_to_queue()
self.update_queue()
lock.acquire()
# Wait until our turn in queue
lock.acquire() # pylint: disable=consider-using-with
self.pick_history.add()
# --- Support for use in with statements

View File

@@ -47,15 +47,15 @@ class SteamRateLimiter(RateLimiter):
# 200 requests per 5 min seems to be the limit
# https://stackoverflow.com/questions/76047820/how-am-i-exceeding-steam-apis-rate-limit
# https://stackoverflow.com/questions/51795457/avoiding-error-429-too-many-requests-steam-web-api
REFILL_PERIOD_SECONDS = 5 * 60
REFILL_PERIOD_TOKENS = 200
BURST_TOKENS = 100
refill_period_seconds = 5 * 60
refill_period_tokens = 200
burst_tokens = 100
def __init__(self) -> None:
# Load pick history from schema
# (Remember API limits through restarts of Cartridges)
timestamps_str = shared.state_schema.get_string("steam-limiter-tokens-history")
self.pick_history = PickHistory(self.REFILL_PERIOD_SECONDS)
self.pick_history = PickHistory(self.refill_period_seconds)
self.pick_history.add(*json.loads(timestamps_str))
self.pick_history.remove_old_entries()
super().__init__()