Merge branch 'main' into libadwaita-1.4

This commit is contained in:
kramo
2023-08-13 13:02:56 +02:00
59 changed files with 2604 additions and 1373 deletions

View File

@@ -64,7 +64,7 @@ class DetailsWindow(Adw.Window):
self.set_transient_for(self.win)
if self.game:
self.set_title(_("Edit Game Details"))
self.set_title(_("Game Details"))
self.name.set_text(self.game.name)
if self.game.developer:
self.developer.set_text(self.game.developer)
@@ -76,7 +76,7 @@ class DetailsWindow(Adw.Window):
self.cover_button_delete_revealer.set_reveal_child(True)
else:
self.set_title(_("Add New Game"))
self.apply_button.set_label(_("Confirm"))
self.apply_button.set_label(_("Add"))
image_filter = Gtk.FileFilter(name=_("Images"))
for extension in Image.registered_extensions():
@@ -123,9 +123,9 @@ class DetailsWindow(Adw.Window):
self.cover_button_edit.connect("clicked", self.choose_cover)
self.apply_button.connect("clicked", self.apply_preferences)
self.name.connect("activate", self.focus_executable)
self.developer.connect("activate", self.focus_executable)
self.executable.connect("activate", self.apply_preferences)
self.name.connect("entry-activated", self.focus_executable)
self.developer.connect("entry-activated", self.focus_executable)
self.executable.connect("entry-activated", self.apply_preferences)
self.set_focus(self.name)
self.present()

View File

@@ -106,7 +106,7 @@ class Importer(ErrorProducer):
manager.reset_cancellable()
for source in self.sources:
logging.debug("Importing games from source %s", source.id)
logging.debug("Importing games from source %s", source.source_id)
task = Task.new(None, None, self.source_callback, (source,))
self.n_source_tasks_created += 1
task.set_task_data((source,))
@@ -139,16 +139,16 @@ class Importer(ErrorProducer):
# Early exit if not available or not installed
if not source.is_available:
logging.info("Source %s skipped, not available", source.id)
logging.info("Source %s skipped, not available", source.source_id)
return
try:
iterator = iter(source)
except UnresolvableLocationError:
logging.info("Source %s skipped, bad location", source.id)
logging.info("Source %s skipped, bad location", source.source_id)
return
# Get games from source
logging.info("Scanning source %s", source.id)
logging.info("Scanning source %s", source.source_id)
while True:
# Handle exceptions raised when iterating
try:
@@ -156,7 +156,7 @@ class Importer(ErrorProducer):
except StopIteration:
break
except Exception as error: # pylint: disable=broad-exception-caught
logging.exception("%s in %s", type(error).__name__, source.id)
logging.exception("%s in %s", type(error).__name__, source.source_id)
self.report_error(error)
continue
@@ -173,7 +173,7 @@ class Importer(ErrorProducer):
# Should not happen on production code
logging.warning(
"%s produced an invalid iteration return type %s",
source.id,
source.source_id,
type(iteration_result),
)
continue
@@ -195,7 +195,7 @@ class Importer(ErrorProducer):
def source_callback(self, _obj, _result, data):
"""Callback executed when a source is fully scanned"""
source, *_rest = data
logging.debug("Import done for source %s", source.id)
logging.debug("Import done for source %s", source.source_id)
self.n_source_tasks_done += 1
self.progress_changed_callback()

View File

