Files
cartridges/search-provider/cartridges-search-provider.in
2023-12-17 00:25:01 +01:00

297 lines
9.8 KiB
Plaintext
Executable File

#!@PYTHON@
# cartridges-search-provider.in
#
# Copyright (c) 2014-2016 Cedric Bellegarde <cedric.bellegarde@adishatz.org>
# Copyright 2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
# Heavily inspired by:
# https://gitlab.gnome.org/World/lollypop/-/blob/master/search-provider/lollypop-sp.in
import json
import gi
gi.require_version("Gdk", "4.0")
gi.require_version("GdkPixbuf", "2.0")
# pylint: disable=wrong-import-position
from gi.repository import GdkPixbuf, Gio, GLib
from cartridges import shared
class Server:
def __init__(self, con, path):
method_outargs = {}
method_inargs = {}
for interface in Gio.DBusNodeInfo.new_for_xml(self.__doc__).interfaces:
for method in interface.methods:
method_outargs[method.name] = (
"(" + "".join([arg.signature for arg in method.out_args]) + ")"
)
method_inargs[method.name] = tuple(
arg.signature for arg in method.in_args
)
con.register_object(
object_path=path,
interface_info=interface,
method_call_closure=self.on_method_call,
)
self.method_inargs = method_inargs
self.method_outargs = method_outargs
def on_method_call(
self,
_connection,
_sender,
_object_path,
_interface_name,
method_name,
parameters,
invocation,
):
args = list(parameters.unpack())
for i, sig in enumerate(self.method_inargs[method_name]):
if sig == "h":
msg = invocation.get_message()
fd_list = msg.get_unix_fd_list()
args[i] = fd_list.get(args[i])
try:
result = getattr(self, method_name)(*args)
# out_args is atleast (signature1).
# We therefore always wrap the result as a tuple.
# Refer to https://bugzilla.gnome.org/show_bug.cgi?id=765603
result = (result,)
out_args = self.method_outargs[method_name]
if out_args != "()":
variant = GLib.Variant(out_args, result)
invocation.return_value(variant)
else:
invocation.return_value(None)
except Exception: # pylint: disable=broad-exception-caught
pass
class SearchCartridgesService(Server, Gio.Application):
"""
<!DOCTYPE node PUBLIC
'-//freedesktop//DTD D-BUS Object Introspection 1.0//EN'
'http://www.freedesktop.org/standards/dbus/1.0/introspect.dtd'>
<node>
<interface name="org.gnome.Shell.SearchProvider2">
<method name="GetInitialResultSet">
<arg type="as" name="terms" direction="in" />
<arg type="as" name="results" direction="out" />
</method>
<method name="GetSubsearchResultSet">
<arg type="as" name="previous_results" direction="in" />
<arg type="as" name="terms" direction="in" />
<arg type="as" name="results" direction="out" />
</method>
<method name="GetResultMetas">
<arg type="as" name="identifiers" direction="in" />
<arg type="aa{sv}" name="metas" direction="out" />
</method>
<method name="ActivateResult">
<arg type="s" name="identifier" direction="in" />
<arg type="as" name="terms" direction="in" />
<arg type="u" name="timestamp" direction="in" />
</method>
<method name="LaunchSearch">
<arg type="as" name="terms" direction="in" />
<arg type="u" name="timestamp" direction="in" />
</method>
</interface>
</node>
"""
# pylint: disable=invalid-name
__SEARCH_BUS = "org.gnome.Shell.SearchProvider2"
__PATH_BUS = "@PREFIX@/SearchProvider"
def __init__(self):
Gio.Application.__init__(
self,
application_id="@APP_ID@.SearchProvider",
flags=Gio.ApplicationFlags.IS_SERVICE,
inactivity_timeout=10000,
)
self.games = {}
self.load_games_from_disk()
self.__bus = Gio.bus_get_sync(Gio.BusType.SESSION, None)
Gio.bus_own_name_on_connection(
self.__bus, self.__SEARCH_BUS, Gio.BusNameOwnerFlags.NONE, None, None
)
Server.__init__(self, self.__bus, self.__PATH_BUS)
def load_games_from_disk(self):
if not shared.games_dir.is_dir():
return
for game_file in shared.games_dir.iterdir():
try:
data = json.load(game_file.open())
except (OSError, json.decoder.JSONDecodeError):
continue
try:
# Use .get for compatibility with pre-2.0 games
if any(
{data.get("hidden"), data.get("blacklisted"), data.get("removed")}
):
print(f"Skipped {game_file.name}")
continue
self.games[data["game_id"]] = (data["name"], data["developer"])
except KeyError:
continue
def ActivateResult(self, game_id, _array, _utime):
argv = ["cartridges", "--launch", game_id]
(pid, _stdin, _stdout, _stderr) = GLib.spawn_async(
argv,
flags=GLib.SpawnFlags.SEARCH_PATH,
standard_input=False,
standard_output=False,
standard_error=False,
)
GLib.spawn_close_pid(pid)
def GetInitialResultSet(self, terms):
return self.__search(terms)
def GetResultMetas(self, game_ids):
results = []
try:
for game_id in game_ids:
empty_pixbuf = GdkPixbuf.Pixbuf.new(
GdkPixbuf.Colorspace.RGB, True, 8, 32, 32
)
pixbuf = None
if (path := shared.covers_dir / (game_id + ".tiff")).is_file():
try:
pixbuf = GdkPixbuf.Pixbuf.new_from_file_at_scale(
str(path), -1, 32, True
)
except GLib.Error as e:
print(e)
continue
elif (path := shared.covers_dir / (game_id + ".gif")).is_file():
try:
pixbuf = GdkPixbuf.PixbufAnimation.new_from_file(
str(path)
).get_static_image()
except GLib.Error as e:
print(e)
continue
d = {
"id": GLib.Variant("s", game_id),
"name": GLib.Variant("s", self.games[game_id][0]),
}
if pixbuf:
pixbuf.composite(
empty_pixbuf,
6,
0,
21,
32,
6,
0,
21 / pixbuf.get_width(),
32 / pixbuf.get_height(),
GdkPixbuf.InterpType.NEAREST,
255,
)
d["icon-data"] = GLib.Variant(
"(iiibiiay)",
[
empty_pixbuf.get_width(),
empty_pixbuf.get_height(),
empty_pixbuf.get_rowstride(),
empty_pixbuf.get_has_alpha(),
empty_pixbuf.get_bits_per_sample(),
empty_pixbuf.get_n_channels(),
empty_pixbuf.read_pixel_bytes().get_data(),
],
)
if self.games[game_id][1]:
d["description"] = GLib.Variant(
"s", GLib.markup_escape_text(self.games[game_id][1])
)
results.append(d)
except Exception as e: # pylint: disable=broad-exception-caught
print("SearchCartridgesService::GetResultMetas():", e)
return []
return results
def GetSubsearchResultSet(self, _previous_results, new_terms):
return self.__search(new_terms)
def LaunchSearch(self, terms, _utime):
search = " ".join(terms)
argv = ["cartridges", "--search", search]
(pid, _stdin, _stdout, _stderr) = GLib.spawn_async(
argv,
flags=GLib.SpawnFlags.SEARCH_PATH,
standard_input=False,
standard_output=False,
standard_error=False,
)
GLib.spawn_close_pid(pid)
def __search(self, terms):
game_ids = []
search = " ".join(terms).lower()
try:
for game_id, data in self.games.items():
if search in data[0].lower():
game_ids.append(game_id)
continue
if data[1] and search in data[1].lower():
game_ids.append(game_id)
continue
except Exception as e: # pylint: disable=broad-exception-caught
print("SearchCartridgesService::__search():", e)
return game_ids
def main():
service = SearchCartridgesService()
service.run()
if __name__ == "__main__":
main()