🎨 No longer using SGDBSave

- Details window uses a Pipeline with SGDBTask
- Store saves managers in a type: instance dict
- Removed SGDBSave
This commit is contained in:
GeoffreyCoulaud
2023-06-10 16:22:09 +02:00
parent 3a0911e742
commit dcd4357e57
3 changed files with 43 additions and 150 deletions

View File

@@ -17,21 +17,21 @@
#
# SPDX-License-Identifier: GPL-3.0-or-later
import logging
import os
import shlex
from time import time
from gi.repository import Adw, Gio, GLib, Gtk
from PIL import Image
from requests.exceptions import HTTPError, SSLError
from src import shared
from src.game import Game
from src.game_cover import GameCover
from src.store.managers.sgdb_manager import SGDBManager
from src.store.pipeline import Pipeline
from src.utils.create_dialog import create_dialog
from src.utils.save_cover import resize_cover, save_cover
from src.utils.steamgriddb import SGDBError, SGDBHelper
from src.utils.steamgriddb import SGDBAuthError
@Gtk.Template(resource_path=shared.PREFIX + "/gtk/details_window.ui")
@@ -205,24 +205,48 @@ class DetailsWindow(Adw.Window):
self.game.save()
self.game.update()
# Try to get a cover if none is present
# TODO inform the user
# TODO wrap in a task and mark loading
# Get a cover from SGDB if none is present
if not self.game_cover.get_pixbuf():
print("test 1212")
sgdb = SGDBHelper()
try:
sgdb.conditionaly_update_cover(self.game)
except SGDBError as error:
logging.error("Could not update cover", exc_info=error)
except (HTTPError, SSLError, ConnectionError):
logging.warning("Could not connect to SteamGridDB")
self.game.set_loading(1)
sgdb_manager: SGDBManager = shared.store.managers[SGDBManager]
sgdb_manager.reset_cancellable()
pipeline = Pipeline(self.game, {}, (sgdb_manager,))
pipeline.connect("advanced", self.update_cover_callback)
pipeline.advance()
self.game_cover.pictures.remove(self.cover)
self.close()
self.win.show_details_view(self.game)
def update_cover_callback(self, pipeline: Pipeline):
# Check that managers are done
if not pipeline.is_done:
return
# Set the game as not loading
self.game.set_loading(-1)
self.game.update()
# Handle errors that occured
errors = []
for manager in pipeline.done:
errors.extend(manager.collect_errors())
for error in errors:
# On auth error, inform the user
if isinstance(error, SGDBAuthError):
create_dialog(
shared.win,
_("Couldn't Connect to SteamGridDB"),
str(error),
"open_preferences",
_("Preferences"),
).connect("response", self.update_cover_error_response)
def update_cover_error_response(self, _widget, response):
if response == "open_preferences":
shared.win.get_application().on_preferences_action(page_name="sgdb")
def focus_executable(self, *_args):
self.set_focus(self.executable)

View File

@@ -7,18 +7,18 @@ from src.store.pipeline import Pipeline
class Store:
"""Class in charge of handling games being added to the app."""
managers: set[Manager]
managers: dict[type[Manager], Manager]
pipelines: dict[str, Pipeline]
games: dict[str, Game]
def __init__(self) -> None:
self.managers = set()
self.managers = {}
self.games = {}
self.pipelines = {}
def add_manager(self, manager: Manager):
"""Add a manager class that will run when games are added"""
self.managers.add(manager)
"""Add a manager that will run when games are added"""
self.managers[type(manager)] = manager
def add_game(
self, game: Game, additional_data: dict, replace=False
@@ -51,7 +51,7 @@ class Store:
return None
# Run the pipeline for the game
pipeline = Pipeline(game, additional_data, self.managers)
pipeline = Pipeline(game, additional_data, self.managers.values())
self.games[game.game_id] = game
self.pipelines[game.game_id] = pipeline
pipeline.advance()

View File

@@ -6,7 +6,6 @@ from gi.repository import Gio
from requests.exceptions import HTTPError
from src import shared
from src.utils.create_dialog import create_dialog
from src.utils.save_cover import resize_cover, save_cover
@@ -136,133 +135,3 @@ class SGDBHelper:
sgdb_id,
)
raise SGDBNoImageFoundError()
# Current steps to save image for N games
# Create a task for every game
# Call update_cover
# If using sgdb and (prefer or no image) and not blacklisted
# Search for game
# Get image from sgdb (animated if preferred and found, or still)
# Exit task and enter task_done
# If error, create popup
class SGDBSave:
def __init__(self, games, importer=None):
self.win = shared.win
self.importer = importer
self.exception = None
# Wrap the function in another one as Gio.Task.run_in_thread does not allow for passing args
def create_func(game):
def wrapper(task, *_args):
self.update_cover(
task,
game,
)
return wrapper
for game in games:
Gio.Task.new(None, None, self.task_done).run_in_thread(create_func(game))
def update_cover(self, task, game):
game.set_loading(1)
if (
not (
shared.schema.get_boolean("sgdb")
and (
(shared.schema.get_boolean("sgdb-prefer"))
or not (
(shared.covers_dir / f"{game.game_id}.gif").is_file()
or (shared.covers_dir / f"{game.game_id}.tiff").is_file()
)
)
)
or game.blacklisted
):
task.return_value(game)
return
url = "https://www.steamgriddb.com/api/v2/"
headers = {"Authorization": f'Bearer {shared.schema.get_string("sgdb-key")}'}
try:
search_result = requests.get(
f"{url}search/autocomplete/{game.name}",
headers=headers,
timeout=5,
)
if search_result.status_code != 200:
self.exception = str(
search_result.json()["errors"][0]
if "errors" in tuple(search_result.json())
else search_result.status_code
)
search_result.raise_for_status()
except requests.exceptions.RequestException:
task.return_value(game)
return
response = None
try:
if shared.schema.get_boolean("sgdb-animated"):
try:
grid = requests.get(
f'{url}grids/game/{search_result.json()["data"][0]["id"]}?dimensions=600x900&types=animated',
headers=headers,
timeout=5,
)
response = requests.get(grid.json()["data"][0]["url"], timeout=5)
except IndexError:
pass
if not response:
grid = requests.get(
f'{url}grids/game/{search_result.json()["data"][0]["id"]}?dimensions=600x900',
headers=headers,
timeout=5,
)
response = requests.get(grid.json()["data"][0]["url"], timeout=5)
except (requests.exceptions.RequestException, IndexError):
task.return_value(game)
return
tmp_file = Gio.File.new_tmp()[0]
Path(tmp_file.get_path()).write_bytes(response.content)
save_cover(
game.game_id,
resize_cover(tmp_file.get_path()),
)
task.return_value(game)
def task_done(self, _task, result):
if self.importer:
self.importer.queue -= 1
self.importer.done()
self.importer.sgdb_exception = self.exception
if self.exception and not self.importer:
create_dialog(
self.win,
_("Couldn't Connect to SteamGridDB"),
self.exception,
"open_preferences",
_("Preferences"),
).connect("response", self.response)
game = result.propagate_value()[1]
game.set_loading(-1)
if self.importer:
game.save()
else:
game.update()
def response(self, _widget, response):
if response == "open_preferences":
self.win.get_application().on_preferences_action(page_name="sgdb")