🚧 More work on Steam source

This commit is contained in:
GeoffreyCoulaud
2023-05-21 00:34:52 +02:00
parent 4bc35383ae
commit 8587c80394
3 changed files with 118 additions and 56 deletions

View File

@@ -17,7 +17,7 @@
<default>"~/.steam/"</default>
</key>
<key name="steam-flatpak-location" type="s">
<default>~/.var/app/com.valvesoftware.Steam/data/Steam/</default>
<default>"~/.var/app/com.valvesoftware.Steam/data/Steam/"</default>
</key>
<key name="steam-windows-location" type="s">
<default>"C:\Program Files (x86)\Steam"</default>

View File

@@ -1,32 +1,32 @@
import re
import logging
from time import time
from abc import abstractmethod
from pathlib import Path
from time import time
from typing import Iterator
import requests
from requests import HTTPError, JSONDecodeError
from src.game import Game
from src.importer.source import Source, SourceIterator
from src.utils.decorators import (
replaced_by_env_path,
replaced_by_path,
replaced_by_schema_key,
replaced_by_env_path,
)
from src.utils.save_cover import resize_cover, save_cover
class SteamAPIError(Exception):
pass
from src.utils.steam import (
SteamGameNotFoundError,
SteamHelper,
SteamInvalidManifestError,
SteamNotAGameError,
)
class SteamSourceIterator(SourceIterator):
source: "SteamSource"
manifests = None
manifests_iterator = None
installed_state_mask = 4
manifests: set = None
manifests_iterator: Iterator[Path] = None
installed_state_mask: int = 4
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -60,36 +60,28 @@ class SteamSourceIterator(SourceIterator):
return len(self.manifests)
def __next__(self):
"""Produce games"""
# Get metadata from manifest
# Ignore manifests that don't have a value for all keys
manifest = next(self.manifests_iterator)
manifest_data = {"name": None, "appid": None, "StateFlags": "0"}
manifest_path = next(self.manifests_iterator)
steam = SteamHelper()
try:
with open(manifest) as file:
contents = file.read()
for key in manifest_data:
regex = f'"{key}"\s+"(.*)"\n'
if (match := re.search(regex, contents)) is None:
return None
manifest_data[key] = match.group(1)
except OSError:
local_data = steam.get_manifest_data(manifest_path)
except (OSError, SteamInvalidManifestError):
return None
# Skip non installed games
if not int(manifest_data["StateFlags"]) & self.installed_state_mask:
if not int(local_data["StateFlags"]) & self.installed_state_mask:
return None
# Build basic game
appid = manifest_data["appid"]
# Build game from local data
appid = local_data["appid"]
values = {
"added": int(time()),
"name": manifest_data["name"],
"hidden": False,
"name": local_data["name"],
"source": self.source.id,
"game_id": self.source.game_id_format.format(game_id=appid),
"executable": self.source.executable_format.format(game_id=appid),
"blacklisted": False,
"developer": None,
}
game = Game(self.source.win, values, allow_side_effects=False)
@@ -101,39 +93,31 @@ class SteamSourceIterator(SourceIterator):
/ f"{appid}_library_600x900.jpg"
)
if cover_path.is_file():
save_cover(self.win, game.game_id, resize_cover(self.win, cover_path))
# Make Steam API call
try:
with requests.get(
"https://store.steampowered.com/api/appdetails?appids=%s"
% manifest_data["appid"],
timeout=5,
) as response:
response.raise_for_status()
steam_api_data = response.json()[appid]
except (HTTPError, JSONDecodeError) as error:
logging.warning(
"Error while querying Steam API for %s (%s)",
manifest_data["name"],
manifest_data["appid"],
exc_info=error,
save_cover(
self.source.win, game.game_id, resize_cover(self.source.win, cover_path)
)
return game
# Fill out new values
if not steam_api_data["success"] or steam_api_data["data"]["type"] != "game":
values["blacklisted"] = True
# Get online metadata
# TODO move to its own manager
try:
online_data = steam.get_api_data(appid=appid)
except (HTTPError, JSONDecodeError, SteamGameNotFoundError):
pass
except SteamNotAGameError:
game.update_values({"blacklisted": True})
else:
values["developer"] = ", ".join(steam_api_data["data"]["developers"])
game.update_values(values)
game.update_values(online_data)
return game
class SteamSource(Source):
name = "Steam"
executable_format = "xdg-open steam://rungameid/{game_id}"
location = None
@property
@abstractmethod
def location(self) -> Path:
pass
@property
def is_installed(self):

78
src/utils/steam.py Normal file
View File

@@ -0,0 +1,78 @@
import re
import logging
from typing import TypedDict
import requests
from requests import HTTPError, JSONDecodeError
class SteamError(Exception):
pass
class SteamGameNotFoundError(SteamError):
pass
class SteamNotAGameError(SteamError):
pass
class SteamInvalidManifestError(SteamError):
pass
class SteamManifestData(TypedDict):
name: str
appid: str
StateFlags: str
class SteamAPIData(TypedDict):
developers: str
class SteamHelper:
"""Helper around the Steam API"""
base_url = "https://store.steampowered.com/api"
def get_manifest_data(self, manifest_path) -> SteamManifestData:
"""Get local data for a game from its manifest"""
with open(manifest_path) as file:
contents = file.read()
data = {}
for key in SteamManifestData.__required_keys__:
regex = f'"{key}"\s+"(.*)"\n'
if (match := re.search(regex, contents)) is None:
raise SteamInvalidManifestError()
data[key] = match.group(1)
return SteamManifestData(**data)
def get_api_data(self, appid) -> SteamAPIData:
"""Get online data for a game from its appid"""
try:
with requests.get(
f"{self.base_url}/appdetails?appids={appid}", timeout=5
) as response:
response.raise_for_status()
data = response.json()[appid]
except (HTTPError, JSONDecodeError) as error:
logging.warning("Error while querying Steam API for %s", appid)
raise error
if not data["success"]:
raise SteamGameNotFoundError()
if data["data"]["type"] != "game":
raise SteamNotAGameError()
# Return API values we're interested in
values = SteamAPIData(developers=", ".join(data["data"]["developers"]))
return values