#!@PYTHON@

# cartridges-search-provider.in
#
# Copyright (c) 2014-2016 Cedric Bellegarde
# Copyright 2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later

# Heavily inspired by:
# https://gitlab.gnome.org/World/lollypop/-/blob/master/search-provider/lollypop-sp.in

import json

import gi

gi.require_version("Gdk", "4.0")
gi.require_version("GdkPixbuf", "2.0")

# pylint: disable=wrong-import-position
from gi.repository import GdkPixbuf, Gio, GLib

from cartridges import shared


class Server:
    """Export the methods of a subclass as a D-Bus object.

    The subclass provides D-Bus introspection XML as its class docstring
    (it is read through ``self.__doc__``) and implements one Python method
    per D-Bus method declared there, using the same name.
    """

    def __init__(self, con, path):
        """Register every interface described by ``self.__doc__``.

        Args:
            con: Connection the interfaces are registered on.
            path: Object path the interfaces are registered at.
        """
        # Filled before registration so that the tables already exist by the
        # time the first method call can be dispatched.
        self.method_inargs = {}
        self.method_outargs = {}

        for interface in Gio.DBusNodeInfo.new_for_xml(self.__doc__).interfaces:
            for method in interface.methods:
                self.method_outargs[method.name] = (
                    "(" + "".join(arg.signature for arg in method.out_args) + ")"
                )
                self.method_inargs[method.name] = tuple(
                    arg.signature for arg in method.in_args
                )

            con.register_object(
                object_path=path,
                interface_info=interface,
                method_call_closure=self.on_method_call,
            )

    def on_method_call(
        self,
        _connection,
        _sender,
        _object_path,
        _interface_name,
        method_name,
        parameters,
        invocation,
    ):
        """Dispatch an incoming call to the Python method of the same name.

        Exactly one reply is sent per invocation: the method's result on
        success, or a D-Bus error if the method (or the packing of its
        result) raised.
        """
        args = list(parameters.unpack())
        for i, sig in enumerate(self.method_inargs[method_name]):
            if sig == "h":
                # File descriptors travel out of band: the argument is an
                # index into the message's fd list, not the fd itself.
                msg = invocation.get_message()
                fd_list = msg.get_unix_fd_list()
                args[i] = fd_list.get(args[i])

        try:
            result = getattr(self, method_name)(*args)
            out_args = self.method_outargs[method_name]
            # out_args is at least (signature1).
            # We therefore always wrap the result as a tuple.
            # Refer to https://bugzilla.gnome.org/show_bug.cgi?id=765603
            reply = GLib.Variant(out_args, (result,)) if out_args != "()" else None
        except Exception as error:  # pylint: disable=broad-exception-caught
            # Answer with an error instead of staying silent, otherwise the
            # caller is left waiting for a reply that never comes.
            print(f"Server::on_method_call(): {method_name}:", error)
            invocation.return_dbus_error(
                "org.freedesktop.DBus.Error.Failed", str(error)
            )
            return

        invocation.return_value(reply)


