Merge branch 'main' into libadwaita-1.4

This commit is contained in:
kramo
2023-08-30 10:21:28 +02:00
parent e67977287d
commit 89bc0877fd
73 changed files with 4569 additions and 3197 deletions

View File

@@ -18,19 +18,22 @@
# SPDX-License-Identifier: GPL-3.0-or-later
import os
import shlex
from pathlib import Path
from time import time
from typing import Any
from typing import Any, Optional
from gi.repository import Adw, Gio, GLib, Gtk
from PIL import Image
from PIL import Image, UnidentifiedImageError
from src import shared
from src.errors.friendly_error import FriendlyError
from src.game import Game
from src.game_cover import GameCover
from src.store.managers.cover_manager import CoverManager
from src.store.managers.sgdb_manager import SGDBManager
from src.utils.create_dialog import create_dialog
from src.utils.save_cover import resize_cover, save_cover
from src.utils.save_cover import convert_cover, save_cover
@Gtk.Template(resource_path=shared.PREFIX + "/gtk/details-window.ui")
@@ -50,12 +53,13 @@ class DetailsWindow(Adw.Window):
exec_info_label = Gtk.Template.Child()
exec_info_popover = Gtk.Template.Child()
file_chooser_button = Gtk.Template.Child()
apply_button = Gtk.Template.Child()
cover_changed: bool = False
def __init__(self, game: Game | None = None, **kwargs: Any):
def __init__(self, game: Optional[Game] = None, **kwargs: Any):
super().__init__(**kwargs)
self.game: Game = game
@@ -83,10 +87,22 @@ class DetailsWindow(Adw.Window):
image_filter.add_suffix(extension[1:])
image_filter.add_suffix("svg") # Gdk.Texture supports .svg but PIL doesn't
file_filters = Gio.ListStore.new(Gtk.FileFilter)
file_filters.append(image_filter)
self.file_dialog = Gtk.FileDialog()
self.file_dialog.set_filters(file_filters)
image_filters = Gio.ListStore.new(Gtk.FileFilter)
image_filters.append(image_filter)
exec_filter = Gtk.FileFilter(name=_("Executables"))
exec_filter.add_mime_type("application/x-executable")
exec_filters = Gio.ListStore.new(Gtk.FileFilter)
exec_filters.append(exec_filter)
self.image_file_dialog = Gtk.FileDialog()
self.image_file_dialog.set_filters(image_filters)
self.image_file_dialog.set_default_filter(image_filter)
self.exec_file_dialog = Gtk.FileDialog()
self.exec_file_dialog.set_filters(exec_filters)
self.exec_file_dialog.set_default_filter(exec_filter)
# Translate this string as you would translate "file"
file_name = _("file.txt")
@@ -114,13 +130,21 @@ class DetailsWindow(Adw.Window):
self.exec_info_label.set_label(exec_info_text)
def clear_info_selection(*_args: Any) -> None:
self.exec_info_label.select_region(-1, -1)
self.exec_info_popover.update_property(
(Gtk.AccessibleProperty.LABEL,),
(
exec_info_text.replace("<tt>", "").replace("</tt>", ""),
), # Remove formatting, else the screen reader reads it
)
self.exec_info_popover.connect("show", clear_info_selection)
def set_exec_info_a11y_label(*_args: Any) -> None:
self.set_focus(self.exec_info_popover)
self.exec_info_popover.connect("show", set_exec_info_a11y_label)
self.cover_button_delete.connect("clicked", self.delete_pixbuf)
self.cover_button_edit.connect("clicked", self.choose_cover)
self.file_chooser_button.connect("clicked", self.choose_executable)
self.apply_button.connect("clicked", self.apply_preferences)
self.name.connect("entry-activated", self.focus_executable)
@@ -175,11 +199,11 @@ class DetailsWindow(Adw.Window):
}
)
if self.win.sidebar.get_selected_row().get_child() not in (
self.win.all_games_row_box,
self.win.added_row_box,
if shared.win.sidebar.get_selected_row().get_child() not in (
shared.win.all_games_row_box,
shared.win.added_row_box,
):
self.win.sidebar.select_row(self.win.added_row_box.get_parent())
shared.win.sidebar.select_row(shared.win.added_row_box.get_parent())
else:
if final_name == "":
@@ -261,19 +285,47 @@ class DetailsWindow(Adw.Window):
def set_cover(self, _source: Any, result: Gio.Task, *_args: Any) -> None:
try:
path = self.file_dialog.open_finish(result).get_path()
path = self.image_file_dialog.open_finish(result).get_path()
except GLib.GError:
return
def resize() -> None:
if cover := resize_cover(path):
self.game_cover.new_cover(cover)
def thread_func() -> None:
new_path = None
try:
with Image.open(path) as image:
if getattr(image, "is_animated", False):
new_path = convert_cover(path)
except UnidentifiedImageError:
pass
if not new_path:
new_path = convert_cover(
pixbuf=shared.store.managers[CoverManager].composite_cover(
Path(path)
)
)
if new_path:
self.game_cover.new_cover(new_path)
self.cover_button_delete_revealer.set_reveal_child(True)
self.cover_changed = True
self.toggle_loading()
self.toggle_loading()
GLib.Thread.new(None, resize)
GLib.Thread.new(None, thread_func)
def set_executable(self, _source: Any, result: Gio.Task, *_args: Any) -> None:
try:
path = self.exec_file_dialog.open_finish(result).get_path()
except GLib.GError:
return
self.executable.set_text(shlex.quote(path))
def choose_executable(self, *_args: Any) -> None:
self.exec_file_dialog.open(self, None, self.set_executable)
def choose_cover(self, *_args: Any) -> None:
self.file_dialog.open(self, None, self.set_cover)
self.image_file_dialog.open(self, None, self.set_cover)

View File

@@ -8,14 +8,14 @@ class ErrorProducer:
Specifies the report_error and collect_errors methods in a thread-safe manner.
"""
errors: list[Exception] = None
errors_lock: Lock = None
errors: list[Exception]
errors_lock: Lock
def __init__(self) -> None:
self.errors = []
self.errors_lock = Lock()
def report_error(self, error: Exception):
def report_error(self, error: Exception) -> None:
"""Report an error"""
with self.errors_lock:
self.errors.append(error)

View File

@@ -1,4 +1,4 @@
from typing import Iterable
from typing import Iterable, Optional
class FriendlyError(Exception):
@@ -27,8 +27,8 @@ class FriendlyError(Exception):
self,
title: str,
subtitle: str,
title_args: Iterable[str] = None,
subtitle_args: Iterable[str] = None,
title_args: Optional[Iterable[str]] = None,
subtitle_args: Optional[Iterable[str]] = None,
) -> None:
"""Create a friendly error

View File

@@ -23,7 +23,7 @@ import shlex
import subprocess
from pathlib import Path
from time import time
from typing import Any
from typing import Any, Optional
from gi.repository import Adw, GObject, Gtk
@@ -57,7 +57,7 @@ class Game(Gtk.Box):
hidden: bool = False
last_played: int = 0
name: str
developer: str | None = None
developer: Optional[str] = None
removed: bool = False
blacklisted: bool = False
game_cover: GameCover = None
@@ -66,8 +66,7 @@ class Game(Gtk.Box):
def __init__(self, data: dict[str, Any], **kwargs: Any) -> None:
super().__init__(**kwargs)
self.win = shared.win
self.app = self.win.get_application()
self.app = shared.win.get_application()
self.version = shared.SPEC_VERSION
self.update_values(data)
@@ -97,22 +96,22 @@ class Game(Gtk.Box):
def save(self) -> None:
self.emit("save-ready", {})
def create_toast(self, title: str, action: str | None = None) -> None:
def create_toast(self, title: str, action: Optional[str] = None) -> None:
toast = Adw.Toast.new(title.format(self.name))
toast.set_priority(Adw.ToastPriority.HIGH)
toast.set_use_markup(False)
if action:
toast.set_button_label(_("Undo"))
toast.connect("button-clicked", self.win.on_undo_action, self, action)
toast.connect("button-clicked", shared.win.on_undo_action, self, action)
if (self, action) in self.win.toasts.keys():
if (self, action) in shared.win.toasts.keys():
# Dismiss the toast if there already is one
self.win.toasts[(self, action)].dismiss()
shared.win.toasts[(self, action)].dismiss()
self.win.toasts[(self, action)] = toast
shared.win.toasts[(self, action)] = toast
self.win.toast_overlay.add_toast(toast)
shared.win.toast_overlay.add_toast(toast)
def launch(self) -> None:
self.last_played = int(time())
@@ -129,7 +128,7 @@ class Game(Gtk.Box):
# pylint: disable=consider-using-with
subprocess.Popen(
args,
cwd=Path.home(),
cwd=shared.home,
shell=True,
start_new_session=True,
creationflags=subprocess.CREATE_NEW_PROCESS_GROUP if os.name == "nt" else 0, # type: ignore
@@ -145,8 +144,8 @@ class Game(Gtk.Box):
self.hidden = not self.hidden
self.save()
if self.win.navigation_view.get_visible_page() == self.win.details_page:
self.win.navigation_view.pop()
if shared.win.navigation_view.get_visible_page() == shared.win.details_page:
shared.win.navigation_view.pop()
self.update()
@@ -163,8 +162,8 @@ class Game(Gtk.Box):
self.save()
self.update()
if self.win.navigation_view.get_visible_page() == self.win.details_page:
self.win.navigation_view.pop()
if shared.win.navigation_view.get_visible_page() == shared.win.details_page:
shared.win.navigation_view.pop()
# The variable is the title of the game
self.create_toast(_("{} removed").format(self.name), "remove")
@@ -176,7 +175,7 @@ class Game(Gtk.Box):
self.cover.set_opacity(int(not loading))
self.spinner.set_spinning(loading)
def get_cover_path(self) -> Path | None:
def get_cover_path(self) -> Optional[Path]:
cover_path = shared.covers_dir / f"{self.game_id}.gif"
if cover_path.is_file():
return cover_path # type: ignore
@@ -198,7 +197,7 @@ class Game(Gtk.Box):
if shared.schema.get_boolean("cover-launches-game") ^ button:
self.launch()
else:
self.win.show_details_page(self)
shared.win.show_details_page(self)
def set_play_icon(self) -> None:
self.play_button.set_icon_name(

View File

@@ -18,7 +18,7 @@
# SPDX-License-Identifier: GPL-3.0-or-later
from pathlib import Path
from typing import Any, Callable
from typing import Optional
from gi.repository import Gdk, GdkPixbuf, Gio, GLib, Gtk
from PIL import Image, ImageFilter, ImageStat
@@ -27,12 +27,12 @@ from src import shared
class GameCover:
texture: Gdk.Texture | None
blurred: Gdk.Texture | None
luminance: tuple[float, float] | None
path: Path | None
animation: GdkPixbuf.PixbufAnimation | None
anim_iter: GdkPixbuf.PixbufAnimationIter | None
texture: Optional[Gdk.Texture]
blurred: Optional[Gdk.Texture]
luminance: Optional[tuple[float, float]]
path: Optional[Path]
animation: Optional[GdkPixbuf.PixbufAnimation]
anim_iter: Optional[GdkPixbuf.PixbufAnimationIter]
placeholder = Gdk.Texture.new_from_resource(
shared.PREFIX + "/library_placeholder.svg"
@@ -41,21 +41,11 @@ class GameCover:
shared.PREFIX + "/library_placeholder_small.svg"
)
def __init__(self, pictures: set[Gtk.Picture], path: Path | None = None) -> None:
def __init__(self, pictures: set[Gtk.Picture], path: Optional[Path] = None) -> None:
self.pictures = pictures
self.new_cover(path)
# Wrap the function in another one as Gio.Task.run_in_thread does not allow for passing args
def create_func(self, path: Path | None) -> Callable:
self.animation = GdkPixbuf.PixbufAnimation.new_from_file(str(path))
self.anim_iter = self.animation.get_iter()
def wrapper(task: Gio.Task, *_args: Any) -> None:
self.update_animation((task, self.animation))
return wrapper
def new_cover(self, path: Path | None = None) -> None:
def new_cover(self, path: Optional[Path] = None) -> None:
self.animation = None
self.texture = None
self.blurred = None
@@ -64,8 +54,12 @@ class GameCover:
if path:
if path.suffix == ".gif":
self.animation = GdkPixbuf.PixbufAnimation.new_from_file(str(path))
self.anim_iter = self.animation.get_iter()
self.task = Gio.Task.new()
self.task.run_in_thread(self.create_func(self.path))
self.task.run_in_thread(
lambda *_: self.update_animation((self.task, self.animation))
)
else:
self.texture = Gdk.Texture.new_from_filename(str(path))

View File

@@ -19,8 +19,10 @@
# SPDX-License-Identifier: GPL-3.0-or-later
import logging
from time import time
from typing import Any, Optional
from gi.repository import Adw, GLib, Gtk
from gi.repository import Adw, Gio, GLib, Gtk
from src import shared
from src.errors.error_producer import ErrorProducer
@@ -30,56 +32,60 @@ from src.importer.sources.location import UnresolvableLocationError
from src.importer.sources.source import Source
from src.store.managers.async_manager import AsyncManager
from src.store.pipeline import Pipeline
from src.utils.task import Task
# pylint: disable=too-many-instance-attributes
class Importer(ErrorProducer):
"""A class in charge of scanning sources for games"""
progressbar = None
import_statuspage = None
import_dialog = None
summary_toast = None
progressbar: Gtk.ProgressBar
import_statuspage: Adw.StatusPage
import_dialog: Adw.MessageDialog
summary_toast: Optional[Adw.Toast] = None
sources: set[Source] = None
sources: set[Source]
n_source_tasks_created: int = 0
n_source_tasks_done: int = 0
n_pipelines_done: int = 0
game_pipelines: set[Pipeline] = None
game_pipelines: set[Pipeline]
removed_game_ids: set[str] = set()
imported_game_ids: set[str] = set()
removed_game_ids: set[str]
imported_game_ids: set[str]
def __init__(self):
def __init__(self) -> None:
super().__init__()
shared.import_time = int(time())
# TODO: make this stateful
shared.store.new_game_ids = set()
shared.store.duplicate_game_ids = set()
self.removed_game_ids = set()
self.imported_game_ids = set()
self.game_pipelines = set()
self.sources = set()
@property
def n_games_added(self):
def n_games_added(self) -> int:
return sum(
1 if not (pipeline.game.blacklisted or pipeline.game.removed) else 0
for pipeline in self.game_pipelines
)
@property
def pipelines_progress(self):
def pipelines_progress(self) -> float:
progress = sum(pipeline.progress for pipeline in self.game_pipelines)
try:
progress = progress / len(self.game_pipelines)
except ZeroDivisionError:
progress = 0
return progress
return progress # type: ignore
@property
def sources_progress(self):
def sources_progress(self) -> float:
try:
progress = self.n_source_tasks_done / self.n_source_tasks_created
except ZeroDivisionError:
@@ -87,19 +93,23 @@ class Importer(ErrorProducer):
return progress
@property
def finished(self):
def finished(self) -> bool:
return (
self.n_source_tasks_created == self.n_source_tasks_done
and len(self.game_pipelines) == self.n_pipelines_done
)
def add_source(self, source):
def add_source(self, source: Source) -> None:
self.sources.add(source)
def run(self):
def run(self) -> None:
"""Use several Gio.Task to import games from added sources"""
shared.win.get_application().state = shared.AppState.IMPORT
if self.__class__.summary_toast:
self.__class__.summary_toast.dismiss()
shared.win.get_application().lookup_action("import").set_enabled(False)
self.create_dialog()
@@ -115,14 +125,17 @@ class Importer(ErrorProducer):
for source in self.sources:
logging.debug("Importing games from source %s", source.source_id)
task = Task.new(None, None, self.source_callback, (source,))
task = Gio.Task.new(None, None, self.source_callback, (source,))
self.n_source_tasks_created += 1
task.set_task_data((source,))
task.run_in_thread(self.source_task_thread_func)
task.run_in_thread(
lambda _task, _obj, _data, _cancellable, src=source: self.source_task_thread_func(
(src,)
)
)
self.progress_changed_callback()
def create_dialog(self):
def create_dialog(self) -> None:
"""Create the import dialog"""
self.progressbar = Gtk.ProgressBar(margin_start=12, margin_end=12)
self.import_statuspage = Adw.StatusPage(
@@ -139,7 +152,7 @@ class Importer(ErrorProducer):
)
self.import_dialog.present()
def source_task_thread_func(self, _task, _obj, data, _cancellable):
def source_task_thread_func(self, data: tuple) -> None:
"""Source import task code"""
source: Source
@@ -193,27 +206,27 @@ class Importer(ErrorProducer):
pipeline.connect("advanced", self.pipeline_advanced_callback)
self.game_pipelines.add(pipeline)
def update_progressbar(self):
def update_progressbar(self) -> None:
"""Update the progressbar to show the overall import progress"""
# Reserve 10% for the sources discovery, the rest is the pipelines
self.progressbar.set_fraction(
(0.1 * self.sources_progress) + (0.9 * self.pipelines_progress)
)
def source_callback(self, _obj, _result, data):
def source_callback(self, _obj: Any, _result: Any, data: tuple) -> None:
"""Callback executed when a source is fully scanned"""
source, *_rest = data
logging.debug("Import done for source %s", source.source_id)
self.n_source_tasks_done += 1
self.progress_changed_callback()
def pipeline_advanced_callback(self, pipeline: Pipeline):
def pipeline_advanced_callback(self, pipeline: Pipeline) -> None:
"""Callback called when a pipeline for a game has advanced"""
if pipeline.is_done:
self.n_pipelines_done += 1
self.progress_changed_callback()
def progress_changed_callback(self):
def progress_changed_callback(self) -> None:
"""
Callback called when the import process has progressed
@@ -226,7 +239,7 @@ class Importer(ErrorProducer):
if self.finished:
self.import_callback()
def remove_games(self):
def remove_games(self) -> None:
"""Set removed to True for missing games"""
if not shared.schema.get_boolean("remove-missing"):
return
@@ -250,7 +263,7 @@ class Importer(ErrorProducer):
game.update()
self.removed_game_ids.add(game.game_id)
def import_callback(self):
def import_callback(self) -> None:
"""Callback called when importing has finished"""
logging.info("Import done")
self.remove_games()
@@ -258,17 +271,17 @@ class Importer(ErrorProducer):
shared.store.new_game_ids = set()
shared.store.duplicate_game_ids = set()
self.import_dialog.close()
self.summary_toast = self.create_summary_toast()
self.__class__.summary_toast = self.create_summary_toast()
self.create_error_dialog()
shared.win.get_application().lookup_action("import").set_enabled(True)
shared.win.get_application().state = shared.AppState.DEFAULT
shared.win.create_source_rows()
def create_error_dialog(self):
def create_error_dialog(self) -> None:
"""Dialog containing all errors raised by importers"""
# Collect all errors that happened in the importer and the managers
errors: list[Exception] = []
errors = []
errors.extend(self.collect_errors())
for manager in shared.store.managers.values():
errors.extend(manager.collect_errors())
@@ -316,7 +329,7 @@ class Importer(ErrorProducer):
dialog.present()
def undo_import(self, *_args):
def undo_import(self, *_args: Any) -> None:
for game_id in self.imported_game_ids:
shared.store[game_id].removed = True
shared.store[game_id].update()
@@ -329,11 +342,12 @@ class Importer(ErrorProducer):
self.imported_game_ids = set()
self.removed_game_ids = set()
self.summary_toast.dismiss()
if self.__class__.summary_toast:
self.__class__.summary_toast.dismiss()
logging.info("Import undone")
def create_summary_toast(self):
def create_summary_toast(self) -> Adw.Toast:
"""N games imported, removed toast"""
toast = Adw.Toast(timeout=0, priority=Adw.ToastPriority.HIGH)
@@ -374,16 +388,21 @@ class Importer(ErrorProducer):
shared.win.toast_overlay.add_toast(toast)
return toast
def open_preferences(self, page=None, expander_row=None):
def open_preferences(
self,
page_name: Optional[str] = None,
expander_row: Optional[Adw.ExpanderRow] = None,
) -> Adw.PreferencesWindow:
return shared.win.get_application().on_preferences_action(
page_name=page, expander_row=expander_row
page_name=page_name, expander_row=expander_row
)
def timeout_toast(self, *_args):
def timeout_toast(self, *_args: Any) -> None:
"""Manually timeout the toast after the user has dismissed all warnings"""
GLib.timeout_add_seconds(5, self.summary_toast.dismiss)
if self.__class__.summary_toast:
GLib.timeout_add_seconds(5, self.__class__.summary_toast.dismiss)
def dialog_response_callback(self, _widget, response, *args):
def dialog_response_callback(self, _widget: Any, response: str, *args: Any) -> None:
"""Handle after-import dialogs callback"""
logging.debug("After-import dialog response: %s (%s)", response, str(args))
if response == "open_preferences":

View File

@@ -19,7 +19,6 @@
# SPDX-License-Identifier: GPL-3.0-or-later
from pathlib import Path
from time import time
from typing import NamedTuple
import yaml
@@ -38,17 +37,17 @@ class BottlesSourceIterable(SourceIterable):
data = self.source.locations.data["library.yml"].read_text("utf-8")
library: dict = yaml.safe_load(data)
added_time = int(time())
for entry in library.values():
# Build game
values = {
"source": self.source.source_id,
"added": added_time,
"added": shared.import_time,
"name": entry["name"],
"game_id": self.source.game_id_format.format(game_id=entry["id"]),
"executable": self.source.executable_format.format(
bottle_name=entry["bottle"]["name"], game_name=entry["name"]
"executable": self.source.make_executable(
bottle_name=entry["bottle"]["name"],
game_name=entry["name"],
),
}
game = Game(values)
@@ -73,7 +72,6 @@ class BottlesSourceIterable(SourceIterable):
image_path = bottles_location / bottle_path / "grids" / image_name
additional_data = {"local_image_path": image_path}
# Produce game
yield (game, additional_data)

View File

@@ -0,0 +1,204 @@
# desktop_source.py
#
# Copyright 2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import os
import shlex
import subprocess
from pathlib import Path
from typing import NamedTuple
from gi.repository import GLib, Gtk
from src import shared
from src.game import Game
from src.importer.sources.source import Source, SourceIterable
class DesktopSourceIterable(SourceIterable):
source: "DesktopSource"
def __iter__(self):
"""Generator method producing games"""
icon_theme = Gtk.IconTheme.new()
search_paths = [
shared.home / ".local" / "share",
"/run/host/usr/local/share",
"/run/host/usr/share",
"/run/host/usr/share/pixmaps",
"/usr/share/pixmaps",
] + GLib.get_system_data_dirs()
for search_path in search_paths:
path = Path(search_path)
if not str(search_path).endswith("/pixmaps"):
path = path / "icons"
if not path.is_dir():
continue
if str(path).startswith("/app/"):
continue
icon_theme.add_search_path(str(path))
launch_command, full_path = self.check_launch_command()
for path in search_paths:
if str(path).startswith("/app/"):
continue
path = Path(path) / "applications"
if not path.is_dir():
continue
for entry in path.iterdir():
if entry.suffix != ".desktop":
continue
# Skip Lutris games
if str(entry.name).startswith("net.lutris."):
continue
keyfile = GLib.KeyFile.new()
try:
keyfile.load_from_file(str(entry), 0)
if "Game" not in keyfile.get_string_list(
"Desktop Entry", "Categories"
):
continue
name = keyfile.get_string("Desktop Entry", "Name")
executable = keyfile.get_string("Desktop Entry", "Exec").split(
" %"
)[0]
except GLib.GError:
continue
# Skip Steam games
if "steam://rungameid/" in executable:
continue
# Skip Heroic games
if "heroic://launch/" in executable:
continue
# Skip Bottles games
if "bottles-cli " in executable:
continue
try:
if keyfile.get_boolean("Desktop Entry", "NoDisplay"):
continue
except GLib.GError:
pass
# Strip /run/host from Flatpak paths
if entry.is_relative_to(prefix := "/run/host"):
entry = Path("/") / entry.relative_to(prefix)
launch_arg = shlex.quote(str(entry if full_path else entry.stem))
values = {
"source": self.source.source_id,
"added": shared.import_time,
"name": name,
"game_id": f"desktop_{entry.stem}",
"executable": f"{launch_command} {launch_arg}",
}
game = Game(values)
additional_data = {}
try:
icon_str = keyfile.get_string("Desktop Entry", "Icon")
except GLib.GError:
yield game
continue
else:
if "/" in icon_str:
additional_data = {"local_icon_path": Path(icon_str)}
yield (game, additional_data)
continue
try:
if (
icon_path := icon_theme.lookup_icon(
icon_str,
None,
512,
1,
shared.win.get_direction(),
0,
)
.get_file()
.get_path()
):
additional_data = {"local_icon_path": Path(icon_path)}
except GLib.GError:
pass
yield (game, additional_data)
def check_launch_command(self) -> (str, bool):
"""Check whether `gio launch` `gtk4-launch` or `gtk-launch` are available on the system"""
commands = (("gio launch", True), ("gtk4-launch", False), ("gtk-launch", False))
flatpak_str = "flatpak-spawn --host /bin/sh -c "
for command, full_path in commands:
# Even if `gio` is available, `gio launch` is only available on GLib >= 2.67.2
check_command = (
"gio help launch"
if command == "gio launch"
else f"type {command} &> /dev/null"
)
if os.getenv("FLATPAK_ID") == shared.APP_ID:
check_command = flatpak_str + shlex.quote(check_command)
try:
subprocess.run(check_command, shell=True, check=True)
return command, full_path
except subprocess.CalledProcessError:
pass
return commands[2]
class DesktopLocations(NamedTuple):
pass
class DesktopSource(Source):
"""Generic Flatpak source"""
source_id = "desktop"
name = _("Desktop")
iterable_class = DesktopSourceIterable
available_on = {"linux"}
locations: DesktopLocations
def __init__(self) -> None:
super().__init__()
self.locations = DesktopLocations()

View File

@@ -18,7 +18,6 @@
# SPDX-License-Identifier: GPL-3.0-or-later
from pathlib import Path
from time import time
from typing import NamedTuple
from gi.repository import GLib, Gtk
@@ -26,7 +25,7 @@ from gi.repository import GLib, Gtk
from src import shared
from src.game import Game
from src.importer.sources.location import Location, LocationSubPath
from src.importer.sources.source import Source, SourceIterable
from src.importer.sources.source import ExecutableFormatSource, SourceIterable
class FlatpakSourceIterable(SourceIterable):
@@ -35,8 +34,6 @@ class FlatpakSourceIterable(SourceIterable):
def __iter__(self):
"""Generator method producing games"""
added_time = int(time())
icon_theme = Gtk.IconTheme.new()
icon_theme.add_search_path(str(self.source.locations.data["icons"]))
@@ -51,6 +48,7 @@ class FlatpakSourceIterable(SourceIterable):
"com.heroicgameslauncher.hgl",
"com.usebottles.Bottles",
"io.itch.itch",
"org.libretro.RetroArch",
}
)
@@ -78,12 +76,10 @@ class FlatpakSourceIterable(SourceIterable):
values = {
"source": self.source.source_id,
"added": added_time,
"added": shared.import_time,
"name": name,
"game_id": self.source.game_id_format.format(game_id=flatpak_id),
"executable": self.source.executable_format.format(
flatpak_id=flatpak_id
),
"executable": self.source.make_executable(flatpak_id=flatpak_id),
}
game = Game(values)
@@ -108,7 +104,6 @@ class FlatpakSourceIterable(SourceIterable):
except GLib.GError:
pass
# Produce game
yield (game, additional_data)
@@ -116,7 +111,7 @@ class FlatpakLocations(NamedTuple):
data: Location
class FlatpakSource(Source):
class FlatpakSource(ExecutableFormatSource):
"""Generic Flatpak source"""
source_id = "flatpak"

View File

@@ -25,7 +25,6 @@ from functools import cached_property
from hashlib import sha256
from json import JSONDecodeError
from pathlib import Path
from time import time
from typing import Iterable, NamedTuple, Optional, TypedDict
from src import shared
@@ -91,9 +90,7 @@ class SubSourceIterable(Iterable):
logging.debug("Using Heroic %s library.json path %s", self.name, path)
return path
def process_library_entry(
self, entry: HeroicLibraryEntry, added_time: int
) -> SourceIterationResult:
def process_library_entry(self, entry: HeroicLibraryEntry) -> SourceIterationResult:
"""Build a Game from a Heroic library entry"""
app_name = entry["app_name"]
@@ -102,21 +99,19 @@ class SubSourceIterable(Iterable):
# Build game
values = {
"source": f"{self.source.source_id}_{self.service}",
"added": added_time,
"added": shared.import_time,
"name": entry["title"],
"developer": entry.get("developer", None),
"game_id": self.source.game_id_format.format(
service=self.service, game_id=app_name
),
"executable": self.source.executable_format.format(
runner=runner, app_name=app_name
),
"executable": self.source.make_executable(runner=runner, app_name=app_name),
"hidden": self.source_iterable.is_hidden(app_name),
}
game = Game(values)
# Get the image path from the heroic cache
# Filenames are derived from the URL that heroic used to get the file
# Get the image path from the Heroic cache
# Filenames are derived from the URL that Heroic used to get the file
uri: str = entry["art_square"] + self.image_uri_params
digest = sha256(uri.encode()).hexdigest()
image_path = self.source.locations.config.root / "images-cache" / digest
@@ -129,7 +124,7 @@ class SubSourceIterable(Iterable):
Iterate through the games with a generator
:raises InvalidLibraryFileError: on initial call if the library file is bad
"""
added_time = int(time())
try:
iterator = iter(
path_json_load(self.library_path)[self.library_json_entries_key]
@@ -140,7 +135,7 @@ class SubSourceIterable(Iterable):
) from error
for entry in iterator:
try:
yield self.process_library_entry(entry, added_time)
yield self.process_library_entry(entry)
except KeyError as error:
logging.warning(
"Skipped invalid %s game %s",
@@ -178,7 +173,7 @@ class StoreSubSourceIterable(SubSourceIterable):
def is_installed(self, app_name: str) -> bool:
return app_name in self.installed_app_names
def process_library_entry(self, entry, added_time):
def process_library_entry(self, entry):
# Skip games that are not installed
app_name = entry["app_name"]
if not self.is_installed(app_name):
@@ -190,7 +185,7 @@ class StoreSubSourceIterable(SubSourceIterable):
)
return None
# Process entry as normal
return super().process_library_entry(entry, added_time)
return super().process_library_entry(entry)
def __iter__(self):
"""
@@ -221,13 +216,10 @@ class LegendaryIterable(StoreSubSourceIterable):
@cached_property
def installed_path(self) -> Path:
"""
Get the right path depending on the Heroic version
TODO after heroic 2.9 has been out for a while
We should use the commented out relative_installed_path
and remove this property override.
"""
"""Get the right path depending on the Heroic version"""
# TODO after Heroic 2.9 has been out for a while
# We should use the commented out relative_installed_path
# and remove this property override.
heroic_config_path = self.source.locations.config.root
# Heroic >= 2.9
@@ -241,7 +233,7 @@ class LegendaryIterable(StoreSubSourceIterable):
else:
# Heroic native
logging.debug("Using Heroic native <= 2.8 legendary file")
path = Path.home() / ".config"
path = shared.home / ".config"
path = path / "legendary" / "installed.json"
logging.debug("Using Heroic %s installed.json path %s", self.name, path)

View File

@@ -20,7 +20,6 @@
from shutil import rmtree
from sqlite3 import connect
from time import time
from typing import NamedTuple
from src import shared
@@ -56,16 +55,14 @@ class ItchSourceIterable(SourceIterable):
connection = connect(db_path)
cursor = connection.execute(db_request)
added_time = int(time())
# Create games from the db results
for row in cursor:
values = {
"added": added_time,
"added": shared.import_time,
"source": self.source.source_id,
"name": row[1],
"game_id": self.source.game_id_format.format(game_id=row[0]),
"executable": self.source.executable_format.format(cave_id=row[4]),
"executable": self.source.make_executable(cave_id=row[4]),
}
additional_data = {"online_cover_url": row[3] or row[2]}
game = Game(values)

View File

@@ -20,21 +20,22 @@
import json
import logging
from json import JSONDecodeError
from time import time
from typing import NamedTuple
from src import shared
from src.game import Game
from src.importer.sources.location import Location, LocationSubPath
from src.importer.sources.source import Source, SourceIterable, SourceIterationResult
from src.importer.sources.source import (
ExecutableFormatSource,
SourceIterable,
SourceIterationResult,
)
class LegendarySourceIterable(SourceIterable):
source: "LegendarySource"
def game_from_library_entry(
self, entry: dict, added_time: int
) -> SourceIterationResult:
def game_from_library_entry(self, entry: dict) -> SourceIterationResult:
# Skip non-games
if entry["is_dlc"]:
return None
@@ -42,11 +43,11 @@ class LegendarySourceIterable(SourceIterable):
# Build game
app_name = entry["app_name"]
values = {
"added": added_time,
"added": shared.import_time,
"source": self.source.source_id,
"name": entry["title"],
"game_id": self.source.game_id_format.format(game_id=app_name),
"executable": self.source.executable_format.format(app_name=app_name),
"executable": self.source.make_executable(app_name=app_name),
}
data = {}
@@ -74,12 +75,10 @@ class LegendarySourceIterable(SourceIterable):
logging.warning("Couldn't open Legendary file: %s", str(file))
return
added_time = int(time())
# Generate games from library
for entry in library.values():
try:
result = self.game_from_library_entry(entry, added_time)
result = self.game_from_library_entry(entry)
except KeyError as error:
# Skip invalid games
logging.warning(
@@ -93,7 +92,7 @@ class LegendaryLocations(NamedTuple):
config: Location
class LegendarySource(Source):
class LegendarySource(ExecutableFormatSource):
source_id = "legendary"
name = _("Legendary")
executable_format = "legendary launch {app_name}"

View File

@@ -1,7 +1,7 @@
import logging
from os import PathLike
from pathlib import Path
from typing import Iterable, Mapping, NamedTuple
from typing import Iterable, Mapping, NamedTuple, Optional
from src import shared
@@ -41,7 +41,7 @@ class Location:
paths: Mapping[str, LocationSubPath]
invalid_subtitle: str
root: Path = None
root: Optional[Path] = None
def __init__(
self,
@@ -94,7 +94,9 @@ class Location:
shared.schema.set_string(self.schema_key, value)
logging.debug("Resolved value for schema key %s: %s", self.schema_key, value)
def __getitem__(self, key: str):
def __getitem__(self, key: str) -> Optional[Path]:
"""Get the computed path from its key for the location"""
self.resolve()
return self.root / self.paths[key].segment
if self.root:
return self.root / self.paths[key].segment
return None

View File

@@ -19,7 +19,6 @@
# SPDX-License-Identifier: GPL-3.0-or-later
from shutil import rmtree
from sqlite3 import connect
from time import time
from typing import NamedTuple
from src import shared
@@ -56,20 +55,18 @@ class LutrisSourceIterable(SourceIterable):
connection = connect(db_path)
cursor = connection.execute(request, params)
added_time = int(time())
# Create games from the DB results
for row in cursor:
# Create game
values = {
"added": added_time,
"added": shared.import_time,
"hidden": row[4],
"name": row[1],
"source": f"{self.source.source_id}_{row[3]}",
"game_id": self.source.game_id_format.format(
runner=row[3], game_id=row[0]
),
"executable": self.source.executable_format.format(game_id=row[0]),
"executable": self.source.make_executable(game_id=row[0]),
}
game = Game(values)
@@ -77,7 +74,6 @@ class LutrisSourceIterable(SourceIterable):
image_path = self.source.locations.cache["coverart"] / f"{row[2]}.jpg"
additional_data = {"local_image_path": image_path}
# Produce game
yield (game, additional_data)
# Cleanup

View File

@@ -23,7 +23,7 @@ import re
from hashlib import md5
from json import JSONDecodeError
from pathlib import Path
from time import time
from shlex import quote as shell_quote
from typing import NamedTuple
from src import shared
@@ -52,7 +52,6 @@ class RetroarchSourceIterable(SourceIterable):
raise KeyError(f"Key not found in RetroArch config: {key}")
def __iter__(self):
added_time = int(time())
bad_playlists = set()
config_file = self.source.locations.config["retroarch.cfg"]
@@ -100,12 +99,12 @@ class RetroarchSourceIterable(SourceIterable):
values = {
"source": self.source.source_id,
"added": added_time,
"added": shared.import_time,
"name": item["label"],
"game_id": self.source.game_id_format.format(game_id=game_id),
"executable": self.source.executable_format.format(
rom_path=item["path"],
"executable": self.source.make_executable(
core_path=core_path,
rom_path=item["path"],
),
}
@@ -147,44 +146,6 @@ class RetroarchSource(Source):
locations: RetroarchLocations
@property
def executable_format(self):
self.locations.config.resolve()
is_flatpak = self.locations.config.root.is_relative_to(shared.flatpak_dir)
base = "flatpak run org.libretro.RetroArch" if is_flatpak else "retroarch"
args = '-L "{core_path}" "{rom_path}"'
return f"{base} {args}"
def get_steam_location(self) -> str:
"""
Get the RetroArch installed via Steam location
:raise UnresolvableLocationError: if steam isn't installed
:raise KeyError: if there is no libraryfolders.vdf subpath
:raise OSError: if libraryfolders.vdf can't be opened
:raise ValueError: if RetroArch isn't installed through Steam
"""
# Find steam location
libraryfolders = SteamSource().locations.data["libraryfolders.vdf"]
parse_apps = False
with open(libraryfolders, "r", encoding="utf-8") as open_file:
# Search each line for a library path and store it each time a new one is found.
for line in open_file:
if '"path"' in line:
library_path = re.findall(
'"path"\\s+"(.*)"\n', line, re.IGNORECASE
)[0]
elif '"apps"' in line:
parse_apps = True
elif parse_apps and "}" in line:
parse_apps = False
# Stop searching, as the library path directly above the appid has been found.
elif parse_apps and '"1118310"' in line:
return Path(f"{library_path}/steamapps/common/RetroArch")
# Not found
raise ValueError("RetroArch not found in Steam library")
def __init__(self) -> None:
super().__init__()
self.locations = RetroarchLocations(
@@ -212,9 +173,83 @@ class RetroarchSource(Source):
invalid_subtitle=Location.CONFIG_INVALID_SUBTITLE,
)
)
# TODO enable when we get the Steam RetroArch games working
# self.add_steam_location_candidate()
def add_steam_location_candidate(self) -> None:
"""Add the Steam RetroAcrh location to the config candidates"""
try:
self.locations.config.candidates.append(self.get_steam_location())
except (OSError, KeyError, UnresolvableLocationError):
logging.debug("Steam isn't installed")
except ValueError as error:
logging.debug("RetroArch Steam location candiate not found", exc_info=error)
def get_steam_location(self) -> str:
"""
Get the RetroArch installed via Steam location
:raise UnresolvableLocationError: if Steam isn't installed
:raise KeyError: if there is no libraryfolders.vdf subpath
:raise OSError: if libraryfolders.vdf can't be opened
:raise ValueError: if RetroArch isn't installed through Steam
"""
# Find Steam location
libraryfolders = SteamSource().locations.data["libraryfolders.vdf"]
parse_apps = False
with open(libraryfolders, "r", encoding="utf-8") as open_file:
# Search each line for a library path and store it each time a new one is found.
for line in open_file:
if '"path"' in line:
library_path = re.findall(
'"path"\\s+"(.*)"\n', line, re.IGNORECASE
)[0]
elif '"apps"' in line:
parse_apps = True
elif parse_apps and "}" in line:
parse_apps = False
# Stop searching, as the library path directly above the appid has been found.
elif parse_apps and '"1118310"' in line:
return Path(f"{library_path}/steamapps/common/RetroArch")
# Not found
raise ValueError("RetroArch not found in Steam library")
def make_executable(self, core_path: Path, rom_path: Path) -> str:
"""
Generate an executable command from the rom path and core path,
depending on the source's location.
The format depends on RetroArch's installation method,
detected from the source config location
:param Path rom_path: the game's rom path
:param Path core_path: the game's core path
:return str: an executable command
"""
self.locations.config.resolve()
args = ("-L", core_path, rom_path)
# Steam RetroArch
# (Must check before Flatpak, because Steam itself can be installed as one)
# TODO enable when we get Steam RetroArch executable to work
# if self.locations.config.root.parent.parent.name == "steamapps":
# # steam://run exepects args to be url-encoded and separated by spaces.
# args = map(lambda s: url_quote(str(s), safe=""), args)
# args_str = " ".join(args)
# uri = f"steam://run/1118310//{args_str}/"
# return f"xdg-open {shell_quote(uri)}"
# Flatpak RetroArch
args = map(lambda s: shell_quote(str(s)), args)
args_str = " ".join(args)
if self.locations.config.root.is_relative_to(shared.flatpak_dir):
return f"flatpak run org.libretro.RetroArch {args_str}"
# TODO executable override for non-sandboxed sources
# Linux native RetroArch
return f"retroarch {args_str}"
# TODO implement for windows (needs override)

View File

@@ -20,19 +20,19 @@
import sys
from abc import abstractmethod
from collections.abc import Iterable
from typing import Any, Collection, Generator
from typing import Any, Collection, Generator, Optional
from src.game import Game
from src.importer.sources.location import Location
# Type of the data returned by iterating on a Source
SourceIterationResult = None | Game | tuple[Game, tuple[Any]]
SourceIterationResult = Optional[Game | tuple[Game, tuple[Any]]]
class SourceIterable(Iterable):
"""Data producer for a source of games"""
source: "Source" = None
source: "Source"
def __init__(self, source: "Source") -> None:
self.source = source
@@ -53,7 +53,7 @@ class Source(Iterable):
source_id: str
name: str
variant: str = None
variant: Optional[str] = None
available_on: set[str] = set()
iterable_class: type[SourceIterable]
@@ -65,7 +65,7 @@ class Source(Iterable):
def full_name(self) -> str:
"""The source's full name"""
full_name_ = self.name
if self.variant is not None:
if self.variant:
full_name_ += f" ({self.variant})"
return full_name_
@@ -75,13 +75,14 @@ class Source(Iterable):
return self.source_id + "_{game_id}"
@property
def is_available(self):
def is_available(self) -> bool:
return sys.platform in self.available_on
@property
@abstractmethod
def executable_format(self) -> str:
"""The executable format used to construct game executables"""
def make_executable(self, *args, **kwargs) -> str:
"""
Create a game executable command.
Should be implemented by child classes.
"""
def __iter__(self) -> Generator[SourceIterationResult, None, None]:
"""
@@ -93,8 +94,21 @@ class Source(Iterable):
return iter(self.iterable_class(self))
class ExecutableFormatSource(Source):
"""Source class that uses a simple executable format to start games"""
@property
@abstractmethod
def executable_format(self) -> str:
"""The executable format used to construct game executables"""
def make_executable(self, *args, **kwargs) -> str:
"""Use the executable format to"""
return self.executable_format.format(*args, **kwargs)
# pylint: disable=abstract-method
class URLExecutableSource(Source):
class URLExecutableSource(ExecutableFormatSource):
"""Source class that use custom URLs to start games"""
url_format: str

View File

@@ -21,7 +21,6 @@
import logging
import re
from pathlib import Path
from time import time
from typing import Iterable, NamedTuple
from src import shared
@@ -35,7 +34,7 @@ class SteamSourceIterable(SourceIterable):
source: "SteamSource"
def get_manifest_dirs(self) -> Iterable[Path]:
"""Get dirs that contain steam app manifests"""
"""Get dirs that contain Steam app manifests"""
libraryfolders_path = self.source.locations.data["libraryfolders.vdf"]
with open(libraryfolders_path, "r", encoding="utf-8") as file:
contents = file.read()
@@ -64,8 +63,6 @@ class SteamSourceIterable(SourceIterable):
appid_cache = set()
manifests = self.get_manifests()
added_time = int(time())
for manifest in manifests:
# Get metadata from manifest
steam = SteamFileHelper()
@@ -90,11 +87,11 @@ class SteamSourceIterable(SourceIterable):
# Build game from local data
values = {
"added": added_time,
"added": shared.import_time,
"name": local_data["name"],
"source": self.source.source_id,
"game_id": self.source.game_id_format.format(game_id=appid),
"executable": self.source.executable_format.format(game_id=appid),
"executable": self.source.make_executable(game_id=appid),
}
game = Game(values)
@@ -105,7 +102,6 @@ class SteamSourceIterable(SourceIterable):
)
additional_data = {"local_image_path": image_path, "steam_appid": appid}
# Produce game
yield (game, additional_data)

View File

@@ -29,7 +29,7 @@ class ColorLogFormatter(Formatter):
RED = "\033[31m"
YELLOW = "\033[33m"
def format(self, record: LogRecord):
def format(self, record: LogRecord) -> str:
super_format = super().format(record)
match record.levelname:
case "CRITICAL":

View File

@@ -18,11 +18,12 @@
# SPDX-License-Identifier: GPL-3.0-or-later
import lzma
from io import StringIO
from io import TextIOWrapper
from logging import StreamHandler
from lzma import FORMAT_XZ, PRESET_DEFAULT
from os import PathLike
from pathlib import Path
from typing import Optional
from src import shared
@@ -37,7 +38,7 @@ class SessionFileHandler(StreamHandler):
backup_count: int
filename: Path
log_file: StringIO = None
log_file: Optional[TextIOWrapper] = None
def create_dir(self) -> None:
"""Create the log dir if needed"""
@@ -83,23 +84,29 @@ class SessionFileHandler(StreamHandler):
logfiles.sort(key=self.file_sort_key, reverse=True)
return logfiles
def rotate_file(self, path: Path):
def rotate_file(self, path: Path) -> None:
"""Rotate a file's number suffix and remove it if it's too old"""
# If uncompressed, compress
if not path.name.endswith(".xz"):
try:
with open(path, "r", encoding="utf-8") as original_file:
original_data = original_file.read()
except UnicodeDecodeError:
# If the file is corrupted, throw it away
path.unlink()
return
# Compress the file
compressed_path = path.with_suffix(path.suffix + ".xz")
with (
lzma.open(
compressed_path,
"wt",
format=FORMAT_XZ,
preset=PRESET_DEFAULT,
encoding="utf-8",
) as lzma_file,
open(path, "r", encoding="utf-8") as original_file,
):
lzma_file.write(original_file.read())
with lzma.open(
compressed_path,
"wt",
format=FORMAT_XZ,
preset=PRESET_DEFAULT,
encoding="utf-8",
) as lzma_file:
lzma_file.write(original_data)
path.unlink()
path = compressed_path
@@ -128,5 +135,6 @@ class SessionFileHandler(StreamHandler):
super().__init__(self.log_file)
def close(self) -> None:
self.log_file.close()
if self.log_file:
self.log_file.close()
super().close()

View File

@@ -27,7 +27,7 @@ import sys
from src import shared
def setup_logging():
def setup_logging() -> None:
"""Intitate the app's logging"""
is_dev = shared.PROFILE == "development"
@@ -89,7 +89,7 @@ def setup_logging():
logging_dot_config.dictConfig(config)
def log_system_info():
def log_system_info() -> None:
"""Log system debug information"""
logging.debug("Starting %s v%s (%s)", shared.APP_ID, shared.VERSION, shared.PROFILE)

View File

@@ -20,8 +20,10 @@
import json
import lzma
import os
import shlex
import subprocess
import sys
from typing import Any
from typing import Any, Optional
import gi
@@ -36,6 +38,7 @@ from src.details_window import DetailsWindow
from src.game import Game
from src.importer.importer import Importer
from src.importer.sources.bottles_source import BottlesSource
from src.importer.sources.desktop_source import DesktopSource
from src.importer.sources.flatpak_source import FlatpakSource
from src.importer.sources.heroic_source import HeroicSource
from src.importer.sources.itch_source import ItchSource
@@ -78,19 +81,19 @@ class CartridgesApplication(Adw.Application):
Gtk.Window.set_default_icon_name(shared.APP_ID)
# Create the main window
self.win = self.props.active_window # pylint: disable=no-member
if not self.win:
shared.win = self.win = CartridgesWindow(application=self)
win = self.props.active_window # pylint: disable=no-member
if not win:
shared.win = win = CartridgesWindow(application=self)
# Save window geometry
shared.state_schema.bind(
"width", self.win, "default-width", Gio.SettingsBindFlags.DEFAULT
"width", shared.win, "default-width", Gio.SettingsBindFlags.DEFAULT
)
shared.state_schema.bind(
"height", self.win, "default-height", Gio.SettingsBindFlags.DEFAULT
"height", shared.win, "default-height", Gio.SettingsBindFlags.DEFAULT
)
shared.state_schema.bind(
"is-maximized", self.win, "maximized", Gio.SettingsBindFlags.DEFAULT
"is-maximized", shared.win, "maximized", Gio.SettingsBindFlags.DEFAULT
)
# Load games from disk
@@ -99,7 +102,7 @@ class CartridgesApplication(Adw.Application):
self.state = shared.AppState.LOAD_FROM_DISK
self.load_games_from_disk()
self.state = shared.AppState.DEFAULT
self.win.create_source_rows()
shared.win.create_source_rows()
# Add rest of the managers for game imports
shared.store.add_manager(CoverManager())
@@ -125,26 +128,28 @@ class CartridgesApplication(Adw.Application):
("protondb_search",),
("lutris_search",),
("hltb_search",),
("show_sidebar", ("F9",), self.win),
("show_hidden", ("<primary>h",), self.win),
("go_to_parent", ("<alt>Up",), self.win),
("go_home", ("<alt>Home",), self.win),
("toggle_search", ("<primary>f",), self.win),
("escape", ("Escape",), self.win),
("undo", ("<primary>z",), self.win),
("open_menu", ("F10",), self.win),
("close", ("<primary>w",), self.win),
("show_sidebar", ("F9",), shared.win),
("show_hidden", ("<primary>h",), shared.win),
("go_to_parent", ("<alt>Up",), shared.win),
("go_home", ("<alt>Home",), shared.win),
("toggle_search", ("<primary>f",), shared.win),
("escape", ("Escape",), shared.win),
("undo", ("<primary>z",), shared.win),
("open_menu", ("F10",), shared.win),
("close", ("<primary>w",), shared.win),
}
)
sort_action = Gio.SimpleAction.new_stateful(
"sort_by", GLib.VariantType.new("s"), GLib.Variant("s", "a-z")
)
sort_action.connect("activate", self.win.on_sort_action)
self.win.add_action(sort_action)
self.win.on_sort_action(sort_action, shared.state_schema.get_value("sort-mode"))
sort_action.connect("activate", shared.win.on_sort_action)
shared.win.add_action(sort_action)
shared.win.on_sort_action(
sort_action, shared.state_schema.get_value("sort-mode")
)
self.win.present()
shared.win.present()
def load_games_from_disk(self) -> None:
if shared.games_dir.is_dir():
@@ -168,9 +173,9 @@ class CartridgesApplication(Adw.Application):
def on_about_action(self, *_args: Any) -> None:
# Get the debug info from the log files
debug_str = ""
for i, path in enumerate(shared.log_files):
for index, path in enumerate(shared.log_files):
# Add a horizontal line between runs
if i > 0:
if index > 0:
debug_str += "" * 37 + "\n"
# Add the run's logs
log_file = (
@@ -184,7 +189,7 @@ class CartridgesApplication(Adw.Application):
about = Adw.AboutWindow.new_from_appdata(
shared.PREFIX + "/" + shared.APP_ID + ".metainfo.xml", shared.VERSION
)
about.set_transient_for(self.win)
about.set_transient_for(shared.win)
about.set_developers(
(
"kramo https://kramo.hu",
@@ -214,8 +219,8 @@ class CartridgesApplication(Adw.Application):
self,
_action: Any = None,
_parameter: Any = None,
page_name: (str | None) = None,
expander_row: (str | None) = None,
page_name: Optional[str] = None,
expander_row: Optional[str] = None,
) -> CartridgesWindow:
win = PreferencesWindow()
if page_name:
@@ -227,13 +232,13 @@ class CartridgesApplication(Adw.Application):
return win
def on_launch_game_action(self, *_args: Any) -> None:
self.win.active_game.launch()
shared.win.active_game.launch()
def on_hide_game_action(self, *_args: Any) -> None:
self.win.active_game.toggle_hidden()
shared.win.active_game.toggle_hidden()
def on_edit_game_action(self, *_args: Any) -> None:
DetailsWindow(self.win.active_game)
DetailsWindow(shared.win.active_game)
def on_add_game_action(self, *_args: Any) -> None:
DetailsWindow()
@@ -256,6 +261,9 @@ class CartridgesApplication(Adw.Application):
if shared.schema.get_boolean("flatpak"):
shared.importer.add_source(FlatpakSource())
if shared.schema.get_boolean("desktop"):
shared.importer.add_source(DesktopSource())
if shared.schema.get_boolean("itch"):
shared.importer.add_source(ItchSource())
@@ -268,14 +276,14 @@ class CartridgesApplication(Adw.Application):
shared.importer.run()
def on_remove_game_action(self, *_args: Any) -> None:
self.win.active_game.remove_game()
shared.win.active_game.remove_game()
def on_remove_game_details_view_action(self, *_args: Any) -> None:
if self.win.navigation_view.get_visible_page() == self.win.details_page:
if shared.win.navigation_view.get_visible_page() == shared.win.details_page:
self.on_remove_game_action()
def search(self, uri: str) -> None:
Gio.AppInfo.launch_default_for_uri(f"{uri}{self.win.active_game.name}")
Gio.AppInfo.launch_default_for_uri(f"{uri}{shared.win.active_game.name}")
def on_igdb_search_action(self, *_args: Any) -> None:
self.search("https://www.igdb.com/search?type=1&q=")

View File

@@ -21,7 +21,7 @@ import logging
import re
from pathlib import Path
from shutil import rmtree
from typing import Any, Callable
from typing import Any, Callable, Optional
from gi.repository import Adw, Gio, GLib, Gtk
@@ -97,6 +97,8 @@ class PreferencesWindow(Adw.PreferencesWindow):
flatpak_data_file_chooser_button = Gtk.Template.Child()
flatpak_import_launchers_switch = Gtk.Template.Child()
desktop_switch = Gtk.Template.Child()
sgdb_key_group = Gtk.Template.Child()
sgdb_key_entry_row = Gtk.Template.Child()
sgdb_switch = Gtk.Template.Child()
@@ -113,9 +115,8 @@ class PreferencesWindow(Adw.PreferencesWindow):
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.win = shared.win
self.file_chooser = Gtk.FileDialog()
self.set_transient_for(self.win)
self.set_transient_for(shared.win)
self.toast = Adw.Toast.new(_("All games removed"))
self.toast.set_button_label(_("Undo"))
@@ -188,6 +189,7 @@ class PreferencesWindow(Adw.PreferencesWindow):
"sgdb",
"sgdb-prefer",
"sgdb-animated",
"desktop",
}
)
@@ -213,9 +215,9 @@ class PreferencesWindow(Adw.PreferencesWindow):
)
def choose_folder(
self, _widget: Any, callback: Callable, callback_data: str | None = None
self, _widget: Any, callback: Callable, callback_data: Optional[str] = None
) -> None:
self.file_chooser.select_folder(self.win, None, callback, callback_data)
self.file_chooser.select_folder(shared.win, None, callback, callback_data)
def undo_remove_all(self, *_args: Any) -> None:
shared.win.get_application().state = shared.AppState.UNDO_REMOVE_ALL_GAMES
@@ -238,8 +240,8 @@ class PreferencesWindow(Adw.PreferencesWindow):
game.save()
game.update()
if self.win.navigation_view.get_visible_page() == self.win.details_page:
self.win.navigation_view.pop()
if shared.win.navigation_view.get_visible_page() == shared.win.details_page:
shared.win.navigation_view.pop()
self.add_toast(self.toast)
shared.win.get_application().state = shared.AppState.DEFAULT
@@ -280,9 +282,6 @@ class PreferencesWindow(Adw.PreferencesWindow):
def resolve_locations(self, source: Source) -> None:
"""Resolve locations and add a warning if location cannot be found"""
def clear_warning_selection(_widget: Any, label: Gtk.Label) -> None:
label.select_region(-1, -1)
for location_name, location in source.locations._asdict().items():
action_row = getattr(
self, f"{source.source_id}_{location_name}_action_row", None
@@ -294,15 +293,16 @@ class PreferencesWindow(Adw.PreferencesWindow):
location.resolve()
except UnresolvableLocationError:
title = _("Installation Not Found")
description = _("Select a valid directory.")
format_start = '<span rise="12pt"><b><big>'
format_end = "</big></b></span>\n"
popover = Gtk.Popover(
focusable=True,
child=(
label := Gtk.Label(
label=(
'<span rise="12pt"><b><big>'
+ _("Installation Not Found")
+ "</big></b></span>\n"
+ _("Select a valid directory.")
),
Gtk.Label(
label=format_start + title + format_end + description,
use_markup=True,
wrap=True,
max_width_chars=50,
@@ -313,17 +313,24 @@ class PreferencesWindow(Adw.PreferencesWindow):
margin_bottom=9,
margin_start=12,
margin_end=12,
selectable=True,
)
)
),
)
popover.connect("show", clear_warning_selection, label)
popover.update_property(
(Gtk.AccessibleProperty.LABEL,), (title + description,)
)
def set_a11y_label(widget: Gtk.Popover) -> None:
self.set_focus(widget)
popover.connect("show", set_a11y_label)
menu_button = Gtk.MenuButton(
icon_name="dialog-warning-symbolic",
valign=Gtk.Align.CENTER,
popover=popover,
tooltip_text=_("Warning"),
)
menu_button.add_css_class("warning")

View File

@@ -62,5 +62,6 @@ image_size = (200 * scale_factor, 300 * scale_factor)
# pylint: disable=invalid-name
win = None
importer = None
import_time = None
store = None
log_files = None

View File

@@ -17,13 +17,12 @@
#
# SPDX-License-Identifier: GPL-3.0-or-later
from typing import Callable, Any
from typing import Any, Callable
from gi.repository import Gio
from src.game import Game
from src.store.managers.manager import Manager
from src.utils.task import Task
class AsyncManager(Manager):
@@ -49,11 +48,10 @@ class AsyncManager(Manager):
self, game: Game, additional_data: dict, callback: Callable[["Manager"], Any]
) -> None:
"""Create a task to process the game in a separate thread"""
task = Task.new(None, self.cancellable, self._task_callback, (callback,))
task.set_task_data((game, additional_data))
task.run_in_thread(self._task_thread_func)
task = Gio.Task.new(None, self.cancellable, self._task_callback, (callback,))
task.run_in_thread(lambda *_: self._task_thread_func((game, additional_data)))
def _task_thread_func(self, _task, _source_object, data, _cancellable):
def _task_thread_func(self, data):
"""Task thread entry point"""
game, additional_data, *_rest = data
self.run(game, additional_data)

