✨ Added blocking/async managers
This commit is contained in:
41
src/store/managers/async_manager.py
Normal file
41
src/store/managers/async_manager.py
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
from gi.repository import Gio
|
||||||
|
|
||||||
|
from src.game import Game
|
||||||
|
from src.store.managers.manager import Manager
|
||||||
|
from src.utils.task import Task
|
||||||
|
|
||||||
|
|
||||||
|
class AsyncManager(Manager):
|
||||||
|
"""Manager that can run asynchronously"""
|
||||||
|
|
||||||
|
blocking = False
|
||||||
|
cancellable: Gio.Cancellable = None
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
super().__init__()
|
||||||
|
self.cancellable = Gio.Cancellable()
|
||||||
|
|
||||||
|
def cancel_tasks(self):
|
||||||
|
"""Cancel all tasks for this manager"""
|
||||||
|
self.cancellable.cancel()
|
||||||
|
|
||||||
|
def reset_cancellable(self):
|
||||||
|
"""Reset the cancellable for this manager.
|
||||||
|
Already scheduled Tasks will no longer be cancellable."""
|
||||||
|
self.cancellable = Gio.Cancellable()
|
||||||
|
|
||||||
|
def run(self, game: Game) -> None:
|
||||||
|
data = (game,)
|
||||||
|
task = Task.new(self, self.cancellable, self._task_callback, data)
|
||||||
|
task.set_task_data(data)
|
||||||
|
task.run_in_thread(self._task_thread_func)
|
||||||
|
|
||||||
|
def _task_thread_func(self, _task, _source_object, data, cancellable):
|
||||||
|
"""Task thread entry point"""
|
||||||
|
game, *_rest = data
|
||||||
|
self.emit("started")
|
||||||
|
self.final_run(game)
|
||||||
|
|
||||||
|
def _task_callback(self, _source_object, _result, _data):
|
||||||
|
"""Method run after the async task is done"""
|
||||||
|
self.emit("done")
|
||||||
@@ -9,7 +9,8 @@ class DisplayManager(Manager):
|
|||||||
|
|
||||||
run_after = set((FileManager,))
|
run_after = set((FileManager,))
|
||||||
|
|
||||||
def run(self, game: Game) -> None:
|
def final_run(self, game: Game) -> None:
|
||||||
# TODO decouple a game from its widget
|
# TODO decouple a game from its widget
|
||||||
|
# TODO make the display manager async
|
||||||
shared.win.games[game.game_id] = game
|
shared.win.games[game.game_id] = game
|
||||||
game.update()
|
game.update()
|
||||||
|
|||||||
@@ -1,12 +1,12 @@
|
|||||||
from src.game import Game
|
from src.game import Game
|
||||||
from src.store.managers.manager import Manager
|
from src.store.managers.async_manager import AsyncManager
|
||||||
from src.store.managers.steam_api_manager import SteamAPIManager
|
from src.store.managers.steam_api_manager import SteamAPIManager
|
||||||
|
|
||||||
|
|
||||||
class FileManager(Manager):
|
class FileManager(AsyncManager):
|
||||||
"""Manager in charge of saving a game to a file"""
|
"""Manager in charge of saving a game to a file"""
|
||||||
|
|
||||||
run_after = set((SteamAPIManager,))
|
run_after = set((SteamAPIManager,))
|
||||||
|
|
||||||
def run(self, game: Game) -> None:
|
def final_run(self, game: Game) -> None:
|
||||||
game.save()
|
game.save()
|
||||||
|
|||||||
@@ -1,10 +1,11 @@
|
|||||||
from abc import abstractmethod
|
from abc import abstractmethod
|
||||||
from gi.repository import Gio
|
|
||||||
|
from gi.repository import GObject
|
||||||
|
|
||||||
from src.game import Game
|
from src.game import Game
|
||||||
|
|
||||||
|
|
||||||
class Manager:
|
class Manager(GObject.Object):
|
||||||
"""Class in charge of handling a post creation action for games.
|
"""Class in charge of handling a post creation action for games.
|
||||||
|
|
||||||
* May connect to signals on the game to handle them.
|
* May connect to signals on the game to handle them.
|
||||||
@@ -13,26 +14,15 @@ class Manager:
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
run_after: set[type["Manager"]] = set()
|
run_after: set[type["Manager"]] = set()
|
||||||
|
|
||||||
cancellable: Gio.Cancellable
|
|
||||||
errors: list[Exception]
|
errors: list[Exception]
|
||||||
|
blocking: bool = True
|
||||||
|
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.cancellable = Gio.Cancellable()
|
|
||||||
self.errors = []
|
self.errors = []
|
||||||
|
|
||||||
def cancel_tasks(self):
|
|
||||||
"""Cancel all tasks for this manager"""
|
|
||||||
self.cancellable.cancel()
|
|
||||||
|
|
||||||
def reset_cancellable(self):
|
|
||||||
"""Reset the cancellable for this manager.
|
|
||||||
Alreadyn scheduled Tasks will no longer be cancellable."""
|
|
||||||
self.cancellable = Gio.Cancellable()
|
|
||||||
|
|
||||||
def report_error(self, error: Exception):
|
def report_error(self, error: Exception):
|
||||||
"""Report an error that happened in of run"""
|
"""Report an error that happened in Manager.run"""
|
||||||
self.errors.append(error)
|
self.errors.append(error)
|
||||||
|
|
||||||
def collect_errors(self) -> list[Exception]:
|
def collect_errors(self) -> list[Exception]:
|
||||||
@@ -42,8 +32,26 @@ class Manager:
|
|||||||
return errors
|
return errors
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def run(self, game: Game, cancellable: Gio.Cancellable) -> None:
|
def final_run(self, game: Game) -> None:
|
||||||
|
"""
|
||||||
|
Abstract method overriden by final child classes, called by the run method.
|
||||||
|
* May block its thread
|
||||||
|
* May not raise exceptions, as they will be silently ignored
|
||||||
|
"""
|
||||||
|
|
||||||
|
def run(self, game: Game) -> None:
|
||||||
"""Pass the game through the manager.
|
"""Pass the game through the manager.
|
||||||
May block its thread.
|
In charge of calling the final_run method."""
|
||||||
May not raise exceptions, as they will be silently ignored."""
|
self.emit("started")
|
||||||
|
self.final_run(game)
|
||||||
|
self.emit("done")
|
||||||
|
|
||||||
|
@GObject.Signal(name="started")
|
||||||
|
def started(self) -> None:
|
||||||
|
"""Signal emitted when a manager is started"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
@GObject.Signal(name="done")
|
||||||
|
def done(self) -> None:
|
||||||
|
"""Signal emitted when a manager is done"""
|
||||||
pass
|
pass
|
||||||
|
|||||||
@@ -1,17 +1,17 @@
|
|||||||
from requests import HTTPError
|
from requests import HTTPError
|
||||||
|
|
||||||
from src.game import Game
|
from src.game import Game
|
||||||
from src.store.managers.manager import Manager
|
from src.store.managers.async_manager import AsyncManager
|
||||||
from src.utils.steamgriddb import SGDBAuthError, SGDBError, SGDBHelper
|
from src.utils.steamgriddb import SGDBAuthError, SGDBError, SGDBHelper
|
||||||
from src.store.managers.steam_api_manager import SteamAPIManager
|
from src.store.managers.steam_api_manager import SteamAPIManager
|
||||||
|
|
||||||
|
|
||||||
class SGDBManager(Manager):
|
class SGDBManager(AsyncManager):
|
||||||
"""Manager in charge of downloading a game's cover from steamgriddb"""
|
"""Manager in charge of downloading a game's cover from steamgriddb"""
|
||||||
|
|
||||||
run_after = set((SteamAPIManager,))
|
run_after = set((SteamAPIManager,))
|
||||||
|
|
||||||
def run(self, game: Game) -> None:
|
def final_run(self, game: Game) -> None:
|
||||||
try:
|
try:
|
||||||
sgdb = SGDBHelper()
|
sgdb = SGDBHelper()
|
||||||
sgdb.conditionaly_update_cover(game)
|
sgdb.conditionaly_update_cover(game)
|
||||||
|
|||||||
@@ -1,14 +1,14 @@
|
|||||||
from requests import HTTPError, JSONDecodeError
|
from requests import HTTPError, JSONDecodeError
|
||||||
|
|
||||||
from src.game import Game
|
from src.game import Game
|
||||||
from src.store.managers.manager import Manager
|
from src.store.managers.async_manager import AsyncManager
|
||||||
from src.utils.steam import SteamGameNotFoundError, SteamHelper, SteamNotAGameError
|
from src.utils.steam import SteamGameNotFoundError, SteamHelper, SteamNotAGameError
|
||||||
|
|
||||||
|
|
||||||
class SteamAPIManager(Manager):
|
class SteamAPIManager(AsyncManager):
|
||||||
"""Manager in charge of completing a game's data from the Steam API"""
|
"""Manager in charge of completing a game's data from the Steam API"""
|
||||||
|
|
||||||
def run(self, game: Game) -> None:
|
def final_run(self, game: Game) -> None:
|
||||||
# Skip non-steam games
|
# Skip non-steam games
|
||||||
if not game.source.startswith("steam_"):
|
if not game.source.startswith("steam_"):
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -51,34 +51,29 @@ class Pipeline(GObject.Object):
|
|||||||
|
|
||||||
def advance(self):
|
def advance(self):
|
||||||
"""Spawn tasks for managers that are able to run for a game"""
|
"""Spawn tasks for managers that are able to run for a game"""
|
||||||
for manager in self.ready:
|
|
||||||
self.waiting.remove(manager)
|
# Separate blocking / async managers
|
||||||
self.running.add(manager)
|
managers = self.ready
|
||||||
data = (manager,)
|
blocking = set(filter(lambda manager: manager.blocking, managers))
|
||||||
task = Task.new(self, manager.cancellable, self.manager_task_callback, data)
|
parallel = managers - parallel
|
||||||
task.set_task_data(data)
|
|
||||||
task.run_in_thread(self.manager_task_thread_func)
|
# Schedule parallel managers, then run the blocking ones
|
||||||
|
for manager in (*parallel, *blocking):
|
||||||
|
manager.run(self.game)
|
||||||
|
|
||||||
@GObject.Signal(name="manager-started", arg_types=(object,))
|
@GObject.Signal(name="manager-started", arg_types=(object,))
|
||||||
def manager_started(self, manager: Manager) -> None:
|
def manager_started(self, manager: Manager) -> None:
|
||||||
"""Signal emitted when a manager is started"""
|
"""Signal emitted when a manager is started"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def manager_task_thread_func(self, _task, _source_object, data, cancellable):
|
|
||||||
"""Thread function for manager tasks"""
|
|
||||||
manager, *_rest = data
|
|
||||||
self.emit("manager-started", manager)
|
|
||||||
manager.run(self.game)
|
|
||||||
|
|
||||||
@GObject.Signal(name="manager-done", arg_types=(object,))
|
@GObject.Signal(name="manager-done", arg_types=(object,))
|
||||||
def manager_done(self, manager: Manager) -> None:
|
def manager_done(self, manager: Manager) -> None:
|
||||||
"""Signal emitted when a manager is done"""
|
"""Signal emitted when a manager is done"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def manager_task_callback(self, _source_object, _result, data):
|
def on_manager_started(self, manager: Manager) -> None:
|
||||||
"""Callback function for manager tasks"""
|
self.emit("manager-started", manager)
|
||||||
manager, *_rest = data
|
|
||||||
self.running.remove(manager)
|
def on_manager_done(self, manager: Manager) -> None:
|
||||||
self.done.add(manager)
|
|
||||||
self.emit("manager-done", manager)
|
self.emit("manager-done", manager)
|
||||||
self.advance()
|
self.advance()
|
||||||
|
|||||||
Reference in New Issue
Block a user