# The class docstring below is not documentation: it is the D-Bus
# introspection XML that Server.__init__() parses through ``self.__doc__``.
# It has to stay valid XML, and every <method> needs a Python method of the
# same name on the class.
class SearchCartridgesService(Server, Gio.Application):
    """
    <node>
    <interface name="org.gnome.Shell.SearchProvider2">

    <method name="GetInitialResultSet">
      <arg type="as" name="terms" direction="in" />
      <arg type="as" name="results" direction="out" />
    </method>

    <method name="GetSubsearchResultSet">
      <arg type="as" name="previous_results" direction="in" />
      <arg type="as" name="terms" direction="in" />
      <arg type="as" name="results" direction="out" />
    </method>

    <method name="GetResultMetas">
      <arg type="as" name="identifiers" direction="in" />
      <arg type="aa{sv}" name="metas" direction="out" />
    </method>

    <method name="ActivateResult">
      <arg type="s" name="identifier" direction="in" />
      <arg type="as" name="terms" direction="in" />
      <arg type="u" name="timestamp" direction="in" />
    </method>

    <method name="LaunchSearch">
      <arg type="as" name="terms" direction="in" />
      <arg type="u" name="timestamp" direction="in" />
    </method>

    </interface>
    </node>
    """

    # pylint: disable=invalid-name

    __SEARCH_BUS = "org.gnome.Shell.SearchProvider2"
    __PATH_BUS = "@PREFIX@/SearchProvider"

    def __init__(self):
        """Load the games, then export the provider on the session bus."""
        Gio.Application.__init__(
            self,
            application_id="@APP_ID@.SearchProvider",
            flags=Gio.ApplicationFlags.IS_SERVICE,
            inactivity_timeout=10000,
        )
        # game_id -> (name, developer)
        self.games = {}
        self.load_games_from_disk()

        self.__bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)
        Gio.bus_own_name_on_connection(
            self.__bus, self.__SEARCH_BUS, Gio.BusNameOwnerFlags.NONE, None, None
        )
        Server.__init__(self, self.__bus, self.__PATH_BUS)

    def load_games_from_disk(self):
        """Fill ``self.games`` from the JSON files in ``shared.games_dir``.

        Files that cannot be read or parsed, that do not hold a JSON object,
        that lack a required key, or that describe a hidden, blacklisted or
        removed game are skipped.
        """
        if not shared.games_dir.is_dir():
            return

        for game_file in shared.games_dir.iterdir():
            try:
                # The context manager closes the file even if parsing fails.
                with game_file.open(encoding="utf-8") as open_file:
                    data = json.load(open_file)
            except (OSError, ValueError):
                # ValueError covers both malformed JSON and undecodable bytes.
                continue

            if not isinstance(data, dict):
                continue

            # Use .get for compatibility with pre-2.0 games
            if any(data.get(key) for key in ("hidden", "blacklisted", "removed")):
                print(f"Skipped {game_file.name}")
                continue

            try:
                self.games[data["game_id"]] = (data["name"], data["developer"])
            except (KeyError, TypeError):
                # Missing key, or a game_id that cannot be used as a dict key.
                continue

    @staticmethod
    def _launch_cartridges(*options):
        """Spawn ``cartridges`` with *options* and do not wait for it."""
        (pid, _stdin, _stdout, _stderr) = GLib.spawn_async(
            ["cartridges", *options],
            flags=GLib.SpawnFlags.SEARCH_PATH,
            standard_input=False,
            standard_output=False,
            standard_error=False,
        )
        GLib.spawn_close_pid(pid)

    @staticmethod
    def _load_cover(game_id):
        """Return the cover of *game_id* as a pixbuf, or None.

        A ``.tiff`` cover takes precedence over a ``.gif`` one. None is
        returned when neither file exists or when loading fails.
        """
        try:
            if (path := shared.covers_dir / (game_id + ".tiff")).is_file():
                return GdkPixbuf.Pixbuf.new_from_file_at_scale(
                    str(path), -1, 32, True
                )
            if (path := shared.covers_dir / (game_id + ".gif")).is_file():
                return GdkPixbuf.PixbufAnimation.new_from_file(
                    str(path)
                ).get_static_image()
        except GLib.Error as error:
            print(error)
        return None

    @staticmethod
    def _icon_data(pixbuf):
        """Return *pixbuf* drawn onto a 32x32 canvas as an ``icon-data`` variant."""
        canvas = GdkPixbuf.Pixbuf.new(GdkPixbuf.Colorspace.RGB, True, 8, 32, 32)
        # A new pixbuf is not cleared, and only a 21x32 area gets drawn on
        # below, so make the whole canvas transparent first.
        canvas.fill(0x00000000)
        pixbuf.composite(
            canvas,
            6,
            0,
            21,
            32,
            6,
            0,
            21 / pixbuf.get_width(),
            32 / pixbuf.get_height(),
            GdkPixbuf.InterpType.NEAREST,
            255,
        )
        return GLib.Variant(
            "(iiibiiay)",
            [
                canvas.get_width(),
                canvas.get_height(),
                canvas.get_rowstride(),
                canvas.get_has_alpha(),
                canvas.get_bits_per_sample(),
                canvas.get_n_channels(),
                canvas.read_pixel_bytes().get_data(),
            ],
        )

    def ActivateResult(self, game_id, _array, _utime):
        """Launch the game identified by *game_id*."""
        self._launch_cartridges("--launch", game_id)

    def GetInitialResultSet(self, terms):
        """Return the ids of the games matching *terms*."""
        return self.__search(terms)

    def GetResultMetas(self, game_ids):
        """Return one metadata dict per known id in *game_ids*.

        Each dict holds ``id`` and ``name``, plus ``icon-data`` when a cover
        could be loaded and ``description`` when the game has a developer.
        Ids that are not in ``self.games`` are left out. An empty list is
        returned if anything unexpected goes wrong.
        """
        results = []
        try:
            for game_id in game_ids:
                game = self.games.get(game_id)
                if game is None:
                    # One unknown id must not cost the caller every other
                    # result.
                    continue
                name, developer = game

                meta = {
                    "id": GLib.Variant("s", game_id),
                    "name": GLib.Variant("s", name),
                }

                # A missing or unreadable cover only means "no icon".
                cover = self._load_cover(game_id)
                if cover is not None:
                    meta["icon-data"] = self._icon_data(cover)

                if developer:
                    meta["description"] = GLib.Variant(
                        "s", GLib.markup_escape_text(developer)
                    )

                results.append(meta)
        except Exception as e:  # pylint: disable=broad-exception-caught
            print("SearchCartridgesService::GetResultMetas():", e)
            return []
        return results

    def GetSubsearchResultSet(self, _previous_results, new_terms):
        """Return the ids of the games matching *new_terms*."""
        return self.__search(new_terms)

    def LaunchSearch(self, terms, _utime):
        """Open Cartridges with *terms* as its search query."""
        self._launch_cartridges("--search", " ".join(terms))

    def __search(self, terms):
        """Return the ids of games whose name or developer contains the query.

        The query is *terms* joined by single spaces; matching is a
        case-insensitive substring test. If an entry cannot be compared, the
        error is printed and the matches collected so far are returned.
        """
        game_ids = []
        search = " ".join(terms).lower()
        try:
            for game_id, (name, developer) in self.games.items():
                if search in name.lower() or (
                    developer and search in developer.lower()
                ):
                    game_ids.append(game_id)
        except Exception as e:  # pylint: disable=broad-exception-caught
            print("SearchCartridgesService::__search():", e)
        return game_ids


def main():
    """Create the search provider service and run it."""
    service = SearchCartridgesService()
    service.run()


if __name__ == "__main__":
    main()