View File

@@ -22,14 +22,14 @@ from pathlib import Path
from typing import NamedTuple
import requests
from gi.repository import Gio, GdkPixbuf
from gi.repository import GdkPixbuf, Gio
from requests.exceptions import HTTPError, SSLError
from src import shared
from src.game import Game
from src.store.managers.manager import Manager
from src.store.managers.steam_api_manager import SteamAPIManager
from src.utils.save_cover import resize_cover, save_cover
from src.utils.save_cover import convert_cover, save_cover
class ImageSize(NamedTuple):
@@ -110,18 +110,16 @@ class CoverManager(Manager):
stretch = 1 - (resized_height / cover_size.height)
return stretch <= max_stretch
def save_composited_cover(
def composite_cover(
self,
game: Game,
image_path: Path,
scale: float = 1,
blur_size: ImageSize = ImageSize(2, 2),
) -> None:
) -> GdkPixbuf.Pixbuf:
"""
Save the image composited with a background blur.
Return the image composited with a background blur.
If the image is stretchable, just stretch it.
:param game: The game to save the cover for
:param path: Path where the source image is located
:param scale:
Scale of the smalled image side
@@ -130,14 +128,15 @@ class CoverManager(Manager):
"""
# Load source image
source = GdkPixbuf.Pixbuf.new_from_file(str(image_path))
source = GdkPixbuf.Pixbuf.new_from_file(
str(convert_cover(image_path, resize=False))
)
source_size = ImageSize(source.get_width(), source.get_height())
cover_size = ImageSize._make(shared.image_size)
# Stretch if possible
if scale == 1 and self.is_stretchable(source_size, cover_size):
save_cover(game.game_id, resize_cover(pixbuf=source))
return
return source
# Create the blurred cover background
# fmt: off
@@ -164,7 +163,7 @@ class CoverManager(Manager):
GdkPixbuf.InterpType.BILINEAR,
255,
)
save_cover(game.game_id, resize_cover(pixbuf=cover))
return cover
def main(self, game: Game, additional_data: dict) -> None:
if game.blacklisted:
@@ -185,13 +184,15 @@ class CoverManager(Manager):
continue
# Icon cover
if key == "local_icon_path":
self.save_composited_cover(
game,
image_path,
scale=0.7,
blur_size=ImageSize(1, 2),
)
return
composite_kwargs = {}
self.save_composited_cover(game, image_path)
if key == "local_icon_path":
composite_kwargs["scale"] = 0.7
composite_kwargs["blur_size"] = ImageSize(1, 2)
save_cover(
game.game_id,
convert_cover(
pixbuf=self.composite_cover(image_path, **composite_kwargs)
),
)

