Implement search provider (#201)

* Begin work on search provider

* Initial search provider work, organize meson

* Initial work on icons

* Implement LaunchSearch

* Don't hold arbitrary reference to service

I don't know why Lollypop does this

* Send notification, pad images

* Update translations

* Fix init_search_term typing
This commit is contained in:
kramo
2023-10-10 22:47:32 +02:00
committed by GitHub
parent 61e7e0274c
commit 69928a8b4f
52 changed files with 687 additions and 293 deletions

View File

@@ -0,0 +1,109 @@
# bottles_source.py
#
# Copyright 2022-2023 kramo
# Copyright 2023 Geoffrey Coulaud
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
from pathlib import Path
from typing import NamedTuple
import yaml
from cartridges import shared
from cartridges.game import Game
from cartridges.importer.location import Location, LocationSubPath
from cartridges.importer.source import SourceIterable, URLExecutableSource
class BottlesSourceIterable(SourceIterable):
source: "BottlesSource"
def __iter__(self):
"""Generator method producing games"""
data = self.source.locations.data["library.yml"].read_text("utf-8")
library: dict = yaml.safe_load(data)
for entry in library.values():
# Build game
values = {
"source": self.source.source_id,
"added": shared.import_time,
"name": entry["name"],
"game_id": self.source.game_id_format.format(game_id=entry["id"]),
"executable": self.source.make_executable(
bottle_name=entry["bottle"]["name"],
game_name=entry["name"],
),
}
game = Game(values)
# Get official cover path
try:
# This will not work if both Cartridges and Bottles are installed via Flatpak
# as Cartridges can't access directories picked via Bottles' file picker portal
bottles_location = Path(
yaml.safe_load(
self.source.locations.data["data.yml"].read_text("utf-8")
)["custom_bottles_path"]
)
except (FileNotFoundError, KeyError):
bottles_location = self.source.locations.data.root / "bottles"
bottle_path = entry["bottle"]["path"]
additional_data = {}
if entry["thumbnail"]:
image_name = entry["thumbnail"].split(":")[1]
image_path = bottles_location / bottle_path / "grids" / image_name
additional_data = {"local_image_path": image_path}
yield (game, additional_data)
class BottlesLocations(NamedTuple):
data: Location
class BottlesSource(URLExecutableSource):
"""Generic Bottles source"""
source_id = "bottles"
name = _("Bottles")
iterable_class = BottlesSourceIterable
url_format = 'bottles:run/"{bottle_name}"/"{game_name}"'
available_on = {"linux"}
locations: BottlesLocations
def __init__(self) -> None:
super().__init__()
self.locations = BottlesLocations(
Location(
schema_key="bottles-location",
candidates=(
shared.flatpak_dir / "com.usebottles.bottles" / "data" / "bottles",
shared.data_dir / "bottles/",
shared.home / ".local" / "share" / "bottles",
),
paths={
"library.yml": LocationSubPath("library.yml"),
"data.yml": LocationSubPath("data.yml"),
},
invalid_subtitle=Location.DATA_INVALID_SUBTITLE,
)
)

View File

@@ -0,0 +1,223 @@
# desktop_source.py
#
# Copyright 2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import os
import shlex
import subprocess
from pathlib import Path
from typing import NamedTuple
from gi.repository import GLib, Gtk
from cartridges import shared
from cartridges.game import Game
from cartridges.importer.source import Source, SourceIterable
class DesktopSourceIterable(SourceIterable):
source: "DesktopSource"
def __iter__(self):
"""Generator method producing games"""
icon_theme = Gtk.IconTheme.new()
search_paths = [
shared.home / ".local" / "share",
"/run/host/usr/local/share",
"/run/host/usr/share",
"/run/host/usr/share/pixmaps",
"/usr/share/pixmaps",
] + GLib.get_system_data_dirs()
for search_path in search_paths:
path = Path(search_path)
if not str(search_path).endswith("/pixmaps"):
path = path / "icons"
if not path.is_dir():
continue
if str(path).startswith("/app/"):
continue
icon_theme.add_search_path(str(path))
launch_command, full_path = self.check_launch_commands()
for path in search_paths:
if str(path).startswith("/app/"):
continue
path = Path(path) / "applications"
if not path.is_dir():
continue
for entry in path.iterdir():
if entry.suffix != ".desktop":
continue
# Skip Lutris games
if str(entry.name).startswith("net.lutris."):
continue
keyfile = GLib.KeyFile.new()
try:
keyfile.load_from_file(str(entry), 0)
if "Game" not in keyfile.get_string_list(
"Desktop Entry", "Categories"
):
continue
name = keyfile.get_string("Desktop Entry", "Name")
executable = keyfile.get_string("Desktop Entry", "Exec").split(
" %"
)[0]
except GLib.GError:
continue
try:
try_exec = "which " + keyfile.get_string("Desktop Entry", "TryExec")
if not self.check_command(try_exec):
continue
except GLib.GError:
pass
# Skip Steam games
if "steam://rungameid/" in executable:
continue
# Skip Heroic games
if "heroic://launch/" in executable:
continue
# Skip Bottles games
if "bottles-cli " in executable:
continue
try:
if keyfile.get_boolean("Desktop Entry", "NoDisplay"):
continue
except GLib.GError:
pass
try:
if keyfile.get_boolean("Desktop Entry", "Hidden"):
continue
except GLib.GError:
pass
# Strip /run/host from Flatpak paths
if entry.is_relative_to(prefix := "/run/host"):
entry = Path("/") / entry.relative_to(prefix)
launch_arg = shlex.quote(str(entry if full_path else entry.stem))
values = {
"source": self.source.source_id,
"added": shared.import_time,
"name": name,
"game_id": f"desktop_{entry.stem}",
"executable": f"{launch_command} {launch_arg}",
}
game = Game(values)
additional_data = {}
try:
icon_str = keyfile.get_string("Desktop Entry", "Icon")
except GLib.GError:
yield game
continue
else:
if "/" in icon_str:
additional_data = {"local_icon_path": Path(icon_str)}
yield (game, additional_data)
continue
try:
if (
icon_path := icon_theme.lookup_icon(
icon_str,
None,
512,
1,
shared.win.get_direction(),
0,
)
.get_file()
.get_path()
):
additional_data = {"local_icon_path": Path(icon_path)}
except GLib.GError:
pass
yield (game, additional_data)
def check_command(self, command) -> bool:
flatpak_str = "flatpak-spawn --host /bin/sh -c "
if os.getenv("FLATPAK_ID") == shared.APP_ID:
command = flatpak_str + shlex.quote(command)
try:
subprocess.run(command, shell=True, check=True)
except subprocess.CalledProcessError:
return False
return True
def check_launch_commands(self) -> (str, bool):
"""Check whether `gio launch` `gtk4-launch` or `gtk-launch` are available on the system"""
commands = (("gio launch", True), ("gtk4-launch", False), ("gtk-launch", False))
for command, full_path in commands:
# Even if `gio` is available, `gio launch` is only available on GLib >= 2.67.2
command_to_check = (
"gio help launch" if command == "gio launch" else f"which {command}"
)
if self.check_command(command_to_check):
return command, full_path
return commands[2]
class DesktopLocations(NamedTuple):
pass
class DesktopSource(Source):
"""Generic Flatpak source"""
source_id = "desktop"
name = _("Desktop Entries")
iterable_class = DesktopSourceIterable
available_on = {"linux"}
locations: DesktopLocations
def __init__(self) -> None:
super().__init__()
self.locations = DesktopLocations()

View File

@@ -0,0 +1,140 @@
# flatpak_source.py
#
# Copyright 2022-2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
from pathlib import Path
from typing import NamedTuple
from gi.repository import GLib, Gtk
from cartridges import shared
from cartridges.game import Game
from cartridges.importer.location import Location, LocationSubPath
from cartridges.importer.source import ExecutableFormatSource, SourceIterable
class FlatpakSourceIterable(SourceIterable):
source: "FlatpakSource"
def __iter__(self):
"""Generator method producing games"""
icon_theme = Gtk.IconTheme.new()
icon_theme.add_search_path(str(self.source.locations.data["icons"]))
blacklist = (
{"hu.kramo.Cartridges", "hu.kramo.Cartridges.Devel"}
if shared.schema.get_boolean("flatpak-import-launchers")
else {
"hu.kramo.Cartridges",
"hu.kramo.Cartridges.Devel",
"com.valvesoftware.Steam",
"net.lutris.Lutris",
"com.heroicgameslauncher.hgl",
"com.usebottles.Bottles",
"io.itch.itch",
"org.libretro.RetroArch",
}
)
for entry in (self.source.locations.data["applications"]).iterdir():
if entry.suffix != ".desktop":
continue
keyfile = GLib.KeyFile.new()
try:
keyfile.load_from_file(str(entry), 0)
if "Game" not in keyfile.get_string_list("Desktop Entry", "Categories"):
continue
if (
flatpak_id := keyfile.get_string("Desktop Entry", "X-Flatpak")
) in blacklist or flatpak_id != entry.stem:
continue
name = keyfile.get_string("Desktop Entry", "Name")
except GLib.GError:
continue
values = {
"source": self.source.source_id,
"added": shared.import_time,
"name": name,
"game_id": self.source.game_id_format.format(game_id=flatpak_id),
"executable": self.source.make_executable(flatpak_id=flatpak_id),
}
game = Game(values)
additional_data = {}
try:
if (
icon_path := icon_theme.lookup_icon(
keyfile.get_string("Desktop Entry", "Icon"),
None,
512,
1,
shared.win.get_direction(),
0,
)
.get_file()
.get_path()
):
additional_data = {"local_icon_path": Path(icon_path)}
else:
pass
except GLib.GError:
pass
yield (game, additional_data)
class FlatpakLocations(NamedTuple):
data: Location
class FlatpakSource(ExecutableFormatSource):
"""Generic Flatpak source"""
source_id = "flatpak"
name = _("Flatpak")
iterable_class = FlatpakSourceIterable
executable_format = "flatpak run {flatpak_id}"
available_on = {"linux"}
locations: FlatpakLocations
def __init__(self) -> None:
super().__init__()
self.locations = FlatpakLocations(
Location(
schema_key="flatpak-location",
candidates=(
"/var/lib/flatpak/",
shared.data_dir / "flatpak",
),
paths={
"applications": LocationSubPath("exports/share/applications", True),
"icons": LocationSubPath("exports/share/icons", True),
},
invalid_subtitle=Location.DATA_INVALID_SUBTITLE,
)
)

View File

@@ -0,0 +1,387 @@
# heroic_source.py
#
# Copyright 2022-2023 kramo
# Copyright 2023 Geoffrey Coulaud
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import json
import logging
from abc import abstractmethod
from functools import cached_property
from hashlib import sha256
from json import JSONDecodeError
from pathlib import Path
from typing import Iterable, NamedTuple, Optional, TypedDict
from cartridges import shared
from cartridges.game import Game
from cartridges.importer.location import Location, LocationSubPath
from cartridges.importer.source import (
SourceIterable,
SourceIterationResult,
URLExecutableSource,
)
def path_json_load(path: Path):
"""
Load JSON from the file at the given path
:raises OSError: if the file can't be opened
:raises JSONDecodeError: if the file isn't valid JSON
"""
with path.open("r", encoding="utf-8") as open_file:
return json.load(open_file)
class InvalidLibraryFileError(Exception):
pass
class InvalidInstalledFileError(Exception):
pass
class InvalidStoreFileError(Exception):
pass
class HeroicLibraryEntry(TypedDict):
app_name: str
installed: Optional[bool]
runner: str
title: str
developer: str
art_square: str
class SubSourceIterable(Iterable):
"""Class representing a Heroic sub-source"""
source: "HeroicSource"
source_iterable: "HeroicSourceIterable"
name: str
service: str
image_uri_params: str = ""
relative_library_path: Path
library_json_entries_key: str = "library"
def __init__(self, source, source_iterable) -> None:
self.source = source
self.source_iterable = source_iterable
@cached_property
def library_path(self) -> Path:
path = self.source.locations.config.root / self.relative_library_path
logging.debug("Using Heroic %s library.json path %s", self.name, path)
return path
def process_library_entry(self, entry: HeroicLibraryEntry) -> SourceIterationResult:
"""Build a Game from a Heroic library entry"""
app_name = entry["app_name"]
runner = entry["runner"]
# Build game
values = {
"source": f"{self.source.source_id}_{self.service}",
"added": shared.import_time,
"name": entry["title"],
"developer": entry.get("developer", None),
"game_id": self.source.game_id_format.format(
service=self.service, game_id=app_name
),
"executable": self.source.make_executable(runner=runner, app_name=app_name),
"hidden": self.source_iterable.is_hidden(app_name),
}
game = Game(values)
# Get the image path from the Heroic cache
# Filenames are derived from the URL that Heroic used to get the file
uri: str = entry["art_square"] + self.image_uri_params
digest = sha256(uri.encode()).hexdigest()
image_path = self.source.locations.config.root / "images-cache" / digest
additional_data = {"local_image_path": image_path}
return (game, additional_data)
def __iter__(self):
"""
Iterate through the games with a generator
:raises InvalidLibraryFileError: on initial call if the library file is bad
"""
try:
iterator = iter(
path_json_load(self.library_path)[self.library_json_entries_key]
)
except (OSError, JSONDecodeError, TypeError, KeyError) as error:
raise InvalidLibraryFileError(
f"Invalid {self.library_path.name}"
) from error
for entry in iterator:
try:
yield self.process_library_entry(entry)
except KeyError as error:
logging.warning(
"Skipped invalid %s game %s",
self.name,
entry.get("app_name", "UNKNOWN"),
exc_info=error,
)
continue
class StoreSubSourceIterable(SubSourceIterable):
"""
Class representing a "store" sub source.
Games can be installed or not, this class does the check accordingly.
"""
relative_installed_path: Path
installed_app_names: set[str]
@cached_property
def installed_path(self) -> Path:
path = self.source.locations.config.root / self.relative_installed_path
logging.debug("Using Heroic %s installed.json path %s", self.name, path)
return path
@abstractmethod
def get_installed_app_names(self) -> set[str]:
"""
Get the sub source's installed app names as a set.
:raises InvalidInstalledFileError: if the installed file data cannot be read
Whenever possible, `__cause__` is set with the original exception
"""
def is_installed(self, app_name: str) -> bool:
return app_name in self.installed_app_names
def process_library_entry(self, entry):
# Skip games that are not installed
app_name = entry["app_name"]
if not self.is_installed(app_name):
logging.warning(
"Skipped %s game %s (%s): not installed",
self.service,
entry["title"],
app_name,
)
return None
# Process entry as normal
return super().process_library_entry(entry)
def __iter__(self):
"""
Iterate through the installed games with a generator
:raises InvalidLibraryFileError: on initial call if the library file is bad
:raises InvalidInstalledFileError: on initial call if the installed file is bad
"""
self.installed_app_names = self.get_installed_app_names()
yield from super().__iter__()
class SideloadIterable(SubSourceIterable):
name = "sideload"
service = "sideload"
relative_library_path = Path("sideload_apps") / "library.json"
library_json_entries_key = "games"
class LegendaryIterable(StoreSubSourceIterable):
name = "legendary"
service = "epic"
image_uri_params = "?h=400&resize=1&w=300"
relative_library_path = Path("store_cache") / "legendary_library.json"
# relative_installed_path = (
# Path("legendary") / "legendaryConfig" / "legendary" / "installed.json"
# )
@cached_property
def installed_path(self) -> Path:
"""Get the right path depending on the Heroic version"""
# TODO after Heroic 2.9 has been out for a while
# We should use the commented out relative_installed_path
# and remove this property override.
heroic_config_path = self.source.locations.config.root
# Heroic >= 2.9
if (path := heroic_config_path / "legendaryConfig").is_dir():
logging.debug("Using Heroic >= 2.9 legendary file")
# Heroic <= 2.8
elif heroic_config_path.is_relative_to(shared.flatpak_dir):
# Heroic flatpak
path = shared.flatpak_dir / "com.heroicgameslauncher.hgl" / "config"
logging.debug("Using Heroic flatpak <= 2.8 legendary file")
else:
# Heroic native
logging.debug("Using Heroic native <= 2.8 legendary file")
path = shared.home / ".config"
path = path / "legendary" / "installed.json"
logging.debug("Using Heroic %s installed.json path %s", self.name, path)
return path
def get_installed_app_names(self):
try:
return set(path_json_load(self.installed_path).keys())
except (OSError, JSONDecodeError, AttributeError) as error:
raise InvalidInstalledFileError(
f"Invalid {self.installed_path.name}"
) from error
class GogIterable(StoreSubSourceIterable):
name = "gog"
service = "gog"
library_json_entries_key = "games"
relative_library_path = Path("store_cache") / "gog_library.json"
relative_installed_path = Path("gog_store") / "installed.json"
def get_installed_app_names(self):
try:
return {
app_name
for entry in path_json_load(self.installed_path)["installed"]
if (app_name := entry.get("appName")) is not None
}
except (OSError, JSONDecodeError, KeyError, AttributeError) as error:
raise InvalidInstalledFileError(
f"Invalid {self.installed_path.name}"
) from error
class NileIterable(StoreSubSourceIterable):
name = "nile"
service = "amazon"
relative_library_path = Path("store_cache") / "nile_library.json"
relative_installed_path = Path("nile_config") / "nile" / "installed.json"
def get_installed_app_names(self):
try:
installed_json = path_json_load(self.installed_path)
return {
app_name
for entry in installed_json
if (app_name := entry.get("id")) is not None
}
except (OSError, JSONDecodeError, AttributeError) as error:
raise InvalidInstalledFileError(
f"Invalid {self.installed_path.name}"
) from error
class HeroicSourceIterable(SourceIterable):
source: "HeroicSource"
hidden_app_names: set[str] = set()
def is_hidden(self, app_name: str) -> bool:
return app_name in self.hidden_app_names
def get_hidden_app_names(self) -> set[str]:
"""Get the hidden app names from store/config.json
:raises InvalidStoreFileError: if the store is invalid for some reason
"""
try:
store = path_json_load(self.source.locations.config["store_config.json"])
self.hidden_app_names = {
app_name
for game in store["games"]["hidden"]
if (app_name := game.get("appName")) is not None
}
except KeyError:
logging.warning('No ["games"]["hidden"] key in Heroic store file')
except (OSError, JSONDecodeError, TypeError) as error:
logging.error("Invalid Heroic store file", exc_info=error)
raise InvalidStoreFileError() from error
def __iter__(self):
"""Generator method producing games from all the Heroic sub-sources"""
self.get_hidden_app_names()
# Get games from the sub sources
for sub_source_class in (
SideloadIterable,
LegendaryIterable,
GogIterable,
NileIterable,
):
sub_source = sub_source_class(self.source, self)
if not shared.schema.get_boolean("heroic-import-" + sub_source.service):
logging.debug("Skipping Heroic %s: disabled", sub_source.service)
continue
try:
sub_source_iterable = iter(sub_source)
yield from sub_source_iterable
except (InvalidLibraryFileError, InvalidInstalledFileError) as error:
logging.error(
"Skipping bad Heroic sub-source %s",
sub_source.service,
exc_info=error,
)
continue
class HeroicLocations(NamedTuple):
config: Location
class HeroicSource(URLExecutableSource):
"""Generic Heroic Games Launcher source"""
source_id = "heroic"
name = _("Heroic")
iterable_class = HeroicSourceIterable
url_format = "heroic://launch/{runner}/{app_name}"
available_on = {"linux", "win32"}
locations: HeroicLocations
@property
def game_id_format(self) -> str:
"""The string format used to construct game IDs"""
return self.source_id + "_{service}_{game_id}"
def __init__(self) -> None:
super().__init__()
self.locations = HeroicLocations(
Location(
schema_key="heroic-location",
candidates=(
shared.config_dir / "heroic",
shared.home / ".config" / "heroic",
shared.flatpak_dir
/ "com.heroicgameslauncher.hgl"
/ "config"
/ "heroic",
shared.appdata_dir / "heroic",
),
paths={
"config.json": LocationSubPath("config.json"),
"store_config.json": LocationSubPath("store/config.json"),
},
invalid_subtitle=Location.CONFIG_INVALID_SUBTITLE,
)
)

View File

@@ -0,0 +1,424 @@
# importer.py
#
# Copyright 2022-2023 kramo
# Copyright 2023 Geoffrey Coulaud
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import logging
from time import time
from typing import Any, Optional
from gi.repository import Adw, Gio, GLib, Gtk
from cartridges import shared
from cartridges.errors.error_producer import ErrorProducer
from cartridges.errors.friendly_error import FriendlyError
from cartridges.game import Game
from cartridges.importer.location import UnresolvableLocationError
from cartridges.importer.source import Source
from cartridges.store.managers.async_manager import AsyncManager
from cartridges.store.pipeline import Pipeline
# pylint: disable=too-many-instance-attributes
class Importer(ErrorProducer):
"""A class in charge of scanning sources for games"""
progressbar: Gtk.ProgressBar
import_statuspage: Adw.StatusPage
import_dialog: Adw.MessageDialog
summary_toast: Optional[Adw.Toast] = None
sources: set[Source]
n_source_tasks_created: int = 0
n_source_tasks_done: int = 0
n_pipelines_done: int = 0
game_pipelines: set[Pipeline]
removed_game_ids: set[str]
imported_game_ids: set[str]
close_req_id: int
def __init__(self) -> None:
super().__init__()
shared.import_time = int(time())
# TODO: make this stateful
shared.store.new_game_ids = set()
shared.store.duplicate_game_ids = set()
self.removed_game_ids = set()
self.imported_game_ids = set()
self.game_pipelines = set()
self.sources = set()
@property
def n_games_added(self) -> int:
return sum(
1 if not (pipeline.game.blacklisted or pipeline.game.removed) else 0
for pipeline in self.game_pipelines
)
@property
def pipelines_progress(self) -> float:
progress = sum(pipeline.progress for pipeline in self.game_pipelines)
try:
progress = progress / len(self.game_pipelines)
except ZeroDivisionError:
progress = 0
return progress # type: ignore
@property
def sources_progress(self) -> float:
try:
progress = self.n_source_tasks_done / self.n_source_tasks_created
except ZeroDivisionError:
progress = 0
return progress
@property
def finished(self) -> bool:
return (
self.n_source_tasks_created == self.n_source_tasks_done
and len(self.game_pipelines) == self.n_pipelines_done
)
def add_source(self, source: Source) -> None:
self.sources.add(source)
def run(self) -> None:
"""Use several Gio.Task to import games from added sources"""
shared.win.get_application().state = shared.AppState.IMPORT
if self.__class__.summary_toast:
self.__class__.summary_toast.dismiss()
shared.win.get_application().lookup_action("import").set_enabled(False)
shared.win.get_application().lookup_action("add_game").set_enabled(False)
self.create_dialog()
# Collect all errors and reset the cancellables for the managers
# - Only one importer exists at any given time
# - Every import starts fresh
self.collect_errors()
for manager in shared.store.managers.values():
manager.collect_errors()
if isinstance(manager, AsyncManager):
manager.reset_cancellable()
for source in self.sources:
logging.debug("Importing games from source %s", source.source_id)
task = Gio.Task.new(None, None, self.source_callback, (source,))
self.n_source_tasks_created += 1
task.run_in_thread(
lambda _task, _obj, _data, _cancellable, src=source: self.source_task_thread_func(
(src,)
)
)
self.progress_changed_callback()
def create_dialog(self) -> None:
"""Create the import dialog"""
self.progressbar = Gtk.ProgressBar(margin_start=12, margin_end=12)
self.import_statuspage = Adw.StatusPage(
title=_("Importing Games…"),
child=self.progressbar,
)
self.import_dialog = Adw.Window(
content=self.import_statuspage,
modal=True,
default_width=350,
default_height=-1,
transient_for=shared.win,
deletable=False,
)
self.close_req_id = self.import_dialog.connect(
"close-request", lambda *_: shared.win.close()
)
self.import_dialog.present()
def source_task_thread_func(self, data: tuple) -> None:
"""Source import task code"""
source: Source
source, *_rest = data
# Early exit if not available or not installed
if not source.is_available:
logging.info("Source %s skipped, not available", source.source_id)
return
try:
iterator = iter(source)
except UnresolvableLocationError:
logging.info("Source %s skipped, bad location", source.source_id)
return
# Get games from source
logging.info("Scanning source %s", source.source_id)
while True:
# Handle exceptions raised when iterating
try:
iteration_result = next(iterator)
except StopIteration:
break
except Exception as error: # pylint: disable=broad-exception-caught
logging.exception("%s in %s", type(error).__name__, source.source_id)
self.report_error(error)
continue
# Handle the result depending on its type
if isinstance(iteration_result, Game):
game = iteration_result
additional_data = {}
elif isinstance(iteration_result, tuple):
game, additional_data = iteration_result
elif iteration_result is None:
continue
else:
# Warn source implementers that an invalid type was produced
# Should not happen on production code
logging.warning(
"%s produced an invalid iteration return type %s",
source.source_id,
type(iteration_result),
)
continue
# Register game
pipeline: Pipeline = shared.store.add_game(game, additional_data)
if pipeline is not None:
logging.info("Imported %s (%s)", game.name, game.game_id)
pipeline.connect("advanced", self.pipeline_advanced_callback)
self.game_pipelines.add(pipeline)
def update_progressbar(self) -> None:
"""Update the progressbar to show the overall import progress"""
# Reserve 10% for the sources discovery, the rest is the pipelines
self.progressbar.set_fraction(
(0.1 * self.sources_progress) + (0.9 * self.pipelines_progress)
)
def source_callback(self, _obj: Any, _result: Any, data: tuple) -> None:
"""Callback executed when a source is fully scanned"""
source, *_rest = data
logging.debug("Import done for source %s", source.source_id)
self.n_source_tasks_done += 1
self.progress_changed_callback()
def pipeline_advanced_callback(self, pipeline: Pipeline) -> None:
"""Callback called when a pipeline for a game has advanced"""
if pipeline.is_done:
self.n_pipelines_done += 1
self.progress_changed_callback()
def progress_changed_callback(self) -> None:
"""
Callback called when the import process has progressed
Triggered when:
* All sources have been started
* A source finishes
* A pipeline finishes
"""
self.update_progressbar()
if self.finished:
self.import_callback()
def remove_games(self) -> None:
"""Set removed to True for missing games"""
if not shared.schema.get_boolean("remove-missing"):
return
for game in shared.store:
if game.removed:
continue
if game.source == "imported":
continue
if not shared.schema.get_boolean(game.base_source):
continue
if game.game_id in shared.store.duplicate_game_ids:
continue
if game.game_id in shared.store.new_game_ids:
continue
logging.debug("Removing missing game %s (%s)", game.name, game.game_id)
game.removed = True
game.save()
game.update()
self.removed_game_ids.add(game.game_id)
def import_callback(self) -> None:
"""Callback called when importing has finished"""
logging.info("Import done")
self.remove_games()
self.imported_game_ids = shared.store.new_game_ids
shared.store.new_game_ids = set()
shared.store.duplicate_game_ids = set()
# Disconnect the close-request signal that closes the main window
self.import_dialog.disconnect(self.close_req_id)
self.import_dialog.close()
self.__class__.summary_toast = self.create_summary_toast()
self.create_error_dialog()
shared.win.get_application().lookup_action("import").set_enabled(True)
shared.win.get_application().lookup_action("add_game").set_enabled(True)
shared.win.get_application().state = shared.AppState.DEFAULT
shared.win.create_source_rows()
def create_error_dialog(self) -> None:
"""Dialog containing all errors raised by importers"""
# Collect all errors that happened in the importer and the managers
errors = []
errors.extend(self.collect_errors())
for manager in shared.store.managers.values():
errors.extend(manager.collect_errors())
# Filter out non friendly errors
errors = set(
tuple(
(error.title, error.subtitle)
for error in (
filter(lambda error: isinstance(error, FriendlyError), errors)
)
)
)
# No error to display
if not errors:
self.timeout_toast()
return
# Create error dialog
dialog = Adw.MessageDialog()
dialog.set_heading(_("Warning"))
dialog.add_response("close", _("Dismiss"))
dialog.add_response("open_preferences_import", _("Preferences"))
dialog.set_default_response("open_preferences_import")
dialog.connect("response", self.dialog_response_callback)
dialog.set_transient_for(shared.win)
if len(errors) == 1:
dialog.set_heading((error := next(iter(errors)))[0])
dialog.set_body(error[1])
else:
# Display the errors in a list
list_box = Gtk.ListBox()
list_box.set_selection_mode(Gtk.SelectionMode.NONE)
list_box.set_css_classes(["boxed-list"])
list_box.set_margin_top(9)
for error in errors:
row = Adw.ActionRow.new()
row.set_title(error[0])
row.set_subtitle(error[1])
list_box.append(row)
dialog.set_body(_("The following errors occured during import:"))
dialog.set_extra_child(list_box)
dialog.present()
def undo_import(self, *_args: Any) -> None:
for game_id in self.imported_game_ids:
shared.store[game_id].removed = True
shared.store[game_id].update()
shared.store[game_id].save()
for game_id in self.removed_game_ids:
shared.store[game_id].removed = False
shared.store[game_id].update()
shared.store[game_id].save()
self.imported_game_ids = set()
self.removed_game_ids = set()
if self.__class__.summary_toast:
self.__class__.summary_toast.dismiss()
logging.info("Import undone")
def create_summary_toast(self) -> Adw.Toast:
"""N games imported, removed toast"""
toast = Adw.Toast(timeout=0, priority=Adw.ToastPriority.HIGH)
if not self.n_games_added:
toast_title = _("No new games found")
if not self.removed_game_ids:
toast.set_button_label(_("Preferences"))
toast.connect(
"button-clicked",
self.dialog_response_callback,
"open_preferences",
"import",
)
elif self.n_games_added == 1:
toast_title = _("1 game imported")
elif self.n_games_added > 1:
# The variable is the number of games
toast_title = _("{} games imported").format(self.n_games_added)
if (removed_length := len(self.removed_game_ids)) == 1:
# A single game removed
toast_title += ", " + _("1 removed")
elif removed_length > 1:
# The variable is the number of games removed
toast_title += ", " + _("{} removed").format(removed_length)
if self.n_games_added or self.removed_game_ids:
toast.set_button_label(_("Undo"))
toast.connect("button-clicked", self.undo_import)
toast.set_title(toast_title)
shared.win.toast_overlay.add_toast(toast)
return toast
def open_preferences(
self,
page_name: Optional[str] = None,
expander_row: Optional[Adw.ExpanderRow] = None,
) -> Adw.PreferencesWindow:
return shared.win.get_application().on_preferences_action(
page_name=page_name, expander_row=expander_row
)
def timeout_toast(self, *_args: Any) -> None:
"""Manually timeout the toast after the user has dismissed all warnings"""
if self.__class__.summary_toast:
GLib.timeout_add_seconds(5, self.__class__.summary_toast.dismiss)
def dialog_response_callback(self, _widget: Any, response: str, *args: Any) -> None:
"""Handle after-import dialogs callback"""
logging.debug("After-import dialog response: %s (%s)", response, str(args))
if response == "open_preferences":
self.open_preferences(*args)
elif response == "open_preferences_import":
self.open_preferences(*args).connect("close-request", self.timeout_toast)
else:
self.timeout_toast()

View File

@@ -0,0 +1,104 @@
# itch_source.py
#
# Copyright 2022-2023 kramo
# Copyright 2023 Geoffrey Coulaud
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
from shutil import rmtree
from sqlite3 import connect
from typing import NamedTuple
from cartridges import shared
from cartridges.game import Game
from cartridges.importer.location import Location, LocationSubPath
from cartridges.importer.source import SourceIterable, URLExecutableSource
from cartridges.utils.sqlite import copy_db
class ItchSourceIterable(SourceIterable):
source: "ItchSource"
def __iter__(self):
"""Generator method producing games"""
# Query the database
db_request = """
SELECT
games.id,
games.title,
games.cover_url,
games.still_cover_url,
caves.id
FROM
'caves'
INNER JOIN
'games'
ON
caves.game_id = games.id
;
"""
db_path = copy_db(self.source.locations.config["butler.db"])
connection = connect(db_path)
cursor = connection.execute(db_request)
# Create games from the db results
for row in cursor:
values = {
"added": shared.import_time,
"source": self.source.source_id,
"name": row[1],
"game_id": self.source.game_id_format.format(game_id=row[0]),
"executable": self.source.make_executable(cave_id=row[4]),
}
additional_data = {"online_cover_url": row[3] or row[2]}
game = Game(values)
yield (game, additional_data)
# Cleanup
rmtree(str(db_path.parent))
class ItchLocations(NamedTuple):
config: Location
class ItchSource(URLExecutableSource):
source_id = "itch"
name = _("itch")
iterable_class = ItchSourceIterable
url_format = "itch://caves/{cave_id}/launch"
available_on = {"linux", "win32"}
locations: ItchLocations
def __init__(self) -> None:
super().__init__()
self.locations = ItchLocations(
Location(
schema_key="itch-location",
candidates=(
shared.flatpak_dir / "io.itch.itch" / "config" / "itch",
shared.config_dir / "itch",
shared.home / ".config" / "itch",
shared.appdata_dir / "itch",
),
paths={
"butler.db": LocationSubPath("db/butler.db"),
},
invalid_subtitle=Location.CONFIG_INVALID_SUBTITLE,
)
)

View File

@@ -0,0 +1,119 @@
# legendary_source.py
#
# Copyright 2023 Geoffrey Coulaud
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import json
import logging
from json import JSONDecodeError
from typing import NamedTuple
from cartridges import shared
from cartridges.game import Game
from cartridges.importer.location import Location, LocationSubPath
from cartridges.importer.source import (
ExecutableFormatSource,
SourceIterable,
SourceIterationResult,
)
class LegendarySourceIterable(SourceIterable):
source: "LegendarySource"
def game_from_library_entry(self, entry: dict) -> SourceIterationResult:
# Skip non-games
if entry["is_dlc"]:
return None
# Build game
app_name = entry["app_name"]
values = {
"added": shared.import_time,
"source": self.source.source_id,
"name": entry["title"],
"game_id": self.source.game_id_format.format(game_id=app_name),
"executable": self.source.make_executable(app_name=app_name),
}
data = {}
# Get additional metadata from file (optional)
metadata_file = self.source.locations.config["metadata"] / f"{app_name}.json"
try:
metadata = json.load(metadata_file.open())
values["developer"] = metadata["metadata"]["developer"]
for image_entry in metadata["metadata"]["keyImages"]:
if image_entry["type"] == "DieselGameBoxTall":
data["online_cover_url"] = image_entry["url"]
break
except (JSONDecodeError, OSError, KeyError):
pass
game = Game(values)
return (game, data)
def __iter__(self):
# Open library
file = self.source.locations.config["installed.json"]
try:
library: dict = json.load(file.open())
except (JSONDecodeError, OSError):
logging.warning("Couldn't open Legendary file: %s", str(file))
return
# Generate games from library
for entry in library.values():
try:
result = self.game_from_library_entry(entry)
except KeyError as error:
# Skip invalid games
logging.warning(
"Invalid Legendary game skipped in %s", str(file), exc_info=error
)
continue
yield result
class LegendaryLocations(NamedTuple):
config: Location
class LegendarySource(ExecutableFormatSource):
source_id = "legendary"
name = _("Legendary")
executable_format = "legendary launch {app_name}"
available_on = {"linux"}
iterable_class = LegendarySourceIterable
locations: LegendaryLocations
def __init__(self) -> None:
super().__init__()
self.locations = LegendaryLocations(
Location(
schema_key="legendary-location",
candidates=(
shared.config_dir / "legendary",
shared.home / ".config" / "legendary",
),
paths={
"installed.json": LocationSubPath("installed.json"),
"metadata": LocationSubPath("metadata", True),
},
invalid_subtitle=Location.CONFIG_INVALID_SUBTITLE,
)
)

View File

@@ -0,0 +1,102 @@
import logging
from os import PathLike
from pathlib import Path
from typing import Iterable, Mapping, NamedTuple, Optional
from cartridges import shared
PathSegment = str | PathLike | Path
PathSegments = Iterable[PathSegment]
Candidate = PathSegments
class LocationSubPath(NamedTuple):
segment: PathSegment
is_directory: bool = False
class UnresolvableLocationError(Exception):
pass
class Location:
"""
Class representing a filesystem location
* A location may have multiple candidate roots
* The path in the schema is always favored
* From the candidate root, multiple subpaths should exist for it to be valid
* When resolved, the schema is updated with the picked chosen
"""
# The variable is the name of the source
CACHE_INVALID_SUBTITLE = _("Select the {} cache directory.")
# The variable is the name of the source
CONFIG_INVALID_SUBTITLE = _("Select the {} configuration directory.")
# The variable is the name of the source
DATA_INVALID_SUBTITLE = _("Select the {} data directory.")
schema_key: str
candidates: Iterable[Candidate]
paths: Mapping[str, LocationSubPath]
invalid_subtitle: str
root: Optional[Path] = None
def __init__(
self,
schema_key: str,
candidates: Iterable[Candidate],
paths: Mapping[str, LocationSubPath],
invalid_subtitle: str,
) -> None:
super().__init__()
self.schema_key = schema_key
self.candidates = candidates
self.paths = paths
self.invalid_subtitle = invalid_subtitle
def check_candidate(self, candidate: Path) -> bool:
"""Check if a candidate root has the necessary files and directories"""
for segment, is_directory in self.paths.values():
path = Path(candidate) / segment
if is_directory:
if not path.is_dir():
return False
else:
if not path.is_file():
return False
return True
def resolve(self) -> None:
"""Choose a root path from the candidates for the location.
If none fits, raise an UnresolvableLocationError"""
if self.root is not None:
return
# Get the schema candidate
schema_candidate = shared.schema.get_string(self.schema_key)
# Find the first matching candidate
for candidate in (schema_candidate, *self.candidates):
candidate = Path(candidate).expanduser()
if not self.check_candidate(candidate):
continue
self.root = candidate
break
else:
# No good candidate found
raise UnresolvableLocationError()
# Update the schema with the found candidate
value = str(candidate)
shared.schema.set_string(self.schema_key, value)
logging.debug("Resolved value for schema key %s: %s", self.schema_key, value)
def __getitem__(self, key: str) -> Optional[Path]:
"""Get the computed path from its key for the location"""
self.resolve()
if self.root:
return self.root / self.paths[key].segment
return None

View File

@@ -0,0 +1,132 @@
# lutris_source.py
#
# Copyright 2022-2023 kramo
# Copyright 2023 Geoffrey Coulaud
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
from shutil import rmtree
from sqlite3 import connect
from typing import NamedTuple
from cartridges import shared
from cartridges.game import Game
from cartridges.importer.location import Location, LocationSubPath
from cartridges.importer.source import SourceIterable, URLExecutableSource
from cartridges.utils.sqlite import copy_db
class LutrisSourceIterable(SourceIterable):
source: "LutrisSource"
def __iter__(self):
"""Generator method producing games"""
# Query the database
request = """
SELECT id, name, slug, runner, hidden
FROM 'games'
WHERE
name IS NOT NULL
AND slug IS NOT NULL
AND configPath IS NOT NULL
AND installed
AND (runner IS NOT "steam" OR :import_steam)
AND (runner IS NOT "flatpak" OR :import_flatpak)
;
"""
params = {
"import_steam": shared.schema.get_boolean("lutris-import-steam"),
"import_flatpak": shared.schema.get_boolean("lutris-import-flatpak"),
}
db_path = copy_db(self.source.locations.config["pga.db"])
connection = connect(db_path)
cursor = connection.execute(request, params)
# Create games from the DB results
for row in cursor:
# Create game
values = {
"added": shared.import_time,
"hidden": row[4],
"name": row[1],
"source": f"{self.source.source_id}_{row[3]}",
"game_id": self.source.game_id_format.format(
runner=row[3], game_id=row[0]
),
"executable": self.source.make_executable(game_id=row[0]),
}
game = Game(values)
# Get official image path
image_path = self.source.locations.cache["coverart"] / f"{row[2]}.jpg"
additional_data = {"local_image_path": image_path}
yield (game, additional_data)
# Cleanup
rmtree(str(db_path.parent))
class LutrisLocations(NamedTuple):
config: Location
cache: Location
class LutrisSource(URLExecutableSource):
"""Generic Lutris source"""
source_id = "lutris"
name = _("Lutris")
iterable_class = LutrisSourceIterable
url_format = "lutris:rungameid/{game_id}"
available_on = {"linux"}
# FIXME possible bug: config picks ~/.var... and cache picks ~/.local...
locations: LutrisLocations
@property
def game_id_format(self):
return self.source_id + "_{runner}_{game_id}"
def __init__(self) -> None:
super().__init__()
self.locations = LutrisLocations(
Location(
schema_key="lutris-location",
candidates=(
shared.flatpak_dir / "net.lutris.Lutris" / "data" / "lutris",
shared.data_dir / "lutris",
shared.home / ".local" / "share" / "lutris",
),
paths={
"pga.db": LocationSubPath("pga.db"),
},
invalid_subtitle=Location.DATA_INVALID_SUBTITLE,
),
Location(
schema_key="lutris-cache-location",
candidates=(
shared.flatpak_dir / "net.lutris.Lutris" / "cache" / "lutris",
shared.cache_dir / "lutris",
shared.home / ".cache" / "lutris",
),
paths={
"coverart": LocationSubPath("coverart", True),
},
invalid_subtitle=Location.CACHE_INVALID_SUBTITLE,
),
)

View File

@@ -0,0 +1,255 @@
# retroarch_source.py
#
# Copyright 2023 Rilic
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import json
import logging
import re
from hashlib import md5
from json import JSONDecodeError
from pathlib import Path
from shlex import quote as shell_quote
from typing import NamedTuple
from cartridges import shared
from cartridges.errors.friendly_error import FriendlyError
from cartridges.game import Game
from cartridges.importer.location import (
Location,
LocationSubPath,
UnresolvableLocationError,
)
from cartridges.importer.source import Source, SourceIterable
from cartridges.importer.steam_source import SteamSource
class RetroarchSourceIterable(SourceIterable):
source: "RetroarchSource"
def get_config_value(self, key: str, config_data: str):
for item in re.findall(f'{key}\\s*=\\s*"(.*)"\n', config_data, re.IGNORECASE):
if item.startswith(":"):
item = item.replace(":", str(self.source.locations.config.root))
logging.debug(str(item))
return item
raise KeyError(f"Key not found in RetroArch config: {key}")
def __iter__(self):
bad_playlists = set()
config_file = self.source.locations.config["retroarch.cfg"]
with config_file.open(encoding="utf-8") as open_file:
config_data = open_file.read()
playlist_folder = Path(
self.get_config_value("playlist_directory", config_data)
).expanduser()
thumbnail_folder = Path(
self.get_config_value("thumbnails_directory", config_data)
).expanduser()
# Get all playlist files, ending in .lpl
playlist_files = playlist_folder.glob("*.lpl")
for playlist_file in playlist_files:
logging.debug(playlist_file)
try:
with playlist_file.open(
encoding="utf-8",
) as open_file:
playlist_json = json.load(open_file)
except (JSONDecodeError, OSError):
logging.warning("Cannot read playlist file: %s", str(playlist_file))
continue
for item in playlist_json["items"]:
# Select the core.
# Try the content's core first, then the playlist's default core.
# If none can be used, warn the user and continue.
for core_path in (
item["core_path"],
playlist_json["default_core_path"],
):
if core_path not in ("DETECT", ""):
break
else:
logging.warning("Cannot find core for: %s", str(item["path"]))
bad_playlists.add(playlist_file.stem)
continue
# Build game
game_id = md5(item["path"].encode("utf-8")).hexdigest()
values = {
"source": self.source.source_id,
"added": shared.import_time,
"name": item["label"],
"game_id": self.source.game_id_format.format(game_id=game_id),
"executable": self.source.make_executable(
core_path=core_path,
rom_path=item["path"],
),
}
game = Game(values)
# Get boxart
boxart_image_name = item["label"] + ".png"
boxart_image_name = re.sub(r"[&\*\/:`<>\?\\\|]", "_", boxart_image_name)
boxart_folder_name = playlist_file.stem
image_path = (
thumbnail_folder
/ boxart_folder_name
/ "Named_Boxarts"
/ boxart_image_name
)
additional_data = {"local_image_path": image_path}
yield (game, additional_data)
if bad_playlists:
raise FriendlyError(
_("No RetroArch Core Selected"),
# The variable is a newline separated list of playlists
_("The following playlists have no default core:")
+ "\n\n{}\n\n".format("\n".join(bad_playlists))
+ _("Games with no core selected were not imported"),
)
class RetroarchLocations(NamedTuple):
config: Location
class RetroarchSource(Source):
name = _("RetroArch")
source_id = "retroarch"
available_on = {"linux"}
iterable_class = RetroarchSourceIterable
locations: RetroarchLocations
def __init__(self) -> None:
super().__init__()
self.locations = RetroarchLocations(
Location(
schema_key="retroarch-location",
candidates=[
shared.flatpak_dir
/ "org.libretro.RetroArch"
/ "config"
/ "retroarch",
shared.config_dir / "retroarch",
shared.home / ".config" / "retroarch",
# TODO: Windows support, waiting for executable path setting improvement
# Path("C:\\RetroArch-Win64"),
# Path("C:\\RetroArch-Win32"),
# TODO: UWP support (URL handler - https://github.com/libretro/RetroArch/pull/13563)
# shared.local_appdata_dir
# / "Packages"
# / "1e4cf179-f3c2-404f-b9f3-cb2070a5aad8_8ngdn9a6dx1ma"
# / "LocalState",
],
paths={
"retroarch.cfg": LocationSubPath("retroarch.cfg"),
},
invalid_subtitle=Location.CONFIG_INVALID_SUBTITLE,
)
)
# TODO enable when we get the Steam RetroArch games working
# self.add_steam_location_candidate()
def add_steam_location_candidate(self) -> None:
"""Add the Steam RetroAcrh location to the config candidates"""
try:
self.locations.config.candidates.append(self.get_steam_location())
except (OSError, KeyError, UnresolvableLocationError):
logging.debug("Steam isn't installed")
except ValueError as error:
logging.debug("RetroArch Steam location candiate not found", exc_info=error)
def get_steam_location(self) -> str:
"""
Get the RetroArch installed via Steam location
:raise UnresolvableLocationError: if Steam isn't installed
:raise KeyError: if there is no libraryfolders.vdf subpath
:raise OSError: if libraryfolders.vdf can't be opened
:raise ValueError: if RetroArch isn't installed through Steam
"""
# Find Steam location
libraryfolders = SteamSource().locations.data["libraryfolders.vdf"]
parse_apps = False
with open(libraryfolders, "r", encoding="utf-8") as open_file:
# Search each line for a library path and store it each time a new one is found.
for line in open_file:
if '"path"' in line:
library_path = re.findall(
'"path"\\s+"(.*)"\n', line, re.IGNORECASE
)[0]
elif '"apps"' in line:
parse_apps = True
elif parse_apps and "}" in line:
parse_apps = False
# Stop searching, as the library path directly above the appid has been found.
elif parse_apps and '"1118310"' in line:
return Path(f"{library_path}/steamapps/common/RetroArch")
# Not found
raise ValueError("RetroArch not found in Steam library")
def make_executable(self, core_path: Path, rom_path: Path) -> str:
"""
Generate an executable command from the rom path and core path,
depending on the source's location.
The format depends on RetroArch's installation method,
detected from the source config location
:param Path rom_path: the game's rom path
:param Path core_path: the game's core path
:return str: an executable command
"""
self.locations.config.resolve()
args = ("-L", core_path, rom_path)
# Steam RetroArch
# (Must check before Flatpak, because Steam itself can be installed as one)
# TODO enable when we get Steam RetroArch executable to work
# if self.locations.config.root.parent.parent.name == "steamapps":
# # steam://run exepects args to be url-encoded and separated by spaces.
# args = map(lambda s: url_quote(str(s), safe=""), args)
# args_str = " ".join(args)
# uri = f"steam://run/1118310//{args_str}/"
# return f"xdg-open {shell_quote(uri)}"
# Flatpak RetroArch
args = map(lambda s: shell_quote(str(s)), args)
args_str = " ".join(args)
if self.locations.config.root.is_relative_to(shared.flatpak_dir):
return f"flatpak run org.libretro.RetroArch {args_str}"
# TODO executable override for non-sandboxed sources
# Linux native RetroArch
return f"retroarch {args_str}"
# TODO implement for windows (needs override)

View File

@@ -0,0 +1,126 @@
# source.py
#
# Copyright 2023 Geoffrey Coulaud
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import sys
from abc import abstractmethod
from collections.abc import Iterable
from typing import Any, Collection, Generator, Optional
from cartridges.game import Game
from cartridges.importer.location import Location
# Type of the data returned by iterating on a Source
SourceIterationResult = Optional[Game | tuple[Game, tuple[Any]]]
class SourceIterable(Iterable):
"""Data producer for a source of games"""
source: "Source"
def __init__(self, source: "Source") -> None:
self.source = source
@abstractmethod
def __iter__(self) -> Generator[SourceIterationResult, None, None]:
"""
Method that returns a generator that produces games
* Should be implemented as a generator method
* May yield `None` when an iteration hasn't produced a game
* In charge of handling per-game errors
* Returns when exhausted
"""
class Source(Iterable):
"""Source of games. E.g an installed app with a config file that lists game directories"""
source_id: str
name: str
variant: Optional[str] = None
available_on: set[str] = set()
iterable_class: type[SourceIterable]
# NOTE: Locations must be set at __init__ time, not in the class definition.
# They must not be shared between source instances.
locations: Collection[Location]
@property
def full_name(self) -> str:
"""The source's full name"""
full_name_ = self.name
if self.variant:
full_name_ += f" ({self.variant})"
return full_name_
@property
def game_id_format(self) -> str:
"""The string format used to construct game IDs"""
return self.source_id + "_{game_id}"
@property
def is_available(self) -> bool:
return sys.platform in self.available_on
def make_executable(self, *args, **kwargs) -> str:
"""
Create a game executable command.
Should be implemented by child classes.
"""
def __iter__(self) -> Generator[SourceIterationResult, None, None]:
"""
Get an iterator for the source
:raises UnresolvableLocationError: Not iterable if any of the locations are unresolvable
"""
for location in self.locations:
location.resolve()
return iter(self.iterable_class(self))
class ExecutableFormatSource(Source):
"""Source class that uses a simple executable format to start games"""
@property
@abstractmethod
def executable_format(self) -> str:
"""The executable format used to construct game executables"""
def make_executable(self, *args, **kwargs) -> str:
"""Use the executable format to"""
return self.executable_format.format(*args, **kwargs)
# pylint: disable=abstract-method
class URLExecutableSource(ExecutableFormatSource):
"""Source class that use custom URLs to start games"""
url_format: str
@property
def executable_format(self) -> str:
match sys.platform:
case "win32":
return "start " + self.url_format
case "linux":
return "xdg-open " + self.url_format
case other:
raise NotImplementedError(
f"No URL handler command available for {other}"
)

View File

@@ -0,0 +1,140 @@
# steam_source.py
#
# Copyright 2022-2023 kramo
# Copyright 2023 Geoffrey Coulaud
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import logging
import re
from pathlib import Path
from typing import Iterable, NamedTuple
from cartridges import shared
from cartridges.game import Game
from cartridges.importer.location import Location, LocationSubPath
from cartridges.importer.source import SourceIterable, URLExecutableSource
from cartridges.utils.steam import SteamFileHelper, SteamInvalidManifestError
class SteamSourceIterable(SourceIterable):
source: "SteamSource"
def get_manifest_dirs(self) -> Iterable[Path]:
"""Get dirs that contain Steam app manifests"""
libraryfolders_path = self.source.locations.data["libraryfolders.vdf"]
with open(libraryfolders_path, "r", encoding="utf-8") as file:
contents = file.read()
return [
Path(path) / "steamapps"
for path in re.findall('"path"\\s+"(.*)"\n', contents, re.IGNORECASE)
]
def get_manifests(self) -> Iterable[Path]:
"""Get app manifests"""
manifests = set()
for steamapps_dir in self.get_manifest_dirs():
if not steamapps_dir.is_dir():
continue
manifests.update(
[
manifest
for manifest in steamapps_dir.glob("appmanifest_*.acf")
if manifest.is_file()
]
)
return manifests
def __iter__(self):
"""Generator method producing games"""
appid_cache = set()
manifests = self.get_manifests()
for manifest in manifests:
# Get metadata from manifest
steam = SteamFileHelper()
try:
local_data = steam.get_manifest_data(manifest)
except (OSError, SteamInvalidManifestError) as error:
logging.debug("Couldn't load appmanifest %s", manifest, exc_info=error)
continue
# Skip non installed games
installed_mask = 4
if not int(local_data["stateflags"]) & installed_mask:
logging.debug("Skipped %s: not installed", manifest)
continue
# Skip duplicate appids
appid = local_data["appid"]
if appid in appid_cache:
logging.debug("Skipped %s: appid already seen during import", manifest)
continue
appid_cache.add(appid)
# Build game from local data
values = {
"added": shared.import_time,
"name": local_data["name"],
"source": self.source.source_id,
"game_id": self.source.game_id_format.format(game_id=appid),
"executable": self.source.make_executable(game_id=appid),
}
game = Game(values)
# Add official cover image
image_path = (
self.source.locations.data["librarycache"]
/ f"{appid}_library_600x900.jpg"
)
additional_data = {"local_image_path": image_path, "steam_appid": appid}
yield (game, additional_data)
class SteamLocations(NamedTuple):
data: Location
class SteamSource(URLExecutableSource):
source_id = "steam"
name = _("Steam")
available_on = {"linux", "win32"}
iterable_class = SteamSourceIterable
url_format = "steam://rungameid/{game_id}"
locations: SteamLocations
def __init__(self) -> None:
super().__init__()
self.locations = SteamLocations(
Location(
schema_key="steam-location",
candidates=(
shared.home / ".steam" / "steam",
shared.data_dir / "Steam",
shared.flatpak_dir / "com.valvesoftware.Steam" / "data" / "Steam",
shared.programfiles32_dir / "Steam",
),
paths={
"libraryfolders.vdf": LocationSubPath(
"steamapps/libraryfolders.vdf"
),
"librarycache": LocationSubPath("appcache/librarycache", True),
},
invalid_subtitle=Location.DATA_INVALID_SUBTITLE,
)
)