@@ -20,33 +20,30 @@
from pathlib import Path
from time import time
from typing import NamedTuple
import yaml
from src import shared
from src.game import Game
from src.importer.sources.location import Location
from src.importer.sources.source import (
SourceIterationResult,
SourceIterator,
URLExecutableSource,
)
from src.importer.sources.location import Location, LocationSubPath
from src.importer.sources.source import SourceIterable, URLExecutableSource
class BottlesSourceIterator(SourceIterator):
class BottlesSourceIterable(SourceIterable):
source: "BottlesSource"
def generator_builder(self) -> SourceIterationResult:
def __iter__(self):
"""Generator method producing games"""
data = self.source.data_location["library.yml"].read_text("utf-8")
data = self.source.locations.data["library.yml"].read_text("utf-8")
library: dict = yaml.safe_load(data)
added_time = int(time())
for entry in library.values():
# Build game
values = {
"source": self.source.id,
"source": self.source.source_id,
"added": added_time,
"name": entry["name"],
"game_id": self.source.game_id_format.format(game_id=entry["id"]),
@@ -62,11 +59,11 @@ class BottlesSourceIterator(SourceIterator):
# as Cartridges can't access directories picked via Bottles' file picker portal
bottles_location = Path(
yaml.safe_load(
self.source.data_location["data.yml"].read_text("utf-8")
self.source.locations.data["data.yml"].read_text("utf-8")
)["custom_bottles_path"]
)
except (FileNotFoundError, KeyError):
bottles_location = self.source.data_location.root / "bottles"
bottles_location = self.source.locations.data.root / "bottles"
bottle_path = entry["bottle"]["path"]
@@ -80,23 +77,31 @@ class BottlesSourceIterator(SourceIterator):
yield (game, additional_data)
class BottlesLocations(NamedTuple):
data: Location
class BottlesSource(URLExecutableSource):
"""Generic Bottles source"""
source_id = "bottles"
name = _("Bottles")
iterator_class = BottlesSourceIterator
iterable_class = BottlesSourceIterable
url_format = 'bottles:run/"{bottle_name}"/"{game_name}"'
available_on = {"linux"}
data_location = Location(
schema_key="bottles-location",
candidates=(
shared.flatpak_dir / "com.usebottles.bottles" / "data" / "bottles",
shared.data_dir / "bottles/",
shared.home / ".local" / "share" / "bottles",
),
paths={
"library.yml": (False, "library.yml"),
"data.yml": (False, "data.yml"),
},
locations = BottlesLocations(
Location(
schema_key="bottles-location",
candidates=(
shared.flatpak_dir / "com.usebottles.bottles" / "data" / "bottles",
shared.data_dir / "bottles/",
shared.home / ".local" / "share" / "bottles",
),
paths={
"library.yml": LocationSubPath("library.yml"),
"data.yml": LocationSubPath("data.yml"),
},
invalid_subtitle=Location.DATA_INVALID_SUBTITLE,
)
)

View File

@@ -19,25 +19,26 @@
from pathlib import Path
from time import time
from typing import NamedTuple
from gi.repository import GLib, Gtk
from src import shared
from src.game import Game
from src.importer.sources.location import Location
from src.importer.sources.source import Source, SourceIterationResult, SourceIterator
from src.importer.sources.location import Location, LocationSubPath
from src.importer.sources.source import Source, SourceIterable
class FlatpakSourceIterator(SourceIterator):
class FlatpakSourceIterable(SourceIterable):
source: "FlatpakSource"
def generator_builder(self) -> SourceIterationResult:
def __iter__(self):
"""Generator method producing games"""
added_time = int(time())
icon_theme = Gtk.IconTheme.new()
icon_theme.add_search_path(str(self.source.data_location["icons"]))
icon_theme.add_search_path(str(self.source.locations.data["icons"]))
blacklist = (
{"hu.kramo.Cartridges", "hu.kramo.Cartridges.Devel"}
@@ -53,7 +54,7 @@ class FlatpakSourceIterator(SourceIterator):
}
)
for entry in (self.source.data_location["applications"]).iterdir():
for entry in (self.source.locations.data["applications"]).iterdir():
if entry.suffix != ".desktop":
continue
@@ -76,7 +77,7 @@ class FlatpakSourceIterator(SourceIterator):
continue
values = {
"source": self.source.id,
"source": self.source.source_id,
"added": added_time,
"name": name,
"game_id": self.source.game_id_format.format(game_id=flatpak_id),
@@ -111,22 +112,30 @@ class FlatpakSourceIterator(SourceIterator):
yield (game, additional_data)
class FlatpakLocations(NamedTuple):
data: Location
class FlatpakSource(Source):
"""Generic Flatpak source"""
source_id = "flatpak"
name = _("Flatpak")
iterator_class = FlatpakSourceIterator
iterable_class = FlatpakSourceIterable
executable_format = "flatpak run {flatpak_id}"
available_on = {"linux"}
data_location = Location(
schema_key="flatpak-location",
candidates=(
"/var/lib/flatpak/",
shared.data_dir / "flatpak",
),
paths={
"applications": (True, "exports/share/applications"),
"icons": (True, "exports/share/icons"),
},
locations = FlatpakLocations(
Location(
schema_key="flatpak-location",
candidates=(
"/var/lib/flatpak/",
shared.data_dir / "flatpak",
),
paths={
"applications": LocationSubPath("exports/share/applications", True),
"icons": LocationSubPath("exports/share/icons", True),
},
invalid_subtitle=Location.DATA_INVALID_SUBTITLE,
)
)

View File

@@ -20,21 +20,47 @@
import json
import logging
from abc import abstractmethod
from hashlib import sha256
from json import JSONDecodeError
from pathlib import Path
from time import time
from typing import Optional, TypedDict
from typing import Iterable, NamedTuple, Optional, TypedDict
from functools import cached_property
from src import shared
from src.game import Game
from src.importer.sources.location import Location
from src.importer.sources.location import Location, LocationSubPath
from src.importer.sources.source import (
URLExecutableSource,
SourceIterable,
SourceIterationResult,
SourceIterator,
URLExecutableSource,
)
def path_json_load(path: Path):
"""
Load JSON from the file at the given path
:raises OSError: if the file can't be opened
:raises JSONDecodeError: if the file isn't valid JSON
"""
with path.open("r", encoding="utf-8") as open_file:
return json.load(open_file)
class InvalidLibraryFileError(Exception):
pass
class InvalidInstalledFileError(Exception):
pass
class InvalidStoreFileError(Exception):
pass
class HeroicLibraryEntry(TypedDict):
app_name: str
installed: Optional[bool]
@@ -44,119 +70,320 @@ class HeroicLibraryEntry(TypedDict):
art_square: str
class HeroicSubSource(TypedDict):
service: str
path: tuple[str]
class SubSourceIterable(Iterable):
"""Class representing a Heroic sub-source"""
class HeroicSourceIterator(SourceIterator):
source: "HeroicSource"
source_iterable: "HeroicSourceIterable"
name: str
service: str
image_uri_params: str = ""
relative_library_path: Path
library_json_entries_key: str = "library"
sub_sources: dict[str, HeroicSubSource] = {
"sideload": {
"service": "sideload",
"path": ("sideload_apps", "library.json"),
},
"legendary": {
"service": "epic",
"path": ("store_cache", "legendary_library.json"),
},
"gog": {
"service": "gog",
"path": ("store_cache", "gog_library.json"),
},
}
def __init__(self, source, source_iterable) -> None:
self.source = source
self.source_iterable = source_iterable
def game_from_library_entry(
@cached_property
def library_path(self) -> Path:
path = self.source.locations.config.root / self.relative_library_path
logging.debug("Using Heroic %s library.json path %s", self.name, path)
return path
def process_library_entry(
self, entry: HeroicLibraryEntry, added_time: int
) -> SourceIterationResult:
"""Helper method used to build a Game from a Heroic library entry"""
"""Build a Game from a Heroic library entry"""
# Skip games that are not installed
if not entry["is_installed"]:
return None
# Build game
app_name = entry["app_name"]
runner = entry["runner"]
service = self.sub_sources[runner]["service"]
# Build game
values = {
"source": f"{self.source.id}_{service}",
"source": f"{self.source.source_id}_{self.service}",
"added": added_time,
"name": entry["title"],
"developer": entry.get("developer", None),
"game_id": self.source.game_id_format.format(
service=service, game_id=app_name
service=self.service, game_id=app_name
),
"executable": self.source.executable_format.format(app_name=app_name),
"executable": self.source.executable_format.format(runner=runner, app_name=app_name),
"hidden": self.source_iterable.is_hidden(app_name),
}
game = Game(values)
# Get the image path from the heroic cache
# Filenames are derived from the URL that heroic used to get the file
uri: str = entry["art_square"]
if service == "epic":
uri += "?h=400&resize=1&w=300"
uri: str = entry["art_square"] + self.image_uri_params
digest = sha256(uri.encode()).hexdigest()
image_path = self.source.config_location.root / "images-cache" / digest
image_path = self.source.locations.config.root / "images-cache" / digest
additional_data = {"local_image_path": image_path}
return (game, additional_data)
def generator_builder(self) -> SourceIterationResult:
def __iter__(self):
"""
Iterate through the games with a generator
:raises InvalidLibraryFileError: on initial call if the library file is bad
"""
added_time = int(time())
try:
iterator = iter(
path_json_load(self.library_path)[self.library_json_entries_key]
)
except (OSError, JSONDecodeError, TypeError, KeyError) as error:
raise InvalidLibraryFileError(
f"Invalid {self.library_path.name}"
) from error
for entry in iterator:
try:
yield self.process_library_entry(entry, added_time)
except KeyError as error:
logging.warning(
"Skipped invalid %s game %s",
self.name,
entry.get("app_name", "UNKNOWN"),
exc_info=error,
)
continue
class StoreSubSourceIterable(SubSourceIterable):
"""
Class representing a "store" sub source.
Games can be installed or not, this class does the check accordingly.
"""
relative_installed_path: Path
installed_app_names: set[str]
@cached_property
def installed_path(self) -> Path:
path = self.source.locations.config.root / self.relative_installed_path
logging.debug("Using Heroic %s installed.json path %s", self.name, path)
return path
@abstractmethod
def get_installed_app_names(self) -> set[str]:
"""
Get the sub source's installed app names as a set.
:raises InvalidInstalledFileError: if the installed file data cannot be read
Whenever possible, `__cause__` is set with the original exception
"""
def is_installed(self, app_name: str) -> bool:
return app_name in self.installed_app_names
def process_library_entry(self, entry, added_time):
# Skip games that are not installed
app_name = entry["app_name"]
if not self.is_installed(app_name):
logging.warning(
"Skipped %s game %s (%s): not installed",
self.service,
entry["title"],
app_name,
)
return None
# Process entry as normal
return super().process_library_entry(entry, added_time)
def __iter__(self):
"""
Iterate through the installed games with a generator
:raises InvalidLibraryFileError: on initial call if the library file is bad
:raises InvalidInstalledFileError: on initial call if the installed file is bad
"""
self.installed_app_names = self.get_installed_app_names()
yield from super().__iter__()
class SideloadIterable(SubSourceIterable):
name = "sideload"
service = "sideload"
relative_library_path = Path("sideload_apps") / "library.json"
library_json_entries_key = "games"
class LegendaryIterable(StoreSubSourceIterable):
name = "legendary"
service = "epic"
image_uri_params = "?h=400&resize=1&w=300"
relative_library_path = Path("store_cache") / "legendary_library.json"
# relative_installed_path = (
# Path("legendary") / "legendaryConfig" / "legendary" / "installed.json"
# )
@cached_property
def installed_path(self) -> Path:
"""
Get the right path depending on the Heroic version
TODO after heroic 2.9 has been out for a while
We should use the commented out relative_installed_path
and remove this property override.
"""
heroic_config_path = self.source.locations.config.root
# Heroic >= 2.9
if (path := heroic_config_path / "legendaryConfig").is_dir():
logging.debug("Using Heroic >= 2.9 legendary file")
# Heroic <= 2.8
elif heroic_config_path.is_relative_to(shared.flatpak_dir):
# Heroic flatpak
path = shared.flatpak_dir / "com.heroicgameslauncher.hgl" / "config"
logging.debug("Using Heroic flatpak <= 2.8 legendary file")
else:
# Heroic native
logging.debug("Using Heroic native <= 2.8 legendary file")
path = Path.home() / ".config"
path = path / "legendary" / "installed.json"
logging.debug("Using Heroic %s installed.json path %s", self.name, path)
return path
def get_installed_app_names(self):
try:
return set(path_json_load(self.installed_path).keys())
except (OSError, JSONDecodeError, AttributeError) as error:
raise InvalidInstalledFileError(
f"Invalid {self.installed_path.name}"
) from error
class GogIterable(StoreSubSourceIterable):
name = "gog"
service = "gog"
library_json_entries_key = "games"
relative_library_path = Path("store_cache") / "gog_library.json"
relative_installed_path = Path("gog_store") / "installed.json"
def get_installed_app_names(self):
try:
return {
app_name
for entry in path_json_load(self.installed_path)["installed"]
if (app_name := entry.get("appName")) is not None
}
except (OSError, JSONDecodeError, KeyError, AttributeError) as error:
raise InvalidInstalledFileError(
f"Invalid {self.installed_path.name}"
) from error
class NileIterable(StoreSubSourceIterable):
name = "nile"
service = "amazon"
relative_library_path = Path("store_cache") / "nile_library.json"
relative_installed_path = Path("nile_config") / "nile" / "installed.json"
def get_installed_app_names(self):
try:
installed_json = path_json_load(self.installed_path)
return {
app_name
for entry in installed_json
if (app_name := entry.get("id")) is not None
}
except (OSError, JSONDecodeError, AttributeError) as error:
raise InvalidInstalledFileError(
f"Invalid {self.installed_path.name}"
) from error
class HeroicSourceIterable(SourceIterable):
source: "HeroicSource"
hidden_app_names: set[str] = set()
def is_hidden(self, app_name: str) -> bool:
return app_name in self.hidden_app_names
def get_hidden_app_names(self) -> set[str]:
"""Get the hidden app names from store/config.json
:raises InvalidStoreFileError: if the store is invalid for some reason
"""
try:
store = path_json_load(self.source.locations.config["store_config.json"])
self.hidden_app_names = {
app_name
for game in store["games"]["hidden"]
if (app_name := game.get("appName")) is not None
}
except KeyError:
logging.warning('No ["games"]["hidden"] key in Heroic store file')
except (OSError, JSONDecodeError, TypeError) as error:
logging.error("Invalid Heroic store file", exc_info=error)
raise InvalidStoreFileError() from error
def __iter__(self):
"""Generator method producing games from all the Heroic sub-sources"""
for sub_source_name, sub_source in self.sub_sources.items():
# Skip disabled sub-sources
if not shared.schema.get_boolean("heroic-import-" + sub_source["service"]):
self.get_hidden_app_names()
# Get games from the sub sources
for sub_source_class in (
SideloadIterable,
LegendaryIterable,
GogIterable,
NileIterable,
):
sub_source = sub_source_class(self.source, self)
if not shared.schema.get_boolean("heroic-import-" + sub_source.service):
logging.debug("Skipping Heroic %s: disabled", sub_source.service)
continue
# Load games from JSON
file = self.source.config_location.root.joinpath(*sub_source["path"])
try:
contents = json.load(file.open())
key = "library" if sub_source_name == "legendary" else "games"
library = contents[key]
except (JSONDecodeError, OSError, KeyError):
# Invalid library.json file, skip it
logging.warning("Couldn't open Heroic file: %s", str(file))
sub_source_iterable = iter(sub_source)
yield from sub_source_iterable
except (InvalidLibraryFileError, InvalidInstalledFileError) as error:
logging.error(
"Skipping bad Heroic sub-source %s",
sub_source.service,
exc_info=error,
)
continue
added_time = int(time())
for entry in library:
try:
result = self.game_from_library_entry(entry, added_time)
except KeyError as error:
# Skip invalid games
logging.warning(
"Invalid Heroic game skipped in %s", str(file), exc_info=error
)
continue
yield result
class HeroicLocations(NamedTuple):
config: Location
class HeroicSource(URLExecutableSource):
"""Generic Heroic Games Launcher source"""
source_id = "heroic"
name = _("Heroic")
iterator_class = HeroicSourceIterator
url_format = "heroic://launch/{app_name}"
iterable_class = HeroicSourceIterable
url_format = "heroic://launch/{runner}/{app_name}"
available_on = {"linux", "win32"}
config_location = Location(
schema_key="heroic-location",
candidates=(
shared.flatpak_dir / "com.heroicgameslauncher.hgl" / "config" / "heroic",
shared.config_dir / "heroic",
shared.home / ".config" / "heroic",
shared.appdata_dir / "heroic",
),
paths={
"config.json": (False, "config.json"),
},
locations = HeroicLocations(
Location(
schema_key="heroic-location",
candidates=(
shared.config_dir / "heroic",
shared.home / ".config" / "heroic",
shared.flatpak_dir
/ "com.heroicgameslauncher.hgl"
/ "config"
/ "heroic",
shared.appdata_dir / "heroic",
),
paths={
"config.json": LocationSubPath("config.json"),
"store_config.json": LocationSubPath("store/config.json"),
},
invalid_subtitle=Location.CONFIG_INVALID_SUBTITLE,
)
)
@property
def game_id_format(self) -> str:
"""The string format used to construct game IDs"""
return self.id + "_{service}_{game_id}"
return self.source_id + "_{service}_{game_id}"

View File

@@ -21,22 +21,19 @@
from shutil import rmtree
from sqlite3 import connect
from time import time
from typing import NamedTuple
from src import shared
from src.game import Game
from src.importer.sources.location import Location
from src.importer.sources.source import (
SourceIterationResult,
SourceIterator,
URLExecutableSource,
)
from src.importer.sources.location import Location, LocationSubPath
from src.importer.sources.source import SourceIterable, URLExecutableSource
from src.utils.sqlite import copy_db
class ItchSourceIterator(SourceIterator):
class ItchSourceIterable(SourceIterable):
source: "ItchSource"
def generator_builder(self) -> SourceIterationResult:
def __iter__(self):
"""Generator method producing games"""
# Query the database
@@ -55,7 +52,7 @@ class ItchSourceIterator(SourceIterator):
caves.game_id = games.id
;
"""
db_path = copy_db(self.source.config_location["butler.db"])
db_path = copy_db(self.source.locations.config["butler.db"])
connection = connect(db_path)
cursor = connection.execute(db_request)
@@ -65,7 +62,7 @@ class ItchSourceIterator(SourceIterator):
for row in cursor:
values = {
"added": added_time,
"source": self.source.id,
"source": self.source.source_id,
"name": row[1],
"game_id": self.source.game_id_format.format(game_id=row[0]),
"executable": self.source.executable_format.format(cave_id=row[4]),
@@ -78,19 +75,29 @@ class ItchSourceIterator(SourceIterator):
rmtree(str(db_path.parent))
class ItchLocations(NamedTuple):
config: Location
class ItchSource(URLExecutableSource):
source_id = "itch"
name = _("itch")
iterator_class = ItchSourceIterator
iterable_class = ItchSourceIterable
url_format = "itch://caves/{cave_id}/launch"
available_on = {"linux", "win32"}
config_location = Location(
schema_key="itch-location",
candidates=(
shared.flatpak_dir / "io.itch.itch" / "config" / "itch",
shared.config_dir / "itch",
shared.home / ".config" / "itch",
shared.appdata_dir / "itch",
),
paths={"butler.db": (False, "db/butler.db")},
locations = ItchLocations(
Location(
schema_key="itch-location",
candidates=(
shared.flatpak_dir / "io.itch.itch" / "config" / "itch",
shared.config_dir / "itch",
shared.home / ".config" / "itch",
shared.appdata_dir / "itch",
),
paths={
"butler.db": LocationSubPath("db/butler.db"),
},
invalid_subtitle=Location.CONFIG_INVALID_SUBTITLE,
)
)

View File

@@ -21,15 +21,15 @@ import json
import logging
from json import JSONDecodeError
from time import time
from typing import Generator
from typing import NamedTuple
from src import shared
from src.game import Game
from src.importer.sources.location import Location
from src.importer.sources.source import Source, SourceIterationResult, SourceIterator
from src.importer.sources.location import Location, LocationSubPath
from src.importer.sources.source import Source, SourceIterationResult, SourceIterable
class LegendarySourceIterator(SourceIterator):
class LegendarySourceIterable(SourceIterable):
source: "LegendarySource"
def game_from_library_entry(
@@ -43,7 +43,7 @@ class LegendarySourceIterator(SourceIterator):
app_name = entry["app_name"]
values = {
"added": added_time,
"source": self.source.id,
"source": self.source.source_id,
"name": entry["title"],
"game_id": self.source.game_id_format.format(game_id=app_name),
"executable": self.source.executable_format.format(app_name=app_name),
@@ -51,7 +51,7 @@ class LegendarySourceIterator(SourceIterator):
data = {}
# Get additional metadata from file (optional)
metadata_file = self.source.config_location["metadata"] / f"{app_name}.json"
metadata_file = self.source.locations.config["metadata"] / f"{app_name}.json"
try:
metadata = json.load(metadata_file.open())
values["developer"] = metadata["metadata"]["developer"]
@@ -65,9 +65,9 @@ class LegendarySourceIterator(SourceIterator):
game = Game(values)
return (game, data)
def generator_builder(self) -> Generator[SourceIterationResult, None, None]:
def __iter__(self):
# Open library
file = self.source.config_location["installed.json"]
file = self.source.locations.config["installed.json"]
try:
library: dict = json.load(file.open())
except (JSONDecodeError, OSError):
@@ -89,20 +89,28 @@ class LegendarySourceIterator(SourceIterator):
yield result
class LegendaryLocations(NamedTuple):
config: Location
class LegendarySource(Source):
source_id = "legendary"
name = _("Legendary")
executable_format = "legendary launch {app_name}"
available_on = {"linux"}
iterable_class = LegendarySourceIterable
iterator_class = LegendarySourceIterator
config_location: Location = Location(
schema_key="legendary-location",
candidates=(
shared.config_dir / "legendary",
shared.home / ".config" / "legendary",
),
paths={
"installed.json": (False, "installed.json"),
"metadata": (True, "metadata"),
},
locations = LegendaryLocations(
Location(
schema_key="legendary-location",
candidates=(
shared.config_dir / "legendary",
shared.home / ".config" / "legendary",
),
paths={
"installed.json": LocationSubPath("installed.json"),
"metadata": LocationSubPath("metadata", True),
},
invalid_subtitle=Location.CONFIG_INVALID_SUBTITLE,
)
)

View File

@@ -1,13 +1,18 @@
import logging
from pathlib import Path
from typing import Callable, Mapping, Iterable
from typing import Mapping, Iterable, NamedTuple
from os import PathLike
from src import shared
PathSegment = str | PathLike | Path
PathSegments = Iterable[PathSegment]
Candidate = PathSegments | Callable[[], PathSegments]
Candidate = PathSegments
class LocationSubPath(NamedTuple):
segment: PathSegment
is_directory: bool = False
class UnresolvableLocationError(Exception):
@@ -24,31 +29,42 @@ class Location:
* When resolved, the schema is updated with the picked chosen
"""
# The variable is the name of the source
CACHE_INVALID_SUBTITLE = _("Select the {} cache directory.")
# The variable is the name of the source
CONFIG_INVALID_SUBTITLE = _("Select the {} configuration directory.")
# The variable is the name of the source
DATA_INVALID_SUBTITLE = _("Select the {} data directory.")
schema_key: str
candidates: Iterable[Candidate]
paths: Mapping[str, tuple[bool, PathSegments]]
paths: Mapping[str, LocationSubPath]
invalid_subtitle: str
root: Path = None
def __init__(
self,
schema_key: str,
candidates: Iterable[Candidate],
paths: Mapping[str, tuple[bool, PathSegments]],
paths: Mapping[str, LocationSubPath],
invalid_subtitle: str,
) -> None:
super().__init__()
self.schema_key = schema_key
self.candidates = candidates
self.paths = paths
self.invalid_subtitle = invalid_subtitle
def check_candidate(self, candidate: Path) -> bool:
"""Check if a candidate root has the necessary files and directories"""
for type_is_dir, subpath in self.paths.values():
subpath = Path(candidate) / Path(subpath)
if type_is_dir:
if not subpath.is_dir():
for segment, is_directory in self.paths.values():
path = Path(candidate) / segment
if is_directory:
if not path.is_dir():
return False
else:
if not subpath.is_file():
if not path.is_file():
return False
return True
@@ -81,4 +97,4 @@ class Location:
def __getitem__(self, key: str):
"""Get the computed path from its key for the location"""
self.resolve()
return self.root / self.paths[key][1]
return self.root / self.paths[key].segment

View File

@@ -20,22 +20,19 @@
from shutil import rmtree
from sqlite3 import connect
from time import time
from typing import NamedTuple
from src import shared
from src.game import Game
from src.importer.sources.location import Location
from src.importer.sources.source import (
SourceIterationResult,
SourceIterator,
URLExecutableSource,
)
from src.importer.sources.location import Location, LocationSubPath
from src.importer.sources.source import SourceIterable, URLExecutableSource
from src.utils.sqlite import copy_db
class LutrisSourceIterator(SourceIterator):
class LutrisSourceIterable(SourceIterable):
source: "LutrisSource"
def generator_builder(self) -> SourceIterationResult:
def __iter__(self):
"""Generator method producing games"""
# Query the database
@@ -55,7 +52,7 @@ class LutrisSourceIterator(SourceIterator):
"import_steam": shared.schema.get_boolean("lutris-import-steam"),
"import_flatpak": shared.schema.get_boolean("lutris-import-flatpak"),
}
db_path = copy_db(self.source.data_location["pga.db"])
db_path = copy_db(self.source.locations.config["pga.db"])
connection = connect(db_path)
cursor = connection.execute(request, params)
@@ -68,7 +65,7 @@ class LutrisSourceIterator(SourceIterator):
"added": added_time,
"hidden": row[4],
"name": row[1],
"source": f"{self.source.id}_{row[3]}",
"source": f"{self.source.source_id}_{row[3]}",
"game_id": self.source.game_id_format.format(
runner=row[3], game_id=row[0]
),
@@ -77,7 +74,7 @@ class LutrisSourceIterator(SourceIterator):
game = Game(values)
# Get official image path
image_path = self.source.cache_location["coverart"] / f"{row[2]}.jpg"
image_path = self.source.locations.cache["coverart"] / f"{row[2]}.jpg"
additional_data = {"local_image_path": image_path}
# Produce game
@@ -87,40 +84,49 @@ class LutrisSourceIterator(SourceIterator):
rmtree(str(db_path.parent))
class LutrisLocations(NamedTuple):
config: Location
cache: Location
class LutrisSource(URLExecutableSource):
"""Generic Lutris source"""
source_id = "lutris"
name = _("Lutris")
iterator_class = LutrisSourceIterator
iterable_class = LutrisSourceIterable
url_format = "lutris:rungameid/{game_id}"
available_on = {"linux"}
# FIXME possible bug: location picks ~/.var... and cache_lcoation picks ~/.local...
# FIXME possible bug: config picks ~/.var... and cache picks ~/.local...
data_location = Location(
schema_key="lutris-location",
candidates=(
shared.flatpak_dir / "net.lutris.Lutris" / "data" / "lutris",
shared.data_dir / "lutris",
shared.home / ".local" / "share" / "lutris",
locations = LutrisLocations(
Location(
schema_key="lutris-location",
candidates=(
shared.flatpak_dir / "net.lutris.Lutris" / "data" / "lutris",
shared.data_dir / "lutris",
shared.home / ".local" / "share" / "lutris",
),
paths={
"pga.db": LocationSubPath("pga.db"),
},
invalid_subtitle=Location.DATA_INVALID_SUBTITLE,
),
paths={
"pga.db": (False, "pga.db"),
},
)
cache_location = Location(
schema_key="lutris-cache-location",
candidates=(
shared.flatpak_dir / "net.lutris.Lutris" / "cache" / "lutris",
shared.cache_dir / "lutris",
shared.home / ".cache" / "lutris",
Location(
schema_key="lutris-cache-location",
candidates=(
shared.flatpak_dir / "net.lutris.Lutris" / "cache" / "lutris",
shared.cache_dir / "lutris",
shared.home / ".cache" / "lutris",
),
paths={
"coverart": LocationSubPath("coverart", True),
},
invalid_subtitle=Location.CACHE_INVALID_SUBTITLE,
),
paths={
"coverart": (True, "coverart"),
},
)
@property
def game_id_format(self):
return self.id + "_{runner}_{game_id}"
return self.source_id + "_{runner}_{game_id}"

View File

@@ -0,0 +1,216 @@
# retroarch_source.py
#
# Copyright 2023 Rilic
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import json
import logging
import re
from hashlib import md5
from json import JSONDecodeError
from pathlib import Path
from time import time
from typing import NamedTuple
from src import shared
from src.errors.friendly_error import FriendlyError
from src.game import Game
from src.importer.sources.location import (
Location,
LocationSubPath,
UnresolvableLocationError,
)
from src.importer.sources.source import Source, SourceIterable
from src.importer.sources.steam_source import SteamSource
class RetroarchSourceIterable(SourceIterable):
source: "RetroarchSource"
def get_config_value(self, key: str, config_data: str):
for item in re.findall(f'{key}\\s*=\\s*"(.*)"\n', config_data, re.IGNORECASE):
if item.startswith(":"):
item = item.replace(":", str(self.source.locations.config.root))
logging.debug(str(item))
return item
raise KeyError(f"Key not found in RetroArch config: {key}")
def __iter__(self):
added_time = int(time())
bad_playlists = set()
config_file = self.source.locations.config["retroarch.cfg"]
with config_file.open(encoding="utf-8") as open_file:
config_data = open_file.read()
playlist_folder = Path(
self.get_config_value("playlist_directory", config_data)
).expanduser()
thumbnail_folder = Path(
self.get_config_value("thumbnails_directory", config_data)
).expanduser()
# Get all playlist files, ending in .lpl
playlist_files = playlist_folder.glob("*.lpl")
for playlist_file in playlist_files:
logging.debug(playlist_file)
try:
with playlist_file.open(
encoding="utf-8",
) as open_file:
playlist_json = json.load(open_file)
except (JSONDecodeError, OSError):
logging.warning("Cannot read playlist file: %s", str(playlist_file))
continue
for item in playlist_json["items"]:
# Select the core.
# Try the content's core first, then the playlist's default core.
# If none can be used, warn the user and continue.
for core_path in (
item["core_path"],
playlist_json["default_core_path"],
):
if core_path not in ("DETECT", ""):
break
else:
logging.warning("Cannot find core for: %s", str(item["path"]))
bad_playlists.add(playlist_file.stem)
continue
# Build game
game_id = md5(item["path"].encode("utf-8")).hexdigest()
values = {
"source": self.source.source_id,
"added": added_time,
"name": item["label"],
"game_id": self.source.game_id_format.format(game_id=game_id),
"executable": self.source.executable_format.format(
rom_path=item["path"],
core_path=core_path,
),
}
game = Game(values)
# Get boxart
boxart_image_name = item["label"] + ".png"
boxart_image_name = re.sub(r"[&\*\/:`<>\?\\\|]", "_", boxart_image_name)
boxart_folder_name = playlist_file.stem
image_path = (
thumbnail_folder
/ boxart_folder_name
/ "Named_Boxarts"
/ boxart_image_name
)
additional_data = {"local_image_path": image_path}
yield (game, additional_data)
if bad_playlists:
raise FriendlyError(
_("No RetroArch Core Selected"),
# The variable is a newline separated list of playlists
_("The following playlists have no default core:")
+ "\n\n{}\n\n".format("\n".join(bad_playlists))
+ _("Games with no core selected were not imported"),
)
class RetroarchLocations(NamedTuple):
config: Location
class RetroarchSource(Source):
name = _("RetroArch")
source_id = "retroarch"
available_on = {"linux"}
iterable_class = RetroarchSourceIterable
locations = RetroarchLocations(
Location(
schema_key="retroarch-location",
candidates=[
shared.flatpak_dir / "org.libretro.RetroArch" / "config" / "retroarch",
shared.config_dir / "retroarch",
shared.home / ".config" / "retroarch",
# TODO: Windows support, waiting for executable path setting improvement
# Path("C:\\RetroArch-Win64"),
# Path("C:\\RetroArch-Win32"),
# TODO: UWP support (URL handler - https://github.com/libretro/RetroArch/pull/13563)
# shared.local_appdata_dir
# / "Packages"
# / "1e4cf179-f3c2-404f-b9f3-cb2070a5aad8_8ngdn9a6dx1ma"
# / "LocalState",
],
paths={
"retroarch.cfg": LocationSubPath("retroarch.cfg"),
},
invalid_subtitle=Location.CONFIG_INVALID_SUBTITLE,
)
)
@property
def executable_format(self):
self.locations.config.resolve()
is_flatpak = self.locations.config.root.is_relative_to(shared.flatpak_dir)
base = "flatpak run org.libretro.RetroArch" if is_flatpak else "retroarch"
args = '-L "{core_path}" "{rom_path}"'
return f"{base} {args}"
def __init__(self) -> None:
super().__init__()
try:
self.locations.config.candidates.append(self.get_steam_location())
except (OSError, KeyError, UnresolvableLocationError):
logging.debug("Steam isn't installed")
except ValueError as error:
logging.debug("RetroArch Steam location candiate not found", exc_info=error)
def get_steam_location(self) -> str:
"""
Get the RetroArch installed via Steam location
:raise UnresolvableLocationError: if steam isn't installed
:raise KeyError: if there is no libraryfolders.vdf subpath
:raise OSError: if libraryfolders.vdf can't be opened
:raise ValueError: if RetroArch isn't installed through Steam
"""
# Find steam location
libraryfolders = SteamSource().locations.data["libraryfolders.vdf"]
parse_apps = False
with open(libraryfolders, "r", encoding="utf-8") as open_file:
# Search each line for a library path and store it each time a new one is found.
for line in open_file:
if '"path"' in line:
library_path = re.findall(
'"path"\\s+"(.*)"\n', line, re.IGNORECASE
)[0]
elif '"apps"' in line:
parse_apps = True
elif parse_apps and "}" in line:
parse_apps = False
# Stop searching, as the library path directly above the appid has been found.
elif parse_apps and '"1118310"' in line:
return Path(f"{library_path}/steamapps/common/RetroArch")
# Not found
raise ValueError("RetroArch not found in Steam library")

View File

@@ -19,8 +19,8 @@
import sys
from abc import abstractmethod
from collections.abc import Iterable, Iterator
from typing import Any, Generator, Optional
from collections.abc import Iterable
from typing import Any, Generator, Collection
from src.game import Game
from src.importer.sources.location import Location
@@ -29,25 +29,16 @@ from src.importer.sources.location import Location
SourceIterationResult = None | Game | tuple[Game, tuple[Any]]
class SourceIterator(Iterator):
class SourceIterable(Iterable):
"""Data producer for a source of games"""
source: "Source" = None
generator: Generator = None
def __init__(self, source: "Source") -> None:
super().__init__()
self.source = source
self.generator = self.generator_builder()
def __iter__(self) -> "SourceIterator":
return self
def __next__(self) -> SourceIterationResult:
return next(self.generator)
@abstractmethod
def generator_builder(self) -> Generator[SourceIterationResult, None, None]:
def __iter__(self) -> Generator[SourceIterationResult, None, None]:
"""
Method that returns a generator that produces games
* Should be implemented as a generator method
@@ -60,13 +51,12 @@ class SourceIterator(Iterator):
class Source(Iterable):
"""Source of games. E.g an installed app with a config file that lists game directories"""
source_id: str
name: str
variant: str = None
available_on: set[str] = set()
data_location: Optional[Location] = None
cache_location: Optional[Location] = None
config_location: Optional[Location] = None
iterator_class: type[SourceIterator]
iterable_class: type[SourceIterable]
locations: Collection[Location]
@property
def full_name(self) -> str:
@@ -76,18 +66,10 @@ class Source(Iterable):
full_name_ += f" ({self.variant})"
return full_name_
@property
def id(self) -> str: # pylint: disable=invalid-name
"""The source's identifier"""
id_ = self.name.lower()
if self.variant is not None:
id_ += f"_{self.variant.lower()}"
return id_
@property
def game_id_format(self) -> str:
"""The string format used to construct game IDs"""
return self.id + "_{game_id}"
return self.source_id + "_{game_id}"
@property
def is_available(self):
@@ -98,7 +80,7 @@ class Source(Iterable):
def executable_format(self) -> str:
"""The executable format used to construct game executables"""
def __iter__(self) -> SourceIterator:
def __iter__(self) -> Generator[SourceIterationResult, None, None]:
"""
Get an iterator for the source
:raises UnresolvableLocationError: Not iterable if any of the locations are unresolvable
@@ -108,7 +90,7 @@ class Source(Iterable):
if location is None:
continue
location.resolve()
return self.iterator_class(self)
return iter(self.iterable_class(self))
# pylint: disable=abstract-method

View File

@@ -18,28 +18,25 @@
#
# SPDX-License-Identifier: GPL-3.0-or-later
import logging
import re
from pathlib import Path
from time import time
from typing import Iterable
from typing import Iterable, NamedTuple
from src import shared
from src.game import Game
from src.importer.sources.source import (
SourceIterationResult,
SourceIterator,
URLExecutableSource,
)
from src.importer.sources.location import Location, LocationSubPath
from src.importer.sources.source import SourceIterable, URLExecutableSource
from src.utils.steam import SteamFileHelper, SteamInvalidManifestError
from src.importer.sources.location import Location
class SteamSourceIterator(SourceIterator):
class SteamSourceIterable(SourceIterable):
source: "SteamSource"
def get_manifest_dirs(self) -> Iterable[Path]:
"""Get dirs that contain steam app manifests"""
libraryfolders_path = self.source.data_location["libraryfolders.vdf"]
libraryfolders_path = self.source.locations.data["libraryfolders.vdf"]
with open(libraryfolders_path, "r", encoding="utf-8") as file:
contents = file.read()
return [
@@ -62,7 +59,7 @@ class SteamSourceIterator(SourceIterator):
)
return manifests
def generator_builder(self) -> SourceIterationResult:
def __iter__(self):
"""Generator method producing games"""
appid_cache = set()
manifests = self.get_manifests()
@@ -74,17 +71,20 @@ class SteamSourceIterator(SourceIterator):
steam = SteamFileHelper()
try:
local_data = steam.get_manifest_data(manifest)
except (OSError, SteamInvalidManifestError):
except (OSError, SteamInvalidManifestError) as error:
logging.debug("Couldn't load appmanifest %s", manifest, exc_info=error)
continue
# Skip non installed games
installed_mask = 4
if not int(local_data["stateflags"]) & installed_mask:
logging.debug("Skipped %s: not installed", manifest)
continue
# Skip duplicate appids
appid = local_data["appid"]
if appid in appid_cache:
logging.debug("Skipped %s: appid already seen during import", manifest)
continue
appid_cache.add(appid)
@@ -92,7 +92,7 @@ class SteamSourceIterator(SourceIterator):
values = {
"added": added_time,
"name": local_data["name"],
"source": self.source.id,
"source": self.source.source_id,
"game_id": self.source.game_id_format.format(game_id=appid),
"executable": self.source.executable_format.format(game_id=appid),
}
@@ -100,7 +100,7 @@ class SteamSourceIterator(SourceIterator):
# Add official cover image
image_path = (
self.source.data_location["librarycache"]
self.source.locations.data["librarycache"]
/ f"{appid}_library_600x900.jpg"
)
additional_data = {"local_image_path": image_path, "steam_appid": appid}
@@ -109,22 +109,30 @@ class SteamSourceIterator(SourceIterator):
yield (game, additional_data)
class SteamLocations(NamedTuple):
data: Location
class SteamSource(URLExecutableSource):
source_id = "steam"
name = _("Steam")
available_on = {"linux", "win32"}
iterator_class = SteamSourceIterator
iterable_class = SteamSourceIterable
url_format = "steam://rungameid/{game_id}"
data_location = Location(
schema_key="steam-location",
candidates=(
shared.home / ".steam" / "steam",
shared.data_dir / "Steam",
shared.flatpak_dir / "com.valvesoftware.Steam" / "data" / "Steam",
shared.programfiles32_dir / "Steam",
),
paths={
"libraryfolders.vdf": (False, "steamapps/libraryfolders.vdf"),
"librarycache": (True, "appcache/librarycache"),
},
locations = SteamLocations(
Location(
schema_key="steam-location",
candidates=(
shared.home / ".steam" / "steam",
shared.data_dir / "Steam",
shared.flatpak_dir / "com.valvesoftware.Steam" / "data" / "Steam",
shared.programfiles32_dir / "Steam",
),
paths={
"libraryfolders.vdf": LocationSubPath("steamapps/libraryfolders.vdf"),
"librarycache": LocationSubPath("appcache/librarycache", True),
},
invalid_subtitle=Location.DATA_INVALID_SUBTITLE,
)
)

View File

@@ -73,7 +73,7 @@ def setup_logging():
"PIL": {
"handlers": ["lib_console_handler", "file_handler"],
"propagate": False,
"level": "NOTSET",
"level": "WARNING",
},
"urllib3": {
"handlers": ["lib_console_handler", "file_handler"],

View File

@@ -40,13 +40,13 @@ from src.importer.sources.heroic_source import HeroicSource
from src.importer.sources.itch_source import ItchSource
from src.importer.sources.legendary_source import LegendarySource
from src.importer.sources.lutris_source import LutrisSource
from src.importer.sources.retroarch_source import RetroarchSource
from src.importer.sources.steam_source import SteamSource
from src.logging.setup import log_system_info, setup_logging
from src.preferences import PreferencesWindow
from src.store.managers.cover_manager import CoverManager
from src.store.managers.display_manager import DisplayManager
from src.store.managers.file_manager import FileManager
from src.store.managers.local_cover_manager import LocalCoverManager
from src.store.managers.online_cover_manager import OnlineCoverManager
from src.store.managers.sgdb_manager import SGDBManager
from src.store.managers.steam_api_manager import SteamAPIManager
from src.store.store import Store
@@ -101,9 +101,8 @@ class CartridgesApplication(Adw.Application):
self.win.create_source_rows()
# Add rest of the managers for game imports
shared.store.add_manager(LocalCoverManager())
shared.store.add_manager(CoverManager())
shared.store.add_manager(SteamAPIManager())
shared.store.add_manager(OnlineCoverManager())
shared.store.add_manager(SGDBManager())
shared.store.toggle_manager_in_pipelines(FileManager, True)
@@ -180,9 +179,10 @@ class CartridgesApplication(Adw.Application):
(
"kramo https://kramo.hu",
"Geoffrey Coulaud https://geoffrey-coulaud.fr",
"Rilic https://rilic.red",
"Arcitec https://github.com/Arcitec",
"Domenico https://github.com/Domefemia",
"Paweł Lidwin https://github.com/imLinguin",
"Domenico https://github.com/Domefemia",
"Rafael Mardojai CM https://mardojai.com",
)
)
@@ -248,6 +248,9 @@ class CartridgesApplication(Adw.Application):
if shared.schema.get_boolean("legendary"):
importer.add_source(LegendarySource())
if shared.schema.get_boolean("retroarch"):
importer.add_source(RetroarchSource())
importer.run()
def on_remove_game_action(self, *_args):

View File

@@ -32,6 +32,7 @@ from src.importer.sources.itch_source import ItchSource
from src.importer.sources.legendary_source import LegendarySource
from src.importer.sources.location import UnresolvableLocationError
from src.importer.sources.lutris_source import LutrisSource
from src.importer.sources.retroarch_source import RetroarchSource
from src.importer.sources.source import Source
from src.importer.sources.steam_source import SteamSource
from src.utils.create_dialog import create_dialog
@@ -68,6 +69,7 @@ class PreferencesWindow(Adw.PreferencesWindow):
heroic_config_file_chooser_button = Gtk.Template.Child()
heroic_import_epic_switch = Gtk.Template.Child()
heroic_import_gog_switch = Gtk.Template.Child()
heroic_import_amazon_switch = Gtk.Template.Child()
heroic_import_sideload_switch = Gtk.Template.Child()
bottles_expander_row = Gtk.Template.Child()
@@ -82,6 +84,10 @@ class PreferencesWindow(Adw.PreferencesWindow):
legendary_config_action_row = Gtk.Template.Child()
legendary_config_file_chooser_button = Gtk.Template.Child()
retroarch_expander_row = Gtk.Template.Child()
retroarch_config_action_row = Gtk.Template.Child()
retroarch_config_file_chooser_button = Gtk.Template.Child()
flatpak_expander_row = Gtk.Template.Child()
flatpak_data_action_row = Gtk.Template.Child()
flatpak_data_file_chooser_button = Gtk.Template.Child()
@@ -136,11 +142,12 @@ class PreferencesWindow(Adw.PreferencesWindow):
ItchSource,
LegendarySource,
LutrisSource,
RetroarchSource,
SteamSource,
):
source = source_class()
if not source.is_available:
expander_row = getattr(self, f"{source.id}_expander_row")
expander_row = getattr(self, f"{source.source_id}_expander_row")
expander_row.set_visible(False)
else:
self.init_source_row(source)
@@ -170,6 +177,7 @@ class PreferencesWindow(Adw.PreferencesWindow):
"lutris-import-flatpak",
"heroic-import-epic",
"heroic-import-gog",
"heroic-import-amazon",
"heroic-import-sideload",
"flatpak-import-launchers",
"sgdb",
@@ -252,31 +260,35 @@ class PreferencesWindow(Adw.PreferencesWindow):
"""Set the dir subtitle for a source's action rows"""
for location in ("data", "config", "cache"):
# Get the action row to subtitle
action_row = getattr(self, f"{source.id}_{location}_action_row", None)
action_row = getattr(
self, f"{source.source_id}_{location}_action_row", None
)
if not action_row:
continue
infix = "-cache" if location == "cache" else ""
key = f"{source.id}{infix}-location"
key = f"{source.source_id}{infix}-location"
path = Path(shared.schema.get_string(key)).expanduser()
# Remove the path prefix if picked via Flatpak portal
subtitle = re.sub("/run/user/\\d*/doc/.*/", "", str(path))
action_row.set_subtitle(subtitle)
def resolve_locations(self, source):
def resolve_locations(self, source: Source):
"""Resolve locations and add a warning if location cannot be found"""
def clear_warning_selection(_widget, label):
label.select_region(-1, -1)
for location_name in ("data", "config", "cache"):
action_row = getattr(self, f"{source.id}_{location_name}_action_row", None)
for location_name, location in source.locations._asdict().items():
action_row = getattr(
self, f"{source.source_id}_{location_name}_action_row", None
)
if not action_row:
continue
try:
getattr(source, f"{location_name}_location", None).resolve()
location.resolve()
except UnresolvableLocationError:
popover = Gtk.Popover(
@@ -313,7 +325,7 @@ class PreferencesWindow(Adw.PreferencesWindow):
menu_button.add_css_class("warning")
action_row.add_prefix(menu_button)
self.warning_menu_buttons[source.id] = menu_button
self.warning_menu_buttons[source.source_id] = menu_button
def init_source_row(self, source: Source):
"""Initialize a preference row for a source class"""
@@ -327,42 +339,36 @@ class PreferencesWindow(Adw.PreferencesWindow):
return
# Good picked location
location = getattr(source, f"{location_name}_location")
location = getattr(source.locations, location_name)
if location.check_candidate(path):
# Set the schema
infix = "-cache" if location_name == "cache" else ""
key = f"{source.id}{infix}-location"
match location_name:
case "config" | "data":
infix = ""
case _:
infix = f"-{location_name}"
key = f"{source.source_id}{infix}-location"
value = str(path)
shared.schema.set_string(key, value)
# Update the row
self.update_source_action_row_paths(source)
if self.warning_menu_buttons.get(source.id):
if self.warning_menu_buttons.get(source.source_id):
action_row = getattr(
self, f"{source.id}_{location_name}_action_row", None
self, f"{source.source_id}_{location_name}_action_row", None
)
action_row.remove(self.warning_menu_buttons[source.id])
self.warning_menu_buttons.pop(source.id)
action_row.remove(self.warning_menu_buttons[source.source_id])
self.warning_menu_buttons.pop(source.source_id)
logging.debug("User-set value for schema key %s: %s", key, value)
# Bad picked location, inform user
else:
title = _("Invalid Directory")
match location_name:
case "cache":
# The variable is the name of the source
subtitle_format = _("Select the {} cache directory.")
case "config":
# The variable is the name of the source
subtitle_format = _("Select the {} configuration directory.")
case "data":
# The variable is the name of the source
subtitle_format = _("Select the {} data directory.")
dialog = create_dialog(
self,
title,
subtitle_format.format(source.name),
location.invalid_subtitle.format(source.name),
"choose_folder",
_("Set Location"),
)
@@ -374,19 +380,21 @@ class PreferencesWindow(Adw.PreferencesWindow):
dialog.connect("response", on_response)
# Bind expander row activation to source being enabled
expander_row = getattr(self, f"{source.id}_expander_row")
expander_row = getattr(self, f"{source.source_id}_expander_row")
shared.schema.bind(
source.id,
source.source_id,
expander_row,
"enable-expansion",
Gio.SettingsBindFlags.DEFAULT,
)
# Connect dir picker buttons
for location in ("data", "config", "cache"):
button = getattr(self, f"{source.id}_{location}_file_chooser_button", None)
for location_name in source.locations._asdict():
button = getattr(
self, f"{source.source_id}_{location_name}_file_chooser_button", None
)
if button is not None:
button.connect("clicked", self.choose_folder, set_dir, location)
button.connect("clicked", self.choose_folder, set_dir, location_name)
# Set the source row subtitles
self.resolve_locations(source)

View File

@@ -51,6 +51,7 @@ games_dir = data_dir / "cartridges" / "games"
covers_dir = data_dir / "cartridges" / "covers"
appdata_dir = Path(os.getenv("appdata") or "C:\\Users\\Default\\AppData\\Roaming")
local_appdata_dir = Path(os.getenv("csidl_local_appdata") or "C:\\Users\\Default\\AppData\\Local")
programfiles32_dir = Path(os.getenv("programfiles(x86)") or "C:\\Program Files (x86)")
scale_factor = max(

View File

@@ -56,7 +56,7 @@ class AsyncManager(Manager):
def _task_thread_func(self, _task, _source_object, data, _cancellable):
"""Task thread entry point"""
game, additional_data, *_rest = data
self.execute_resilient_manager_logic(game, additional_data)
self.run(game, additional_data)
def _task_callback(self, _source_object, _result, data):
"""Method run after the task is done"""

View File

@@ -0,0 +1,197 @@
# local_cover_manager.py
#
# Copyright 2023 Geoffrey Coulaud
# Copyright 2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
from pathlib import Path
from typing import NamedTuple
import requests
from gi.repository import Gio, GdkPixbuf
from requests.exceptions import HTTPError, SSLError
from src import shared
from src.game import Game
from src.store.managers.manager import Manager
from src.store.managers.steam_api_manager import SteamAPIManager
from src.utils.save_cover import resize_cover, save_cover
class ImageSize(NamedTuple):
width: float = 0
height: float = 0
@property
def aspect_ratio(self) -> float:
return self.width / self.height
def __str__(self):
return f"{self.width}x{self.height}"
def __mul__(self, scale: float | int) -> "ImageSize":
return ImageSize(
self.width * scale,
self.height * scale,
)
def __truediv__(self, divisor: float | int) -> "ImageSize":
return self * (1 / divisor)
def __add__(self, other_size: "ImageSize") -> "ImageSize":
return ImageSize(
self.width + other_size.width,
self.height + other_size.height,
)
def __sub__(self, other_size: "ImageSize") -> "ImageSize":
return self + (other_size * -1)
def element_wise_div(self, other_size: "ImageSize") -> "ImageSize":
"""Divide every element of self by the equivalent in the other size"""
return ImageSize(
self.width / other_size.width,
self.height / other_size.height,
)
def element_wise_mul(self, other_size: "ImageSize") -> "ImageSize":
"""Multiply every element of self by the equivalent in the other size"""
return ImageSize(
self.width * other_size.width,
self.height * other_size.height,
)
def invert(self) -> "ImageSize":
"""Invert the element of self"""
return ImageSize(1, 1).element_wise_div(self)
class CoverManager(Manager):
"""
Manager in charge of adding the cover image of the game
Order of priority is:
1. local cover
2. icon cover
3. online cover
"""
run_after = (SteamAPIManager,)
retryable_on = (HTTPError, SSLError, ConnectionError)
def download_image(self, url: str) -> Path:
image_file = Gio.File.new_tmp()[0]
path = Path(image_file.get_path())
with requests.get(url, timeout=5) as cover:
cover.raise_for_status()
path.write_bytes(cover.content)
return path
def is_stretchable(self, source_size: ImageSize, cover_size: ImageSize) -> bool:
is_taller = source_size.aspect_ratio < cover_size.aspect_ratio
if is_taller:
return True
max_stretch = 0.12
resized_height = (1 / source_size.aspect_ratio) * cover_size.width
stretch = 1 - (resized_height / cover_size.height)
return stretch <= max_stretch
def save_composited_cover(
self,
game: Game,
image_path: Path,
scale: float = 1,
blur_size: ImageSize = ImageSize(2, 2),
) -> None:
"""
Save the image composited with a background blur.
If the image is stretchable, just stretch it.
:param game: The game to save the cover for
:param path: Path where the source image is located
:param scale:
Scale of the smalled image side
compared to the corresponding side in the cover
:param blur_size: Size of the downscaled image used for the blur
"""
# Load source image
source = GdkPixbuf.Pixbuf.new_from_file(str(image_path))
source_size = ImageSize(source.get_width(), source.get_height())
cover_size = ImageSize._make(shared.image_size)
# Stretch if possible
if scale == 1 and self.is_stretchable(source_size, cover_size):
save_cover(game.game_id, resize_cover(pixbuf=source))
return
# Create the blurred cover background
# fmt: off
cover = (
source
.scale_simple(*blur_size, GdkPixbuf.InterpType.BILINEAR)
.scale_simple(*cover_size, GdkPixbuf.InterpType.BILINEAR)
)
# fmt: on
# Scale to fit, apply scaling, then center
uniform_scale = scale * min(cover_size.element_wise_div(source_size))
source_in_cover_size = source_size * uniform_scale
source_in_cover_position = (cover_size - source_in_cover_size) / 2
# Center the scaled source image in the cover
source.composite(
cover,
*source_in_cover_position,
*source_in_cover_size,
*source_in_cover_position,
uniform_scale,
uniform_scale,
GdkPixbuf.InterpType.BILINEAR,
255,
)
save_cover(game.game_id, resize_cover(pixbuf=cover))
def main(self, game: Game, additional_data: dict) -> None:
if game.blacklisted:
return
for key in (
"local_image_path",
"local_icon_path",
"online_cover_url",
):
# Get an image path
if not (value := additional_data.get(key)):
continue
if key == "online_cover_url":
image_path = self.download_image(value)
else:
image_path = Path(value)
if not image_path.is_file():
continue
# Icon cover
if key == "local_icon_path":
self.save_composited_cover(
game,
image_path,
scale=0.7,
blur_size=ImageSize(1, 2),
)
return
self.save_composited_cover(game, image_path)

View File

@@ -31,7 +31,7 @@ class DisplayManager(Manager):
run_after = (SteamAPIManager, SGDBManager)
signals = {"update-ready"}
def manager_logic(self, game: Game, _additional_data: dict) -> None:
def main(self, game: Game, _additional_data: dict) -> None:
if game.get_parent():
game.get_parent().get_parent().remove(game)
if game.get_parent():

View File

@@ -31,7 +31,7 @@ class FileManager(AsyncManager):
run_after = (SteamAPIManager,)
signals = {"save-ready"}
def manager_logic(self, game: Game, additional_data: dict) -> None:
def main(self, game: Game, additional_data: dict) -> None:
if additional_data.get("skip_save"): # Skip saving when loading games from disk
return

View File

@@ -1,71 +0,0 @@
# local_cover_manager.py
#
# Copyright 2023 Geoffrey Coulaud
# Copyright 2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
from gi.repository import GdkPixbuf
from src import shared
from src.game import Game
from src.store.managers.manager import Manager
from src.store.managers.steam_api_manager import SteamAPIManager
from src.utils.save_cover import resize_cover, save_cover
class LocalCoverManager(Manager):
"""Manager in charge of adding the local cover image of the game"""
run_after = (SteamAPIManager,)
def manager_logic(self, game: Game, additional_data: dict) -> None:
if image_path := additional_data.get("local_image_path"):
if not image_path.is_file():
return
save_cover(game.game_id, resize_cover(image_path))
elif icon_path := additional_data.get("local_icon_path"):
cover_width, cover_height = shared.image_size
dest_width = cover_width * 0.7
dest_height = cover_width * 0.7
dest_x = cover_width * 0.15
dest_y = (cover_height - dest_height) / 2
image = GdkPixbuf.Pixbuf.new_from_file(str(icon_path)).scale_simple(
dest_width, dest_height, GdkPixbuf.InterpType.BILINEAR
)
cover = image.scale_simple(
1, 2, GdkPixbuf.InterpType.BILINEAR
).scale_simple(cover_width, cover_height, GdkPixbuf.InterpType.BILINEAR)
image.composite(
cover,
dest_x,
dest_y,
dest_width,
dest_height,
dest_x,
dest_y,
1,
1,
GdkPixbuf.InterpType.BILINEAR,
255,
)
save_cover(game.game_id, resize_cover(pixbuf=cover))

View File

@@ -50,7 +50,7 @@ class Manager(ErrorProducer):
return type(self).__name__
@abstractmethod
def manager_logic(self, game: Game, additional_data: dict) -> None:
def main(self, game: Game, additional_data: dict) -> None:
"""
Manager specific logic triggered by the run method
* Implemented by final child classes
@@ -59,7 +59,7 @@ class Manager(ErrorProducer):
* May raise other exceptions that will be reported
"""
def execute_resilient_manager_logic(self, game: Game, additional_data: dict):
def run(self, game: Game, additional_data: dict):
"""Handle errors (retry, ignore or raise) that occur in the manager logic"""
# Keep track of the number of tries
@@ -106,7 +106,7 @@ class Manager(ErrorProducer):
def try_manager_logic():
try:
self.manager_logic(game, additional_data)
self.main(game, additional_data)
except Exception as error: # pylint: disable=broad-exception-caught
handle_error(error)
@@ -116,5 +116,5 @@ class Manager(ErrorProducer):
self, game: Game, additional_data: dict, callback: Callable[["Manager"], Any]
) -> None:
"""Pass the game through the manager"""
self.execute_resilient_manager_logic(game, additional_data)
self.run(game, additional_data)
callback(self)

View File

@@ -1,126 +0,0 @@
# online_cover_manager.py
#
# Copyright 2023 Geoffrey Coulaud
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import logging
from pathlib import Path
import requests
from gi.repository import Gio, GdkPixbuf
from requests.exceptions import HTTPError, SSLError
from PIL import Image
from src import shared
from src.game import Game
from src.store.managers.local_cover_manager import LocalCoverManager
from src.store.managers.manager import Manager
from src.utils.save_cover import resize_cover, save_cover
class OnlineCoverManager(Manager):
"""Manager that downloads game covers from URLs"""
run_after = (LocalCoverManager,)
retryable_on = (HTTPError, SSLError, ConnectionError)
def save_composited_cover(
self,
game: Game,
image_file: Gio.File,
original_width: int,
original_height: int,
target_width: int,
target_height: int,
) -> None:
"""Save the image composited with a background blur to fit the cover size"""
logging.debug(
"Compositing image for %s (%s) %dx%d -> %dx%d",
game.name,
game.game_id,
original_width,
original_height,
target_width,
target_height,
)
# Load game image
image = GdkPixbuf.Pixbuf.new_from_stream(image_file.read())
# Create background blur of the size of the cover
cover = image.scale_simple(2, 2, GdkPixbuf.InterpType.BILINEAR).scale_simple(
target_width, target_height, GdkPixbuf.InterpType.BILINEAR
)
# Center the image above the blurred background
scale = min(target_width / original_width, target_height / original_height)
left_padding = (target_width - original_width * scale) / 2
top_padding = (target_height - original_height * scale) / 2
image.composite(
cover,
# Top left of overwritten area on the destination
left_padding,
top_padding,
# Size of the overwritten area on the destination
original_width * scale,
original_height * scale,
# Offset
left_padding,
top_padding,
# Scale to apply to the resized image
scale,
scale,
# Compositing stuff
GdkPixbuf.InterpType.BILINEAR,
255,
)
# Resize and save the cover
save_cover(game.game_id, resize_cover(pixbuf=cover))
def manager_logic(self, game: Game, additional_data: dict) -> None:
# Ensure that we have a cover to download
cover_url = additional_data.get("online_cover_url")
if not cover_url:
return
# Download cover
image_file = Gio.File.new_tmp()[0]
image_path = Path(image_file.get_path())
with requests.get(cover_url, timeout=5) as cover:
cover.raise_for_status()
image_path.write_bytes(cover.content)
# Get image size
cover_width, cover_height = shared.image_size
with Image.open(image_path) as pil_image:
width, height = pil_image.size
# Composite if the image is shorter and the stretch amount is too high
aspect_ratio = width / height
target_aspect_ratio = cover_width / cover_height
is_taller = aspect_ratio < target_aspect_ratio
resized_height = height / width * cover_width
stretch = 1 - (resized_height / cover_height)
max_stretch = 0.12
if is_taller or stretch <= max_stretch:
save_cover(game.game_id, resize_cover(image_path))
else:
self.save_composited_cover(
game, image_file, width, height, cover_width, cover_height
)

View File

@@ -24,19 +24,18 @@ from requests.exceptions import HTTPError, SSLError
from src.errors.friendly_error import FriendlyError
from src.game import Game
from src.store.managers.async_manager import AsyncManager
from src.store.managers.local_cover_manager import LocalCoverManager
from src.store.managers.online_cover_manager import OnlineCoverManager
from src.store.managers.steam_api_manager import SteamAPIManager
from src.store.managers.cover_manager import CoverManager
from src.utils.steamgriddb import SGDBAuthError, SGDBHelper
class SGDBManager(AsyncManager):
"""Manager in charge of downloading a game's cover from steamgriddb"""
run_after = (SteamAPIManager, LocalCoverManager, OnlineCoverManager)
run_after = (SteamAPIManager, CoverManager)
retryable_on = (HTTPError, SSLError, ConnectionError, JSONDecodeError)
def manager_logic(self, game: Game, _additional_data: dict) -> None:
def main(self, game: Game, _additional_data: dict) -> None:
try:
sgdb = SGDBHelper()
sgdb.conditionaly_update_cover(game)

View File

@@ -18,6 +18,7 @@
# SPDX-License-Identifier: GPL-3.0-or-later
from requests.exceptions import HTTPError, SSLError
from urllib3.exceptions import ConnectionError as Urllib3ConnectionError
from src.game import Game
from src.store.managers.async_manager import AsyncManager
@@ -32,7 +33,7 @@ from src.utils.steam import (
class SteamAPIManager(AsyncManager):
"""Manager in charge of completing a game's data from the Steam API"""
retryable_on = (HTTPError, SSLError, ConnectionError)
retryable_on = (HTTPError, SSLError, Urllib3ConnectionError)
steam_api_helper: SteamAPIHelper = None
steam_rate_limiter: SteamRateLimiter = None
@@ -42,7 +43,7 @@ class SteamAPIManager(AsyncManager):
self.steam_rate_limiter = SteamRateLimiter()
self.steam_api_helper = SteamAPIHelper(self.steam_rate_limiter)
def manager_logic(self, game: Game, additional_data: dict) -> None:
def main(self, game: Game, additional_data: dict) -> None:
# Skip non-steam games
appid = additional_data.get("steam_appid", None)
if appid is None:

View File

@@ -130,7 +130,7 @@ class Store:
# Connect signals
for manager in self.managers.values():
for signal in manager.signals:
game.connect(signal, manager.execute_resilient_manager_logic)
game.connect(signal, manager.run)
# Add the game to the store
if not game.source in self.source_games:

View File

@@ -103,11 +103,11 @@ class SGDBHelper:
image_trunk = shared.covers_dir / game.game_id
still = image_trunk.with_suffix(".tiff")
uri_kwargs = image_trunk.with_suffix(".gif")
animated = image_trunk.with_suffix(".gif")
prefer_sgdb = shared.schema.get_boolean("sgdb-prefer")
# Do nothing if file present and not prefer SGDB
if not prefer_sgdb and (still.is_file() or uri_kwargs.is_file()):
if not prefer_sgdb and (still.is_file() or animated.is_file()):
return
# Get ID for the game