View File

@@ -46,7 +46,7 @@ class Manager(ErrorProducer):
max_tries: int = 3
@property
def name(self):
def name(self) -> str:
return type(self).__name__
@abstractmethod
@@ -59,13 +59,13 @@ class Manager(ErrorProducer):
* May raise other exceptions that will be reported
"""
def run(self, game: Game, additional_data: dict):
def run(self, game: Game, additional_data: dict) -> None:
"""Handle errors (retry, ignore or raise) that occur in the manager logic"""
# Keep track of the number of tries
tries = 1
def handle_error(error: Exception):
def handle_error(error: Exception) -> None:
nonlocal tries
# If FriendlyError, handle its cause instead
@@ -83,11 +83,11 @@ class Manager(ErrorProducer):
retrying_format = "Retrying %s in %s for %s"
unretryable_format = "Unretryable %s in %s for %s"
if error in self.continue_on:
if type(error) in self.continue_on:
# Handle skippable errors (skip silently)
return
if error in self.retryable_on:
if type(error) in self.retryable_on:
if tries > self.max_tries:
# Handle being out of retries
logging.error(out_of_retries_format, *log_args)
@@ -104,7 +104,7 @@ class Manager(ErrorProducer):
logging.error(unretryable_format, *log_args, exc_info=error)
self.report_error(base_error)
def try_manager_logic():
def try_manager_logic() -> None:
try:
self.main(game, additional_data)
except Exception as error: # pylint: disable=broad-exception-caught

View File

@@ -24,13 +24,13 @@ from requests.exceptions import HTTPError, SSLError
from src.errors.friendly_error import FriendlyError
from src.game import Game
from src.store.managers.async_manager import AsyncManager
from src.store.managers.steam_api_manager import SteamAPIManager
from src.store.managers.cover_manager import CoverManager
from src.store.managers.steam_api_manager import SteamAPIManager
from src.utils.steamgriddb import SGDBAuthError, SGDBHelper
class SGDBManager(AsyncManager):
"""Manager in charge of downloading a game's cover from steamgriddb"""
"""Manager in charge of downloading a game's cover from SteamGridDB"""
run_after = (SteamAPIManager, CoverManager)
retryable_on = (HTTPError, SSLError, ConnectionError, JSONDecodeError)

