🎨 Simplified SourceIterator-s
- Using generator functions - Common generator init and next in base class - Explicited that error handling should happen in generator
This commit is contained in:
@@ -17,11 +17,6 @@ from src.utils.save_cover import resize_cover, save_cover
|
||||
|
||||
class BottlesSourceIterator(SourceIterator):
|
||||
source: "BottlesSource"
|
||||
generator: Generator = None
|
||||
|
||||
def __init__(self, *args, **kwargs) -> None:
|
||||
super().__init__(*args, **kwargs)
|
||||
self.generator = self.generator_builder()
|
||||
|
||||
def generator_builder(self) -> Optional[Game]:
|
||||
"""Generator method producing games"""
|
||||
@@ -55,13 +50,6 @@ class BottlesSourceIterator(SourceIterator):
|
||||
# Produce game
|
||||
yield game
|
||||
|
||||
def __next__(self) -> Optional[Game]:
|
||||
try:
|
||||
game = next(self.generator)
|
||||
except StopIteration:
|
||||
raise
|
||||
return game
|
||||
|
||||
|
||||
class BottlesSource(Source):
|
||||
"""Generic Bottles source"""
|
||||
|
||||
@@ -38,7 +38,7 @@ class HeroicSubSource(TypedDict):
|
||||
|
||||
class HeroicSourceIterator(SourceIterator):
|
||||
source: "HeroicSource"
|
||||
generator: Generator = None
|
||||
|
||||
sub_sources: dict[str, HeroicSubSource] = {
|
||||
"sideload": {
|
||||
"service": "sideload",
|
||||
@@ -89,9 +89,10 @@ class HeroicSourceIterator(SourceIterator):
|
||||
|
||||
return Game(values, allow_side_effects=False)
|
||||
|
||||
def sub_sources_generator(self):
|
||||
def generator_builder(self):
|
||||
"""Generator method producing games from all the Heroic sub-sources"""
|
||||
for _key, sub_source in self.sub_sources.items():
|
||||
|
||||
for sub_source in self.sub_sources.values():
|
||||
# Skip disabled sub-sources
|
||||
if not shared.schema.get_boolean("heroic-import-" + sub_source["service"]):
|
||||
continue
|
||||
@@ -112,17 +113,6 @@ class HeroicSourceIterator(SourceIterator):
|
||||
continue
|
||||
yield game
|
||||
|
||||
def __init__(self, *args, **kwargs) -> None:
|
||||
super().__init__(*args, **kwargs)
|
||||
self.generator = self.sub_sources_generator()
|
||||
|
||||
def __next__(self) -> Optional[Game]:
|
||||
try:
|
||||
game = next(self.generator)
|
||||
except StopIteration:
|
||||
raise
|
||||
return game
|
||||
|
||||
|
||||
class HeroicSource(Source):
|
||||
"""Generic heroic games launcher source"""
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from sqlite3 import connect
|
||||
from time import time
|
||||
from typing import Optional
|
||||
from typing import Optional, Generator
|
||||
|
||||
from src import shared
|
||||
from src.game import Game
|
||||
@@ -11,62 +11,50 @@ from src.utils.save_cover import resize_cover, save_cover
|
||||
|
||||
class LutrisSourceIterator(SourceIterator):
|
||||
source: "LutrisSource"
|
||||
import_steam = False
|
||||
db_connection = None
|
||||
db_cursor = None
|
||||
db_location = None
|
||||
db_games_request = """
|
||||
SELECT id, name, slug, runner, hidden
|
||||
FROM 'games'
|
||||
WHERE
|
||||
name IS NOT NULL
|
||||
AND slug IS NOT NULL
|
||||
AND configPath IS NOT NULL
|
||||
AND installed
|
||||
AND (runner IS NOT "steam" OR :import_steam)
|
||||
;
|
||||
"""
|
||||
db_request_params = None
|
||||
|
||||
def __init__(self, *args, **kwargs) -> None:
|
||||
super().__init__(*args, **kwargs)
|
||||
self.import_steam = shared.schema.get_boolean("lutris-import-steam")
|
||||
self.db_location = self.source.location / "pga.db"
|
||||
self.db_connection = connect(self.db_location)
|
||||
self.db_request_params = {"import_steam": self.import_steam}
|
||||
self.db_cursor = self.db_connection.execute(
|
||||
self.db_games_request, self.db_request_params
|
||||
)
|
||||
def generator_builder(self) -> Optional[Game]:
|
||||
"""Generator method producing games"""
|
||||
|
||||
def __next__(self) -> Optional[Game]:
|
||||
row = None
|
||||
try:
|
||||
row = self.db_cursor.__next__()
|
||||
except StopIteration as error:
|
||||
self.db_connection.close()
|
||||
raise error
|
||||
# Query the database
|
||||
request = """
|
||||
SELECT id, name, slug, runner, hidden
|
||||
FROM 'games'
|
||||
WHERE
|
||||
name IS NOT NULL
|
||||
AND slug IS NOT NULL
|
||||
AND configPath IS NOT NULL
|
||||
AND installed
|
||||
AND (runner IS NOT "steam" OR :import_steam)
|
||||
;
|
||||
"""
|
||||
params = {"import_steam": shared.schema.get_boolean("lutris-import-steam")}
|
||||
connection = connect(self.source.location / "pga.db")
|
||||
cursor = connection.execute(request, params)
|
||||
|
||||
# Create game
|
||||
values = {
|
||||
"version": shared.SPEC_VERSION,
|
||||
"added": int(time()),
|
||||
"hidden": row[4],
|
||||
"name": row[1],
|
||||
"source": f"{self.source.id}_{row[3]}",
|
||||
"game_id": self.source.game_id_format.format(
|
||||
game_id=row[2], game_internal_id=row[0]
|
||||
),
|
||||
"executable": self.source.executable_format.format(game_id=row[2]),
|
||||
"developer": None, # TODO get developer metadata on Lutris
|
||||
}
|
||||
game = Game(values, allow_side_effects=False)
|
||||
# Create games from the DB results
|
||||
for row in cursor:
|
||||
# Create game
|
||||
values = {
|
||||
"version": shared.SPEC_VERSION,
|
||||
"added": int(time()),
|
||||
"hidden": row[4],
|
||||
"name": row[1],
|
||||
"source": f"{self.source.id}_{row[3]}",
|
||||
"game_id": self.source.game_id_format.format(
|
||||
game_id=row[2], game_internal_id=row[0]
|
||||
),
|
||||
"executable": self.source.executable_format.format(game_id=row[2]),
|
||||
"developer": None, # TODO get developer metadata on Lutris
|
||||
}
|
||||
game = Game(values, allow_side_effects=False)
|
||||
|
||||
# Save official image
|
||||
image_path = self.source.location / "covers" / "coverart" / f"{row[2]}.jpg"
|
||||
if image_path.exists():
|
||||
save_cover(values["game_id"], resize_cover(image_path))
|
||||
# Save official image
|
||||
image_path = self.source.location / "covers" / "coverart" / f"{row[2]}.jpg"
|
||||
if image_path.exists():
|
||||
save_cover(values["game_id"], resize_cover(image_path))
|
||||
|
||||
return game
|
||||
# Produce game
|
||||
yield game
|
||||
|
||||
|
||||
class LutrisSource(Source):
|
||||
|
||||
@@ -2,7 +2,7 @@ import sys
|
||||
from abc import abstractmethod
|
||||
from collections.abc import Iterable, Iterator
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from typing import Optional, Generator
|
||||
|
||||
from src.game import Game
|
||||
|
||||
@@ -11,20 +11,28 @@ class SourceIterator(Iterator):
|
||||
"""Data producer for a source of games"""
|
||||
|
||||
source: "Source" = None
|
||||
generator: Generator = None
|
||||
|
||||
def __init__(self, source: "Source") -> None:
|
||||
super().__init__()
|
||||
self.source = source
|
||||
self.generator = self.generator_builder()
|
||||
|
||||
def __iter__(self) -> "SourceIterator":
|
||||
return self
|
||||
|
||||
@abstractmethod
|
||||
def __next__(self) -> Optional[Game]:
|
||||
"""Get the next generated game from the source.
|
||||
Raises StopIteration when exhausted.
|
||||
May raise any other exception signifying an error on this specific game.
|
||||
May return None when a game has been skipped without an error."""
|
||||
return next(self.generator)
|
||||
|
||||
@abstractmethod
|
||||
def generator_builder(self) -> Generator[Optional[Game], None, None]:
|
||||
"""
|
||||
Method that returns a generator that produces games
|
||||
* Should be implemented as a generator method
|
||||
* May yield `None` when an iteration hasn't produced a game
|
||||
* In charge of handling per-game errors
|
||||
* Returns when exhausted
|
||||
"""
|
||||
|
||||
|
||||
class Source(Iterable):
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import re
|
||||
from pathlib import Path
|
||||
from time import time
|
||||
from typing import Iterator, Optional
|
||||
from typing import Iterable, Optional, Generator
|
||||
|
||||
from src import shared
|
||||
from src.game import Game
|
||||
@@ -22,81 +22,78 @@ from src.utils.steam import SteamHelper, SteamInvalidManifestError
|
||||
|
||||
class SteamSourceIterator(SourceIterator):
|
||||
source: "SteamSource"
|
||||
manifests: set = None
|
||||
manifests_iterator: Iterator[Path] = None
|
||||
installed_state_mask: int = 4
|
||||
appid_cache: set = None
|
||||
|
||||
def __init__(self, *args, **kwargs) -> None:
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
self.appid_cache = set()
|
||||
self.manifests = set()
|
||||
|
||||
# Get dirs that contain steam app manifests
|
||||
def get_manifest_dirs(self) -> Iterable[Path]:
|
||||
"""Get dirs that contain steam app manifests"""
|
||||
libraryfolders_path = self.source.location / "steamapps" / "libraryfolders.vdf"
|
||||
with open(libraryfolders_path, "r") as file:
|
||||
contents = file.read()
|
||||
steamapps_dirs = [
|
||||
return [
|
||||
Path(path) / "steamapps"
|
||||
for path in re.findall('"path"\s+"(.*)"\n', contents, re.IGNORECASE)
|
||||
]
|
||||
|
||||
# Get app manifests
|
||||
for steamapps_dir in steamapps_dirs:
|
||||
def get_manifests(self) -> Iterable[Path]:
|
||||
"""Get app manifests"""
|
||||
manifests = set()
|
||||
for steamapps_dir in self.get_manifest_dirs():
|
||||
if not steamapps_dir.is_dir():
|
||||
continue
|
||||
self.manifests.update(
|
||||
manifests.update(
|
||||
[
|
||||
manifest
|
||||
for manifest in steamapps_dir.glob("appmanifest_*.acf")
|
||||
if manifest.is_file()
|
||||
]
|
||||
)
|
||||
return manifests
|
||||
|
||||
self.manifests_iterator = iter(self.manifests)
|
||||
def generator_builder(self) -> Optional[Game]:
|
||||
"""Generator method producing games"""
|
||||
appid_cache = set()
|
||||
manifests = self.get_manifests()
|
||||
for manifest in manifests:
|
||||
# Get metadata from manifest
|
||||
steam = SteamHelper()
|
||||
try:
|
||||
local_data = steam.get_manifest_data(manifest)
|
||||
except (OSError, SteamInvalidManifestError):
|
||||
continue
|
||||
|
||||
def __next__(self) -> Optional[Game]:
|
||||
# Get metadata from manifest
|
||||
manifest_path = next(self.manifests_iterator)
|
||||
steam = SteamHelper()
|
||||
try:
|
||||
local_data = steam.get_manifest_data(manifest_path)
|
||||
except (OSError, SteamInvalidManifestError):
|
||||
return None
|
||||
# Skip non installed games
|
||||
INSTALLED_MASK: int = 4
|
||||
if not int(local_data["stateflags"]) & INSTALLED_MASK:
|
||||
continue
|
||||
|
||||
# Skip non installed games
|
||||
if not int(local_data["stateflags"]) & self.installed_state_mask:
|
||||
return None
|
||||
# Skip duplicate appids
|
||||
appid = local_data["appid"]
|
||||
if appid in appid_cache:
|
||||
continue
|
||||
appid_cache.add(appid)
|
||||
|
||||
# Skip duplicate appids
|
||||
appid = local_data["appid"]
|
||||
if appid in self.appid_cache:
|
||||
return None
|
||||
self.appid_cache.add(appid)
|
||||
# Build game from local data
|
||||
values = {
|
||||
"version": shared.SPEC_VERSION,
|
||||
"added": int(time()),
|
||||
"name": local_data["name"],
|
||||
"source": self.source.id,
|
||||
"game_id": self.source.game_id_format.format(game_id=appid),
|
||||
"executable": self.source.executable_format.format(game_id=appid),
|
||||
}
|
||||
game = Game(values, allow_side_effects=False)
|
||||
|
||||
# Build game from local data
|
||||
values = {
|
||||
"version": shared.SPEC_VERSION,
|
||||
"added": int(time()),
|
||||
"name": local_data["name"],
|
||||
"source": self.source.id,
|
||||
"game_id": self.source.game_id_format.format(game_id=appid),
|
||||
"executable": self.source.executable_format.format(game_id=appid),
|
||||
}
|
||||
game = Game(values, allow_side_effects=False)
|
||||
# Add official cover image
|
||||
image_path = (
|
||||
self.source.location
|
||||
/ "appcache"
|
||||
/ "librarycache"
|
||||
/ f"{appid}_library_600x900.jpg"
|
||||
)
|
||||
if image_path.is_file():
|
||||
save_cover(game.game_id, resize_cover(image_path))
|
||||
|
||||
# Add official cover image
|
||||
cover_path = (
|
||||
self.source.location
|
||||
/ "appcache"
|
||||
/ "librarycache"
|
||||
/ f"{appid}_library_600x900.jpg"
|
||||
)
|
||||
if cover_path.is_file():
|
||||
save_cover(game.game_id, resize_cover(cover_path))
|
||||
|
||||
return game
|
||||
# Produce game
|
||||
yield game
|
||||
|
||||
|
||||
class SteamSource(Source):
|
||||
|
||||
Reference in New Issue
Block a user