# importer.py # # Copyright 2022-2023 kramo # Copyright 2023 Geoffrey Coulaud # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # # SPDX-License-Identifier: GPL-3.0-or-later import logging from typing import Any, Optional from gi.repository import Adw, GLib, Gtk from src import shared from src.errors.error_producer import ErrorProducer from src.errors.friendly_error import FriendlyError from src.game import Game from src.importer.sources.location import UnresolvableLocationError from src.importer.sources.source import Source from src.store.managers.async_manager import AsyncManager from src.store.pipeline import Pipeline from src.utils.task import Task # pylint: disable=too-many-instance-attributes class Importer(ErrorProducer): """A class in charge of scanning sources for games""" progressbar: Gtk.ProgressBar import_statuspage: Adw.StatusPage import_dialog: Adw.MessageDialog summary_toast: Adw.Toast sources: set[Source] n_source_tasks_created: int = 0 n_source_tasks_done: int = 0 n_pipelines_done: int = 0 game_pipelines: set[Pipeline] removed_game_ids: set[str] = set() imported_game_ids: set[str] = set() def __init__(self) -> None: super().__init__() # TODO: make this stateful shared.store.new_game_ids = set() shared.store.duplicate_game_ids = set() self.game_pipelines = set() self.sources = set() @property def n_games_added(self) -> int: return sum( 1 if not (pipeline.game.blacklisted or pipeline.game.removed) else 0 for pipeline in self.game_pipelines ) @property def pipelines_progress(self) -> float: progress = sum(pipeline.progress for pipeline in self.game_pipelines) try: progress = progress / len(self.game_pipelines) except ZeroDivisionError: progress = 0 return progress # type: ignore @property def sources_progress(self) -> float: try: progress = self.n_source_tasks_done / self.n_source_tasks_created except ZeroDivisionError: progress = 0 return progress @property def finished(self) -> bool: return ( self.n_source_tasks_created == self.n_source_tasks_done and len(self.game_pipelines) == self.n_pipelines_done ) def add_source(self, source: Source) -> None: self.sources.add(source) def run(self) -> None: """Use several Gio.Task to import games from added sources""" shared.win.get_application().lookup_action("import").set_enabled(False) self.create_dialog() # Collect all errors and reset the cancellables for the managers # - Only one importer exists at any given time # - Every import starts fresh self.collect_errors() for manager in shared.store.managers.values(): manager.collect_errors() if isinstance(manager, AsyncManager): manager.reset_cancellable() for source in self.sources: logging.debug("Importing games from source %s", source.source_id) task = Task.new(None, None, self.source_callback, (source,)) self.n_source_tasks_created += 1 task.set_task_data((source,)) task.run_in_thread(self.source_task_thread_func) self.progress_changed_callback() def create_dialog(self) -> None: """Create the import dialog""" self.progressbar = Gtk.ProgressBar(margin_start=12, margin_end=12) self.import_statuspage = Adw.StatusPage( title=_("Importing Games…"), child=self.progressbar, ) self.import_dialog = Adw.Window( content=self.import_statuspage, modal=True, default_width=350, default_height=-1, transient_for=shared.win, deletable=False, ) self.import_dialog.present() def source_task_thread_func( self, _task: Any, _obj: Any, data: tuple, _cancellable: Any ) -> None: """Source import task code""" source: Source source, *_rest = data # Early exit if not available or not installed if not source.is_available: logging.info("Source %s skipped, not available", source.source_id) return try: iterator = iter(source) except UnresolvableLocationError: logging.info("Source %s skipped, bad location", source.source_id) return # Get games from source logging.info("Scanning source %s", source.source_id) while True: # Handle exceptions raised when iterating try: iteration_result = next(iterator) except StopIteration: break except Exception as error: # pylint: disable=broad-exception-caught logging.exception("%s in %s", type(error).__name__, source.source_id) self.report_error(error) continue # Handle the result depending on its type if isinstance(iteration_result, Game): game = iteration_result additional_data = {} elif isinstance(iteration_result, tuple): game, additional_data = iteration_result elif iteration_result is None: continue else: # Warn source implementers that an invalid type was produced # Should not happen on production code logging.warning( "%s produced an invalid iteration return type %s", source.source_id, type(iteration_result), ) continue # Register game pipeline: Pipeline = shared.store.add_game(game, additional_data) if pipeline is not None: logging.info("Imported %s (%s)", game.name, game.game_id) pipeline.connect("advanced", self.pipeline_advanced_callback) self.game_pipelines.add(pipeline) def update_progressbar(self) -> None: """Update the progressbar to show the overall import progress""" # Reserve 10% for the sources discovery, the rest is the pipelines self.progressbar.set_fraction( (0.1 * self.sources_progress) + (0.9 * self.pipelines_progress) ) def source_callback(self, _obj: Any, _result: Any, data: tuple) -> None: """Callback executed when a source is fully scanned""" source, *_rest = data logging.debug("Import done for source %s", source.source_id) self.n_source_tasks_done += 1 self.progress_changed_callback() def pipeline_advanced_callback(self, pipeline: Pipeline) -> None: """Callback called when a pipeline for a game has advanced""" if pipeline.is_done: self.n_pipelines_done += 1 self.progress_changed_callback() def progress_changed_callback(self) -> None: """ Callback called when the import process has progressed Triggered when: * All sources have been started * A source finishes * A pipeline finishes """ self.update_progressbar() if self.finished: self.import_callback() def remove_games(self) -> None: """Set removed to True for missing games""" if not shared.schema.get_boolean("remove-missing"): return for game in shared.store: if game.removed: continue if game.source == "imported": continue if not shared.schema.get_boolean(game.base_source): continue if game.game_id in shared.store.duplicate_game_ids: continue if game.game_id in shared.store.new_game_ids: continue logging.debug("Removing missing game %s (%s)", game.name, game.game_id) game.removed = True game.save() game.update() self.removed_game_ids.add(game.game_id) def import_callback(self) -> None: """Callback called when importing has finished""" logging.info("Import done") self.remove_games() self.imported_game_ids = shared.store.new_game_ids shared.store.new_game_ids = set() shared.store.duplicate_game_ids = set() self.import_dialog.close() self.summary_toast = self.create_summary_toast() self.create_error_dialog() shared.win.get_application().lookup_action("import").set_enabled(True) def create_error_dialog(self) -> None: """Dialog containing all errors raised by importers""" # Collect all errors that happened in the importer and the managers errors = [] errors.extend(self.collect_errors()) for manager in shared.store.managers.values(): errors.extend(manager.collect_errors()) # Filter out non friendly errors errors = set( tuple( (error.title, error.subtitle) for error in ( filter(lambda error: isinstance(error, FriendlyError), errors) ) ) ) # No error to display if not errors: self.timeout_toast() return # Create error dialog dialog = Adw.MessageDialog() dialog.set_heading(_("Warning")) dialog.add_response("close", _("Dismiss")) dialog.add_response("open_preferences_import", _("Preferences")) dialog.set_default_response("open_preferences_import") dialog.connect("response", self.dialog_response_callback) dialog.set_transient_for(shared.win) if len(errors) == 1: dialog.set_heading((error := next(iter(errors)))[0]) dialog.set_body(error[1]) else: # Display the errors in a list list_box = Gtk.ListBox() list_box.set_selection_mode(Gtk.SelectionMode.NONE) list_box.set_css_classes(["boxed-list"]) list_box.set_margin_top(9) for error in errors: row = Adw.ActionRow.new() row.set_title(error[0]) row.set_subtitle(error[1]) list_box.append(row) dialog.set_body(_("The following errors occured during import:")) dialog.set_extra_child(list_box) dialog.present() def undo_import(self, *_args: Any) -> None: for game_id in self.imported_game_ids: shared.store[game_id].removed = True shared.store[game_id].update() shared.store[game_id].save() for game_id in self.removed_game_ids: shared.store[game_id].removed = False shared.store[game_id].update() shared.store[game_id].save() self.imported_game_ids = set() self.removed_game_ids = set() self.summary_toast.dismiss() logging.info("Import undone") def create_summary_toast(self) -> Adw.Toast: """N games imported, removed toast""" toast = Adw.Toast(timeout=0, priority=Adw.ToastPriority.HIGH) if not self.n_games_added: toast_title = _("No new games found") if not self.removed_game_ids: toast.set_button_label(_("Preferences")) toast.connect( "button-clicked", self.dialog_response_callback, "open_preferences", "import", ) elif self.n_games_added == 1: toast_title = _("1 game imported") elif self.n_games_added > 1: # The variable is the number of games toast_title = _("{} games imported").format(self.n_games_added) if (removed_length := len(self.removed_game_ids)) == 1: # A single game removed toast_title += ", " + _("1 removed") elif removed_length > 1: # The variable is the number of games removed toast_title += ", " + _("{} removed").format(removed_length) if self.n_games_added or self.removed_game_ids: toast.set_button_label(_("Undo")) toast.connect("button-clicked", self.undo_import) toast.set_title(toast_title) shared.win.toast_overlay.add_toast(toast) return toast def open_preferences( self, page_name: Optional[str] = None, expander_row: Optional[Adw.ExpanderRow] = None, ) -> Adw.PreferencesWindow: return shared.win.get_application().on_preferences_action( page_name=page_name, expander_row=expander_row ) def timeout_toast(self, *_args: Any) -> None: """Manually timeout the toast after the user has dismissed all warnings""" GLib.timeout_add_seconds(5, self.summary_toast.dismiss) def dialog_response_callback(self, _widget: Any, response: str, *args: Any) -> None: """Handle after-import dialogs callback""" logging.debug("After-import dialog response: %s (%s)", response, str(args)) if response == "open_preferences": self.open_preferences(*args) elif response == "open_preferences_import": self.open_preferences(*args).connect("close-request", self.timeout_toast) else: self.timeout_toast()