View File

@@ -23,8 +23,8 @@ from urllib3.exceptions import ConnectionError as Urllib3ConnectionError
from src.game import Game
from src.store.managers.async_manager import AsyncManager
from src.utils.steam import (
SteamGameNotFoundError,
SteamAPIHelper,
SteamGameNotFoundError,
SteamNotAGameError,
SteamRateLimiter,
)
@@ -44,7 +44,7 @@ class SteamAPIManager(AsyncManager):
self.steam_api_helper = SteamAPIHelper(self.steam_rate_limiter)
def main(self, game: Game, additional_data: dict) -> None:
# Skip non-steam games
# Skip non-Steam games
appid = additional_data.get("steam_appid", None)
if appid is None:
return

View File

@@ -83,7 +83,7 @@ class Pipeline(GObject.Object):
progress = 1
return progress
def advance(self):
def advance(self) -> None:
"""Spawn tasks for managers that are able to run for a game"""
# Separate blocking / async managers

View File

@@ -18,7 +18,7 @@
# SPDX-License-Identifier: GPL-3.0-or-later
import logging
from typing import Any, Generator, MutableMapping
from typing import Any, Generator, MutableMapping, Optional
from src import shared
from src.game import Game
@@ -77,13 +77,15 @@ class Store:
except KeyError:
return default
def add_manager(self, manager: Manager, in_pipeline=True):
def add_manager(self, manager: Manager, in_pipeline: bool = True) -> None:
"""Add a manager to the store"""
manager_type = type(manager)
self.managers[manager_type] = manager
self.toggle_manager_in_pipelines(manager_type, in_pipeline)
def toggle_manager_in_pipelines(self, manager_type: type[Manager], enable: bool):
def toggle_manager_in_pipelines(
self, manager_type: type[Manager], enable: bool
) -> None:
"""Change if a manager should run in new pipelines"""
if enable:
self.pipeline_managers.add(self.managers[manager_type])
@@ -108,8 +110,8 @@ class Store:
pass
def add_game(
self, game: Game, additional_data: dict, run_pipeline=True
) -> Pipeline | None:
self, game: Game, additional_data: dict, run_pipeline: bool = True
) -> Optional[Pipeline]:
"""Add a game to the app"""
# Ignore games from a newer spec version

