diff --git a/src/importer/sources/bottles_source.py b/src/importer/sources/bottles_source.py index 34d93cd..8e84869 100644 --- a/src/importer/sources/bottles_source.py +++ b/src/importer/sources/bottles_source.py @@ -17,11 +17,6 @@ from src.utils.save_cover import resize_cover, save_cover class BottlesSourceIterator(SourceIterator): source: "BottlesSource" - generator: Generator = None - - def __init__(self, *args, **kwargs) -> None: - super().__init__(*args, **kwargs) - self.generator = self.generator_builder() def generator_builder(self) -> Optional[Game]: """Generator method producing games""" @@ -55,13 +50,6 @@ class BottlesSourceIterator(SourceIterator): # Produce game yield game - def __next__(self) -> Optional[Game]: - try: - game = next(self.generator) - except StopIteration: - raise - return game - class BottlesSource(Source): """Generic Bottles source""" diff --git a/src/importer/sources/heroic_source.py b/src/importer/sources/heroic_source.py index 73d6d22..1e2259d 100644 --- a/src/importer/sources/heroic_source.py +++ b/src/importer/sources/heroic_source.py @@ -38,7 +38,7 @@ class HeroicSubSource(TypedDict): class HeroicSourceIterator(SourceIterator): source: "HeroicSource" - generator: Generator = None + sub_sources: dict[str, HeroicSubSource] = { "sideload": { "service": "sideload", @@ -89,9 +89,10 @@ class HeroicSourceIterator(SourceIterator): return Game(values, allow_side_effects=False) - def sub_sources_generator(self): + def generator_builder(self): """Generator method producing games from all the Heroic sub-sources""" - for _key, sub_source in self.sub_sources.items(): + + for sub_source in self.sub_sources.values(): # Skip disabled sub-sources if not shared.schema.get_boolean("heroic-import-" + sub_source["service"]): continue @@ -112,17 +113,6 @@ class HeroicSourceIterator(SourceIterator): continue yield game - def __init__(self, *args, **kwargs) -> None: - super().__init__(*args, **kwargs) - self.generator = self.sub_sources_generator() - - def __next__(self) -> Optional[Game]: - try: - game = next(self.generator) - except StopIteration: - raise - return game - class HeroicSource(Source): """Generic heroic games launcher source""" diff --git a/src/importer/sources/lutris_source.py b/src/importer/sources/lutris_source.py index 6ec7cf2..7bb662b 100644 --- a/src/importer/sources/lutris_source.py +++ b/src/importer/sources/lutris_source.py @@ -1,6 +1,6 @@ from sqlite3 import connect from time import time -from typing import Optional +from typing import Optional, Generator from src import shared from src.game import Game @@ -11,62 +11,50 @@ from src.utils.save_cover import resize_cover, save_cover class LutrisSourceIterator(SourceIterator): source: "LutrisSource" - import_steam = False - db_connection = None - db_cursor = None - db_location = None - db_games_request = """ - SELECT id, name, slug, runner, hidden - FROM 'games' - WHERE - name IS NOT NULL - AND slug IS NOT NULL - AND configPath IS NOT NULL - AND installed - AND (runner IS NOT "steam" OR :import_steam) - ; - """ - db_request_params = None - def __init__(self, *args, **kwargs) -> None: - super().__init__(*args, **kwargs) - self.import_steam = shared.schema.get_boolean("lutris-import-steam") - self.db_location = self.source.location / "pga.db" - self.db_connection = connect(self.db_location) - self.db_request_params = {"import_steam": self.import_steam} - self.db_cursor = self.db_connection.execute( - self.db_games_request, self.db_request_params - ) + def generator_builder(self) -> Optional[Game]: + """Generator method producing games""" - def __next__(self) -> Optional[Game]: - row = None - try: - row = self.db_cursor.__next__() - except StopIteration as error: - self.db_connection.close() - raise error + # Query the database + request = """ + SELECT id, name, slug, runner, hidden + FROM 'games' + WHERE + name IS NOT NULL + AND slug IS NOT NULL + AND configPath IS NOT NULL + AND installed + AND (runner IS NOT "steam" OR :import_steam) + ; + """ + params = {"import_steam": shared.schema.get_boolean("lutris-import-steam")} + connection = connect(self.source.location / "pga.db") + cursor = connection.execute(request, params) - # Create game - values = { - "version": shared.SPEC_VERSION, - "added": int(time()), - "hidden": row[4], - "name": row[1], - "source": f"{self.source.id}_{row[3]}", - "game_id": self.source.game_id_format.format( - game_id=row[2], game_internal_id=row[0] - ), - "executable": self.source.executable_format.format(game_id=row[2]), - "developer": None, # TODO get developer metadata on Lutris - } - game = Game(values, allow_side_effects=False) + # Create games from the DB results + for row in cursor: + # Create game + values = { + "version": shared.SPEC_VERSION, + "added": int(time()), + "hidden": row[4], + "name": row[1], + "source": f"{self.source.id}_{row[3]}", + "game_id": self.source.game_id_format.format( + game_id=row[2], game_internal_id=row[0] + ), + "executable": self.source.executable_format.format(game_id=row[2]), + "developer": None, # TODO get developer metadata on Lutris + } + game = Game(values, allow_side_effects=False) - # Save official image - image_path = self.source.location / "covers" / "coverart" / f"{row[2]}.jpg" - if image_path.exists(): - save_cover(values["game_id"], resize_cover(image_path)) + # Save official image + image_path = self.source.location / "covers" / "coverart" / f"{row[2]}.jpg" + if image_path.exists(): + save_cover(values["game_id"], resize_cover(image_path)) - return game + # Produce game + yield game class LutrisSource(Source): diff --git a/src/importer/sources/source.py b/src/importer/sources/source.py index 44e2290..f0da552 100644 --- a/src/importer/sources/source.py +++ b/src/importer/sources/source.py @@ -2,7 +2,7 @@ import sys from abc import abstractmethod from collections.abc import Iterable, Iterator from pathlib import Path -from typing import Optional +from typing import Optional, Generator from src.game import Game @@ -11,20 +11,28 @@ class SourceIterator(Iterator): """Data producer for a source of games""" source: "Source" = None + generator: Generator = None def __init__(self, source: "Source") -> None: super().__init__() self.source = source + self.generator = self.generator_builder() def __iter__(self) -> "SourceIterator": return self - @abstractmethod def __next__(self) -> Optional[Game]: - """Get the next generated game from the source. - Raises StopIteration when exhausted. - May raise any other exception signifying an error on this specific game. - May return None when a game has been skipped without an error.""" + return next(self.generator) + + @abstractmethod + def generator_builder(self) -> Generator[Optional[Game], None, None]: + """ + Method that returns a generator that produces games + * Should be implemented as a generator method + * May yield `None` when an iteration hasn't produced a game + * In charge of handling per-game errors + * Returns when exhausted + """ class Source(Iterable): diff --git a/src/importer/sources/steam_source.py b/src/importer/sources/steam_source.py index be062cf..5e59252 100644 --- a/src/importer/sources/steam_source.py +++ b/src/importer/sources/steam_source.py @@ -1,7 +1,7 @@ import re from pathlib import Path from time import time -from typing import Iterator, Optional +from typing import Iterable, Optional, Generator from src import shared from src.game import Game @@ -22,81 +22,78 @@ from src.utils.steam import SteamHelper, SteamInvalidManifestError class SteamSourceIterator(SourceIterator): source: "SteamSource" - manifests: set = None - manifests_iterator: Iterator[Path] = None - installed_state_mask: int = 4 - appid_cache: set = None - def __init__(self, *args, **kwargs) -> None: - super().__init__(*args, **kwargs) - - self.appid_cache = set() - self.manifests = set() - - # Get dirs that contain steam app manifests + def get_manifest_dirs(self) -> Iterable[Path]: + """Get dirs that contain steam app manifests""" libraryfolders_path = self.source.location / "steamapps" / "libraryfolders.vdf" with open(libraryfolders_path, "r") as file: contents = file.read() - steamapps_dirs = [ + return [ Path(path) / "steamapps" for path in re.findall('"path"\s+"(.*)"\n', contents, re.IGNORECASE) ] - # Get app manifests - for steamapps_dir in steamapps_dirs: + def get_manifests(self) -> Iterable[Path]: + """Get app manifests""" + manifests = set() + for steamapps_dir in self.get_manifest_dirs(): if not steamapps_dir.is_dir(): continue - self.manifests.update( + manifests.update( [ manifest for manifest in steamapps_dir.glob("appmanifest_*.acf") if manifest.is_file() ] ) + return manifests - self.manifests_iterator = iter(self.manifests) + def generator_builder(self) -> Optional[Game]: + """Generator method producing games""" + appid_cache = set() + manifests = self.get_manifests() + for manifest in manifests: + # Get metadata from manifest + steam = SteamHelper() + try: + local_data = steam.get_manifest_data(manifest) + except (OSError, SteamInvalidManifestError): + continue - def __next__(self) -> Optional[Game]: - # Get metadata from manifest - manifest_path = next(self.manifests_iterator) - steam = SteamHelper() - try: - local_data = steam.get_manifest_data(manifest_path) - except (OSError, SteamInvalidManifestError): - return None + # Skip non installed games + INSTALLED_MASK: int = 4 + if not int(local_data["stateflags"]) & INSTALLED_MASK: + continue - # Skip non installed games - if not int(local_data["stateflags"]) & self.installed_state_mask: - return None + # Skip duplicate appids + appid = local_data["appid"] + if appid in appid_cache: + continue + appid_cache.add(appid) - # Skip duplicate appids - appid = local_data["appid"] - if appid in self.appid_cache: - return None - self.appid_cache.add(appid) + # Build game from local data + values = { + "version": shared.SPEC_VERSION, + "added": int(time()), + "name": local_data["name"], + "source": self.source.id, + "game_id": self.source.game_id_format.format(game_id=appid), + "executable": self.source.executable_format.format(game_id=appid), + } + game = Game(values, allow_side_effects=False) - # Build game from local data - values = { - "version": shared.SPEC_VERSION, - "added": int(time()), - "name": local_data["name"], - "source": self.source.id, - "game_id": self.source.game_id_format.format(game_id=appid), - "executable": self.source.executable_format.format(game_id=appid), - } - game = Game(values, allow_side_effects=False) + # Add official cover image + image_path = ( + self.source.location + / "appcache" + / "librarycache" + / f"{appid}_library_600x900.jpg" + ) + if image_path.is_file(): + save_cover(game.game_id, resize_cover(image_path)) - # Add official cover image - cover_path = ( - self.source.location - / "appcache" - / "librarycache" - / f"{appid}_library_600x900.jpg" - ) - if cover_path.is_file(): - save_cover(game.game_id, resize_cover(cover_path)) - - return game + # Produce game + yield game class SteamSource(Source):