View File

@@ -17,10 +17,18 @@
#
# SPDX-License-Identifier: GPL-3.0-or-later
from gi.repository import Adw
from typing import Optional
from gi.repository import Adw, Gtk
def create_dialog(win, heading, body, extra_option=None, extra_label=None):
def create_dialog(
win: Gtk.Window,
heading: str,
body: str,
extra_option: Optional[str] = None,
extra_label: Optional[str] = None,
) -> Adw.MessageDialog:
dialog = Adw.MessageDialog.new(win, heading, body)
dialog.add_response("dismiss", _("Dismiss"))

View File

@@ -23,14 +23,14 @@ from pathlib import Path
from src import shared
old_data_dir = Path.home() / ".local" / "share"
old_data_dir = shared.home / ".local" / "share"
old_cartridges_data_dir = old_data_dir / "cartridges"
migrated_file_path = old_cartridges_data_dir / ".migrated"
old_games_dir = old_cartridges_data_dir / "games"
old_covers_dir = old_cartridges_data_dir / "covers"
def migrate_game_covers(game_path: Path):
def migrate_game_covers(game_path: Path) -> None:
"""Migrate a game covers from a source game path to the current dir"""
for suffix in (".tiff", ".gif"):
cover_path = old_covers_dir / game_path.with_suffix(suffix).name
@@ -41,7 +41,7 @@ def migrate_game_covers(game_path: Path):
cover_path.rename(destination_cover_path)
def migrate_files_v1_to_v2():
def migrate_files_v1_to_v2() -> None:
"""
Migrate user data from the v1.X locations to the latest location.

View File

@@ -17,11 +17,11 @@
#
# SPDX-License-Identifier: GPL-3.0-or-later
from typing import Optional, Sized
from threading import Lock, Thread, BoundedSemaphore
from time import sleep, time
from collections import deque
from contextlib import AbstractContextManager
from threading import BoundedSemaphore, Lock, Thread
from time import sleep, time
from typing import Any, Sized
class PickHistory(Sized):
@@ -30,22 +30,22 @@ class PickHistory(Sized):
period: int
timestamps: list[int] = None
timestamps_lock: Lock = None
timestamps: list[float]
timestamps_lock: Lock
def __init__(self, period: int) -> None:
self.period = period
self.timestamps = []
self.timestamps_lock = Lock()
def remove_old_entries(self):
def remove_old_entries(self) -> None:
"""Remove history entries older than the period"""
now = time()
cutoff = now - self.period
with self.timestamps_lock:
self.timestamps = [entry for entry in self.timestamps if entry > cutoff]
def add(self, *new_timestamps: Optional[int]):
def add(self, *new_timestamps: float) -> None:
"""Add timestamps to the history.
If none given, will add the current timestamp"""
if len(new_timestamps) == 0:
@@ -60,7 +60,7 @@ class PickHistory(Sized):
return len(self.timestamps)
@property
def start(self) -> int:
def start(self) -> float:
"""Get the time at which the history started"""
self.remove_old_entries()
with self.timestamps_lock:
@@ -70,7 +70,7 @@ class PickHistory(Sized):
entry = time()
return entry
def copy_timestamps(self) -> str:
def copy_timestamps(self) -> list[float]:
"""Get a copy of the timestamps history"""
self.remove_old_entries()
with self.timestamps_lock:
@@ -79,51 +79,55 @@ class PickHistory(Sized):
# pylint: disable=too-many-instance-attributes
class RateLimiter(AbstractContextManager):
"""Rate limiter implementing the token bucket algorithm"""
"""
Base rate limiter implementing the token bucket algorithm.
Do not use directly, create a child class to tailor the rate limiting to the
underlying service's limits.
Subclasses must provide values to the following attributes:
* refill_period_seconds - Period in which we have a max amount of tokens
* refill_period_tokens - Number of tokens allowed in this period
* burst_tokens - Max number of tokens that can be consumed instantly
"""
# Period in which we have a max amount of tokens
refill_period_seconds: int
# Number of tokens allowed in this period
refill_period_tokens: int
# Max number of tokens that can be consumed instantly
burst_tokens: int
pick_history: PickHistory = None
bucket: BoundedSemaphore = None
queue: deque[Lock] = None
queue_lock: Lock = None
pick_history: PickHistory
bucket: BoundedSemaphore
queue: deque[Lock]
queue_lock: Lock
# Protect the number of tokens behind a lock
__n_tokens_lock: Lock = None
__n_tokens_lock: Lock
__n_tokens = 0
@property
def n_tokens(self):
def n_tokens(self) -> int:
with self.__n_tokens_lock:
return self.__n_tokens
@n_tokens.setter
def n_tokens(self, value: int):
def n_tokens(self, value: int) -> None:
with self.__n_tokens_lock:
self.__n_tokens = value
def __init__(
self,
refill_period_seconds: Optional[int] = None,
refill_period_tokens: Optional[int] = None,
burst_tokens: Optional[int] = None,
) -> None:
def _init_pick_history(self) -> None:
"""
Initialize the tocken pick history
(only for use in this class and its children)
By default, creates an empty pick history.
Should be overriden or extended by subclasses.
"""
self.pick_history = PickHistory(self.refill_period_seconds)
def __init__(self) -> None:
"""Initialize the limiter"""
# Initialize default values
if refill_period_seconds is not None:
self.refill_period_seconds = refill_period_seconds
if refill_period_tokens is not None:
self.refill_period_tokens = refill_period_tokens
if burst_tokens is not None:
self.burst_tokens = burst_tokens
if self.pick_history is None:
self.pick_history = PickHistory(self.refill_period_seconds)
self._init_pick_history()
# Create synchronization data
self.__n_tokens_lock = Lock()
@@ -147,8 +151,8 @@ class RateLimiter(AbstractContextManager):
"""
# Compute ideal spacing
tokens_left = self.refill_period_tokens - len(self.pick_history)
seconds_left = self.pick_history.start + self.refill_period_seconds - time()
tokens_left = self.refill_period_tokens - len(self.pick_history) # type: ignore
seconds_left = self.pick_history.start + self.refill_period_seconds - time() # type: ignore
try:
spacing_seconds = seconds_left / tokens_left
except ZeroDivisionError:
@@ -159,7 +163,7 @@ class RateLimiter(AbstractContextManager):
natural_spacing = self.refill_period_seconds / self.refill_period_tokens
return max(natural_spacing, spacing_seconds)
def refill(self):
def refill(self) -> None:
"""Add a token back in the bucket"""
sleep(self.refill_spacing)
try:
@@ -170,7 +174,7 @@ class RateLimiter(AbstractContextManager):
else:
self.n_tokens += 1
def refill_thread_func(self):
def refill_thread_func(self) -> None:
"""Entry point for the daemon thread that is refilling the bucket"""
while True:
self.refill()
@@ -200,18 +204,18 @@ class RateLimiter(AbstractContextManager):
self.queue.appendleft(lock)
return lock
def acquire(self):
def acquire(self) -> None:
"""Acquires a token from the bucket when it's your turn in queue"""
lock = self.add_to_queue()
self.update_queue()
# Wait until our turn in queue
lock.acquire() # pylint: disable=consider-using-with
self.pick_history.add()
self.pick_history.add() # type: ignore
# --- Support for use in with statements
def __enter__(self):
def __enter__(self) -> None:
self.acquire()
def __exit__(self, *_args):
def __exit__(self, *_args: Any) -> None:
pass

View File

@@ -18,11 +18,12 @@
# SPDX-License-Identifier: GPL-3.0-or-later
from datetime import datetime
from typing import Any
from gi.repository import GLib
def relative_date(timestamp): # pylint: disable=too-many-return-statements
def relative_date(timestamp: int) -> Any: # pylint: disable=too-many-return-statements
days_no = ((today := datetime.today()) - datetime.fromtimestamp(timestamp)).days
if days_no == 0:

View File

@@ -20,17 +20,30 @@
from pathlib import Path
from shutil import copyfile
from typing import Optional
from gi.repository import Gdk, Gio, GLib
from gi.repository import Gdk, GdkPixbuf, Gio, GLib
from PIL import Image, ImageSequence, UnidentifiedImageError
from src import shared
def resize_cover(cover_path=None, pixbuf=None):
def convert_cover(
cover_path: Optional[Path] = None,
pixbuf: Optional[GdkPixbuf.Pixbuf] = None,
resize: bool = True,
) -> Optional[Path]:
if not cover_path and not pixbuf:
return None
pixbuf_extensions = set()
for pixbuf_format in GdkPixbuf.Pixbuf.get_formats():
for pixbuf_extension in pixbuf_format.get_extensions():
pixbuf_extensions.add(pixbuf_extension)
if not resize and cover_path and cover_path.suffix.lower()[1:] in pixbuf_extensions:
return cover_path
if pixbuf:
cover_path = Path(Gio.File.new_tmp("XXXXXX.tiff")[0].get_path())
pixbuf.savev(str(cover_path), "tiff", ["compression"], ["1"])
@@ -39,7 +52,8 @@ def resize_cover(cover_path=None, pixbuf=None):
with Image.open(cover_path) as image:
if getattr(image, "is_animated", False):
frames = tuple(
frame.resize((200, 300)) for frame in ImageSequence.Iterator(image)
frame.resize((200, 300)) if resize else frame
for frame in ImageSequence.Iterator(image)
)
tmp_path = Path(Gio.File.new_tmp("XXXXXX.gif")[0].get_path())
@@ -47,6 +61,7 @@ def resize_cover(cover_path=None, pixbuf=None):
tmp_path,
save_all=True,
append_images=frames[1:],
disposal=2,
)
else:
@@ -56,7 +71,7 @@ def resize_cover(cover_path=None, pixbuf=None):
image = image.convert("RGBA")
tmp_path = Path(Gio.File.new_tmp("XXXXXX.tiff")[0].get_path())
image.resize(shared.image_size).save(
(image.resize(shared.image_size) if resize else image).save(
tmp_path,
compression="tiff_adobe_deflate"
if shared.schema.get_boolean("high-quality-images")
@@ -67,14 +82,14 @@ def resize_cover(cover_path=None, pixbuf=None):
Gdk.Texture.new_from_filename(str(cover_path)).save_to_tiff(
tmp_path := Gio.File.new_tmp("XXXXXX.tiff")[0].get_path()
)
return resize_cover(tmp_path)
return convert_cover(tmp_path)
except GLib.GError:
return None
return tmp_path
def save_cover(game_id, cover_path):
def save_cover(game_id: str, cover_path: Path) -> None:
shared.covers_dir.mkdir(parents=True, exist_ok=True)
animated_path = shared.covers_dir / f"{game_id}.gif"

View File

@@ -21,13 +21,14 @@
import json
import logging
import re
from pathlib import Path
from typing import TypedDict
import requests
from requests.exceptions import HTTPError
from src import shared
from src.utils.rate_limiter import PickHistory, RateLimiter
from src.utils.rate_limiter import RateLimiter
class SteamError(Exception):
@@ -71,16 +72,18 @@ class SteamRateLimiter(RateLimiter):
refill_period_tokens = 200
burst_tokens = 100
def __init__(self) -> None:
# Load pick history from schema
# (Remember API limits through restarts of Cartridges)
def _init_pick_history(self) -> None:
"""
Load the pick history from schema.
Allows remembering API limits through restarts of Cartridges.
"""
super()._init_pick_history()
timestamps_str = shared.state_schema.get_string("steam-limiter-tokens-history")
self.pick_history = PickHistory(self.refill_period_seconds)
self.pick_history.add(*json.loads(timestamps_str))
self.pick_history.remove_old_entries()
super().__init__()
def acquire(self):
def acquire(self) -> None:
"""Get a token from the bucket and store the pick history in the schema"""
super().acquire()
timestamps_str = json.dumps(self.pick_history.copy_timestamps())
@@ -88,9 +91,9 @@ class SteamRateLimiter(RateLimiter):
class SteamFileHelper:
"""Helper for steam file formats"""
"""Helper for Steam file formats"""
def get_manifest_data(self, manifest_path) -> SteamManifestData:
def get_manifest_data(self, manifest_path: Path) -> SteamManifestData:
"""Get local data for a game from its manifest"""
with open(manifest_path, "r", encoding="utf-8") as file:
@@ -104,7 +107,11 @@ class SteamFileHelper:
raise SteamInvalidManifestError()
data[key] = match.group(1)
return SteamManifestData(**data)
return SteamManifestData(
name=data["name"],
appid=data["appid"],
stateflags=data["stateflags"],
)
class SteamAPIHelper:
@@ -116,7 +123,7 @@ class SteamAPIHelper:
def __init__(self, rate_limiter: RateLimiter) -> None:
self.rate_limiter = rate_limiter
def get_api_data(self, appid) -> SteamAPIData:
def get_api_data(self, appid: str) -> SteamAPIData:
"""
Get online data for a game from its appid.
May block to satisfy the Steam web API limitations.

View File

@@ -20,13 +20,15 @@
import logging
from pathlib import Path
from typing import Any
import requests
from gi.repository import Gio
from requests.exceptions import HTTPError
from src import shared
from src.utils.save_cover import resize_cover, save_cover
from src.game import Game
from src.utils.save_cover import convert_cover, save_cover
class SGDBError(Exception):
@@ -55,12 +57,12 @@ class SGDBHelper:
base_url = "https://www.steamgriddb.com/api/v2/"
@property
def auth_headers(self):
def auth_headers(self) -> dict[str, str]:
key = shared.schema.get_string("sgdb-key")
headers = {"Authorization": f"Bearer {key}"}
return headers
def get_game_id(self, game):
def get_game_id(self, game: Game) -> Any:
"""Get grid results for a game. Can raise an exception."""
uri = f"{self.base_url}search/autocomplete/{game.name}"
res = requests.get(uri, headers=self.auth_headers, timeout=5)
@@ -74,7 +76,7 @@ class SGDBHelper:
case _:
res.raise_for_status()
def get_image_uri(self, game_id, animated=False):
def get_image_uri(self, game_id: str, animated: bool = False) -> Any:
"""Get the image for a SGDB game id"""
uri = f"{self.base_url}grids/game/{game_id}?dimensions=600x900"
if animated:
@@ -93,7 +95,7 @@ class SGDBHelper:
case _:
res.raise_for_status()
def conditionaly_update_cover(self, game):
def conditionaly_update_cover(self, game: Game) -> None:
"""Update the game's cover if appropriate"""
# Obvious skips
@@ -132,7 +134,7 @@ class SGDBHelper:
tmp_file = Gio.File.new_tmp()[0]
tmp_file_path = tmp_file.get_path()
Path(tmp_file_path).write_bytes(response.content)
save_cover(game.game_id, resize_cover(tmp_file_path))
save_cover(game.game_id, convert_cover(tmp_file_path))
except SGDBAuthError as error:
# Let caller handle auth errors
raise error

View File

@@ -1,86 +0,0 @@
# task.py
#
# Copyright 2023 Geoffrey Coulaud
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
from functools import wraps
from gi.repository import Gio
def create_task_thread_func_closure(func, data):
"""Wrap a Gio.TaskThreadFunc with the given data in a closure"""
def closure(task, source_object, _data, cancellable):
func(task, source_object, data, cancellable)
return closure
def decorate_set_task_data(task):
"""Decorate Gio.Task.set_task_data to replace it"""
def decorator(original_method):
@wraps(original_method)
def new_method(task_data):
task.task_data = task_data
return new_method
return decorator
def decorate_run_in_thread(task):
"""Decorate Gio.Task.run_in_thread to pass the task data correctly
Creates a closure around task_thread_func with the task data available."""
def decorator(original_method):
@wraps(original_method)
def new_method(task_thread_func):
closure = create_task_thread_func_closure(task_thread_func, task.task_data)
original_method(closure)
return new_method
return decorator
# pylint: disable=too-few-public-methods
class Task:
"""Wrapper around Gio.Task to patch task data not being passed"""
@classmethod
def new(cls, source_object, cancellable, callback, callback_data):
"""Create a new, monkey-patched Gio.Task.
The `set_task_data` and `run_in_thread` methods are decorated.
As of 2023-05-19, pygobject does not work well with Gio.Task, so to pass data
the only viable way it to create a closure with the thread function and its data.
This class is supposed to make Gio.Task comply with its expected behaviour
per the docs:
http://lazka.github.io/pgi-docs/#Gio-2.0/classes/Task.html#Gio.Task.set_task_data
This code may break if pygobject overrides change in the future.
We need to manually pass `self` to the decorators since it's otherwise bound but
not accessible from Python's side.
"""
task = Gio.Task.new(source_object, cancellable, callback, callback_data)
task.set_task_data = decorate_set_task_data(task)(task.set_task_data)
task.run_in_thread = decorate_run_in_thread(task)(task.run_in_thread)
return task

View File

@@ -17,7 +17,7 @@
#
# SPDX-License-Identifier: GPL-3.0-or-later
from typing import Any
from typing import Any, Optional
from gi.repository import Adw, Gio, GLib, Gtk
@@ -80,7 +80,7 @@ class CartridgesWindow(Adw.ApplicationWindow):
game_covers: dict = {}
toasts: dict = {}
active_game: Game
details_view_game_cover: GameCover | None = None
details_view_game_cover: Optional[GameCover] = None
sort_state: str = "a-z"
filter_state: str = "all"
source_rows: dict = {}
@@ -253,6 +253,21 @@ class CartridgesWindow(Adw.ApplicationWindow):
style_manager.connect("notify::dark", self.set_details_view_opacity)
style_manager.connect("notify::high-contrast", self.set_details_view_opacity)
# Allow for a custom number of rows for the library
if shared.schema.get_uint("library-rows"):
shared.schema.bind(
"library-rows",
self.library,
"max-children-per-line",
Gio.SettingsBindFlags.DEFAULT,
)
shared.schema.bind(
"library-rows",
self.hidden_library,
"max-children-per-line",
Gio.SettingsBindFlags.DEFAULT,
)
def search_changed(self, _widget: Any, hidden: bool) -> None:
# Refresh search filter on keystroke in search box
(self.hidden_library if hidden else self.library).invalidate_filter()
@@ -401,7 +416,7 @@ class CartridgesWindow(Adw.ApplicationWindow):
).lower()
if var != "name" and get_value(0) == get_value(1):
var, order = "name", True
var, order = "name", False
return ((get_value(0) > get_value(1)) ^ order) * 2 - 1
@@ -476,7 +491,7 @@ class CartridgesWindow(Adw.ApplicationWindow):
index += 1
def on_undo_action(
self, _widget: Any, game: Game | None = None, undo: str | None = None
self, _widget: Any, game: Optional[Game] = None, undo: Optional[str] = None
) -> None:
if not game: # If the action was activated via Ctrl + Z
if shared.importer and (