This commit is contained in:
kramo
2023-04-11 23:31:16 +02:00
parent cebf080619
commit fd66d07ac3
18 changed files with 162 additions and 135 deletions

View File

@@ -1,91 +0,0 @@
# bottles_parser.py
#
# Copyright 2022-2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
from pathlib import Path
from time import time
import yaml
def bottles_parser(parent_widget):
schema = parent_widget.schema
bottles_dir = Path(schema.get_string("bottles-location")).expanduser()
if not (bottles_dir / "library.yml").is_file():
if (
Path("~/.var/app/com.usebottles.bottles/data/bottles/")
.expanduser()
.exists()
):
schema.set_string(
"bottles-location", "~/.var/app/com.usebottles.bottles/data/bottles/"
)
elif (parent_widget.data_dir / "bottles").exists():
schema.set_string(
"bottles-location", str(parent_widget.data_dir / "bottles")
)
else:
return
bottles_dir = Path(schema.get_string("bottles-location")).expanduser()
current_time = int(time())
data = (bottles_dir / "library.yml").read_text("utf-8")
library = yaml.load(data, Loader=yaml.Loader)
importer = parent_widget.importer
importer.total_queue += len(library)
importer.queue += len(library)
for game in library:
game = library[game]
values = {}
values["game_id"] = f'bottles_{game["id"]}'
if (
values["game_id"] in parent_widget.games
and not parent_widget.games[values["game_id"]].removed
):
importer.save_game()
continue
values["name"] = game["name"]
values["executable"] = [
"xdg-open",
f'bottles:run/{game["bottle"]["name"]}/{game["name"]}',
]
values["hidden"] = False
values["source"] = "bottles"
values["added"] = current_time
values["last_played"] = 0
importer.save_game(
values,
(
bottles_dir
/ "bottles"
/ game["bottle"]["path"]
/ "grids"
/ game["thumbnail"].split(":")[1]
)
if game["thumbnail"]
else None,
)

View File

@@ -317,9 +317,6 @@ def create_details_window(parent_widget, game_id=None):
(parent_widget.covers_dir / f"{game_id}.tiff").unlink(missing_ok=True)
(parent_widget.covers_dir / f"{game_id}.gif").unlink(missing_ok=True)
if not game_cover.get_pixbuf():
SGDBSave(parent_widget, {(game_id, values["name"])})
save_cover(
parent_widget,
game_id,
@@ -337,9 +334,13 @@ def create_details_window(parent_widget, game_id=None):
else:
save_game(parent_widget, values)
parent_widget.update_games([game_id])
if game_cover.get_pixbuf():
parent_widget.update_games([game_id])
else:
SGDBSave(parent_widget, {(game_id, values["name"])})
window.close()
parent_widget.show_overview(None, game_id)
parent_widget.show_details_view(None, game_id)
def focus_executable(_widget):
window.set_focus(executable)

View File

@@ -1,196 +0,0 @@
# heroic_parser.py
#
# Copyright 2022-2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import hashlib
import json
import os
from pathlib import Path
from time import time
def heroic_parser(parent_widget):
schema = parent_widget.schema
heroic_dir = Path(schema.get_string("heroic-location")).expanduser()
if not (heroic_dir / "config.json").exists():
if (
Path("~/.var/app/com.heroicgameslauncher.hgl/config/heroic/")
.expanduser()
.exists()
):
schema.set_string(
"heroic-location",
"~/.var/app/com.heroicgameslauncher.hgl/config/heroic/",
)
elif (parent_widget.config_dir / "heroic").exists():
schema.set_string(
"heroic-location", str(parent_widget.config_dir / "heroic")
)
elif os.name == "nt" and (Path(os.getenv("appdata")) / "heroic").exists():
schema.set_string(
"heroic-location", str(Path(os.getenv("appdata")) / "heroic")
)
else:
return
heroic_dir = Path(schema.get_string("heroic-location")).expanduser()
current_time = int(time())
importer = parent_widget.importer
# Import Epic games
if not schema.get_boolean("heroic-import-epic"):
pass
elif (heroic_dir / "lib-cache" / "library.json").exists():
data = (heroic_dir / "lib-cache" / "library.json").read_text("utf-8")
library = json.loads(data)
try:
for game in library["library"]:
if not game["is_installed"]:
continue
importer.total_queue += 1
importer.queue += 1
values = {}
app_name = game["app_name"]
values["game_id"] = f"heroic_epic_{app_name}"
if (
values["game_id"] in parent_widget.games
and not parent_widget.games[values["game_id"]].removed
):
importer.save_game()
continue
values["name"] = game["title"]
values["developer"] = game["developer"]
values["executable"] = (
["start", f"heroic://launch/{app_name}"]
if os.name == "nt"
else ["xdg-open", f"heroic://launch/{app_name}"]
)
values["hidden"] = False
values["source"] = "heroic_epic"
values["added"] = current_time
values["last_played"] = 0
image_path = (
heroic_dir
/ "images-cache"
/ hashlib.sha256(
(f'{game["art_square"]}?h=400&resize=1&w=300').encode()
).hexdigest()
)
importer.save_game(values, image_path if image_path.exists() else None)
except KeyError:
pass
# Import GOG games
if not schema.get_boolean("heroic-import-gog"):
pass
elif (heroic_dir / "gog_store" / "installed.json").exists():
data = (heroic_dir / "gog_store" / "installed.json").read_text("utf-8")
installed = json.loads(data)
importer.total_queue += len(installed["installed"])
importer.queue += len(installed["installed"])
for item in installed["installed"]:
values = {}
app_name = item["appName"]
values["game_id"] = f"heroic_gog_{app_name}"
if (
values["game_id"] in parent_widget.games
and not parent_widget.games[values["game_id"]].removed
):
importer.save_game()
continue
# Get game title and developer from library.json as they are not present in installed.json
data = (heroic_dir / "gog_store" / "library.json").read_text("utf-8")
library = json.loads(data)
for game in library["games"]:
if game["app_name"] == app_name:
values["developer"] = game["developer"]
values["name"] = game["title"]
image_path = (
heroic_dir
/ "images-cache"
/ hashlib.sha256(game["art_square"].encode()).hexdigest()
)
values["executable"] = (
["start", f"heroic://launch/{app_name}"]
if os.name == "nt"
else ["xdg-open", f"heroic://launch/{app_name}"]
)
values["hidden"] = False
values["source"] = "heroic_gog"
values["added"] = current_time
values["last_played"] = 0
importer.save_game(values, image_path if image_path.exists() else None)
# Import sideloaded games
if not schema.get_boolean("heroic-import-sideload"):
pass
elif (heroic_dir / "sideload_apps" / "library.json").exists():
data = (heroic_dir / "sideload_apps" / "library.json").read_text("utf-8")
library = json.loads(data)
importer.total_queue += len(library["games"])
importer.queue += len(library["games"])
for item in library["games"]:
values = {}
app_name = item["app_name"]
values["game_id"] = f"heroic_sideload_{app_name}"
if (
values["game_id"] in parent_widget.games
and not parent_widget.games[values["game_id"]].removed
):
importer.save_game()
continue
values["name"] = item["title"]
values["executable"] = (
["start", f"heroic://launch/{app_name}"]
if os.name == "nt"
else ["xdg-open", f"heroic://launch/{app_name}"]
)
values["hidden"] = False
values["source"] = "heroic_sideload"
values["added"] = current_time
values["last_played"] = 0
image_path = (
heroic_dir
/ "images-cache"
/ hashlib.sha256(item["art_square"].encode()).hexdigest()
)
importer.save_game(values, image_path if image_path.exists() else None)

View File

@@ -1,170 +0,0 @@
# itch_parser.py
#
# Copyright 2022-2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import os
from pathlib import Path
from shutil import copyfile
from sqlite3 import connect
from time import time
import requests
from gi.repository import GdkPixbuf, Gio
def get_game(task, current_time, parent_widget, row):
values = {}
values["game_id"] = f"itch_{row[0]}"
if (
values["game_id"] in parent_widget.games
and not parent_widget.games[values["game_id"]].removed
):
task.return_value((None, None))
return
values["added"] = current_time
values["executable"] = (
["start", f"itch://caves/{row[4]}/launch"]
if os.name == "nt"
else ["xdg-open", f"itch://caves/{row[4]}/launch"]
)
values["hidden"] = False
values["last_played"] = 0
values["name"] = row[1]
values["source"] = "itch"
if row[3] or row[2]:
tmp_file = Gio.File.new_tmp(None)[0]
try:
with requests.get(row[3] or row[2], timeout=5) as cover:
cover.raise_for_status()
Path(tmp_file.get_path()).write_bytes(cover.content)
except requests.exceptions.RequestException:
task.return_value((values, None))
return
game_cover = GdkPixbuf.Pixbuf.new_from_stream_at_scale(
tmp_file.read(), 2, 2, False
).scale_simple(*parent_widget.image_size, GdkPixbuf.InterpType.BILINEAR)
itch_pixbuf = GdkPixbuf.Pixbuf.new_from_stream(tmp_file.read())
itch_pixbuf = itch_pixbuf.scale_simple(
parent_widget.image_size[0],
itch_pixbuf.get_height()
* (parent_widget.image_size[0] / itch_pixbuf.get_width()),
GdkPixbuf.InterpType.BILINEAR,
)
itch_pixbuf.composite(
game_cover,
0,
(parent_widget.image_size[1] - itch_pixbuf.get_height()) / 2,
itch_pixbuf.get_width(),
itch_pixbuf.get_height(),
0,
(parent_widget.image_size[1] - itch_pixbuf.get_height()) / 2,
1.0,
1.0,
GdkPixbuf.InterpType.BILINEAR,
255,
)
else:
game_cover = None
task.return_value((values, game_cover))
def get_games_async(parent_widget, rows, importer):
current_time = int(time())
# Wrap the function in another one as Gio.Task.run_in_thread does not allow for passing args
def create_func(current_time, parent_widget, row):
def wrapper(task, *_unused):
get_game(
task,
current_time,
parent_widget,
row,
)
return wrapper
def update_games(_task, result):
final_values = result.propagate_value()[1]
# No need for an if statement as final_value would be None for games we don't want to save
importer.save_game(final_values[0], pixbuf=final_values[1])
for row in rows:
task = Gio.Task.new(None, None, update_games)
task.run_in_thread(create_func(current_time, parent_widget, row))
def itch_parser(parent_widget):
schema = parent_widget.schema
database_path = (Path(schema.get_string("itch-location")) / "db").expanduser()
if not database_path.exists():
if Path("~/.var/app/io.itch.itch/config/itch/").expanduser().exists():
schema.set_string("itch-location", "~/.var/app/io.itch.itch/config/itch/")
elif (parent_widget.config_dir / "itch").exists():
schema.set_string("itch-location", str(parent_widget.config_dir / "itch"))
elif os.name == "nt" and (Path(os.getenv("appdata")) / "itch").exists():
schema.set_string("itch-location", str(Path(os.getenv("appdata")) / "itch"))
else:
return
database_path = (Path(schema.get_string("itch-location")) / "db").expanduser()
db_cache_dir = parent_widget.cache_dir / "cartridges" / "itch"
db_cache_dir.mkdir(parents=True, exist_ok=True)
# Copy the file because sqlite3 doesn't like databases in /run/user/
database_tmp_path = db_cache_dir / "butler.db"
for db_file in database_path.glob("butler.db*"):
copyfile(db_file, (db_cache_dir / db_file.name))
db_request = """
SELECT
games.id,
games.title,
games.cover_url,
games.still_cover_url,
caves.id
FROM
'caves'
INNER JOIN
'games'
ON
caves.game_id = games.id
;
"""
connection = connect(database_tmp_path)
cursor = connection.execute(db_request)
rows = cursor.fetchall()
connection.close()
# No need to unlink temp files as they disappear when the connection is closed
database_tmp_path.unlink(missing_ok=True)
importer = parent_widget.importer
importer.total_queue += len(rows)
importer.queue += len(rows)
get_games_async(parent_widget, rows, importer)

View File

@@ -1,115 +0,0 @@
# lutris_parser.py
#
# Copyright 2022-2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
from pathlib import Path
from shutil import copyfile
from sqlite3 import connect
from time import time
def lutris_parser(parent_widget):
schema = parent_widget.schema
database_path = (Path(schema.get_string("lutris-location"))).expanduser()
if not database_path.exists():
if Path("~/.var/app/net.lutris.Lutris/data/lutris/").expanduser().exists():
schema.set_string(
"lutris-location", "~/.var/app/net.lutris.Lutris/data/lutris/"
)
elif (parent_widget.data_dir / "lutris").exists():
schema.set_string("lutris-location", str(parent_widget.data_dir / "lutris"))
else:
return
cache_dir = Path(schema.get_string("lutris-cache-location")).expanduser()
if not cache_dir.exists():
if Path("~/.var/app/net.lutris.Lutris/cache/lutris/").expanduser().exists():
schema.set_string(
"lutris-cache-location", "~/.var/app/net.lutris.Lutris/cache/lutris/"
)
elif (parent_widget.cache_dir / "lutris").exists():
schema.set_string(
"lutris-cache-location", str(parent_widget.cache_dir / "lutris")
)
else:
return
database_path = (Path(schema.get_string("lutris-location"))).expanduser()
cache_dir = Path(schema.get_string("lutris-cache-location")).expanduser()
db_cache_dir = parent_widget.cache_dir / "cartridges" / "lutris"
db_cache_dir.mkdir(parents=True, exist_ok=True)
# Copy the file because sqlite3 doesn't like databases in /run/user/
database_tmp_path = db_cache_dir / "pga.db"
for db_file in database_path.glob("pga.db*"):
copyfile(db_file, (db_cache_dir / db_file.name))
db_request = """
SELECT
id, name, slug, runner, hidden
FROM
'games'
WHERE
name IS NOT NULL
AND slug IS NOT NULL
AND configPath IS NOT NULL
AND installed IS TRUE
;
"""
connection = connect(database_tmp_path)
cursor = connection.execute(db_request)
rows = cursor.fetchall()
connection.close()
# No need to unlink temp files as they disappear when the connection is closed
database_tmp_path.unlink(missing_ok=True)
if not schema.get_boolean("lutris-import-steam"):
rows = [row for row in rows if not row[3] == "steam"]
current_time = int(time())
importer = parent_widget.importer
importer.total_queue += len(rows)
importer.queue += len(rows)
for row in rows:
values = {}
values["game_id"] = f"lutris_{row[3]}_{row[0]}"
if (
values["game_id"] in parent_widget.games
and not parent_widget.games[values["game_id"]].removed
):
importer.save_game()
continue
values["added"] = current_time
values["executable"] = ["xdg-open", f"lutris:rungameid/{row[0]}"]
values["hidden"] = row[4] == 1
values["last_played"] = 0
values["name"] = row[1]
values["source"] = f"lutris_{row[3]}"
image_path = cache_dir / "coverart" / f"{row[2]}.jpg"
importer.save_game(values, image_path if image_path.exists() else None)

View File

@@ -1,175 +0,0 @@
# steam_parser.py
#
# Copyright 2022-2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import os
import re
from pathlib import Path
from time import time
import requests
from gi.repository import Gio
def update_values_from_data(content, values):
basic_data = content[values["appid"]]
if not basic_data["success"]:
values["blacklisted"] = True
else:
data = basic_data["data"]
values["developer"] = ", ".join(data["developers"])
if data["type"] != "game":
values["blacklisted"] = True
return values
def get_game(task, datatypes, current_time, parent_widget, appmanifest, steam_dir):
values = {}
data = appmanifest.read_text("utf-8")
for datatype in datatypes:
value = re.findall(f'"{datatype}"\t\t"(.*)"\n', data, re.IGNORECASE)
try:
values[datatype] = value[0]
except IndexError:
task.return_value((None, None))
return
values["game_id"] = f'steam_{values["appid"]}'
if (
values["game_id"] in parent_widget.games
and not parent_widget.games[values["game_id"]].removed
):
task.return_value((None, None))
return
values["executable"] = (
["start", f'steam://rungameid/{values["appid"]}']
if os.name == "nt"
else ["xdg-open", f'steam://rungameid/{values["appid"]}']
)
values["hidden"] = False
values["source"] = "steam"
values["added"] = current_time
values["last_played"] = 0
image_path = (
steam_dir
/ "appcache"
/ "librarycache"
/ f'{values["appid"]}_library_600x900.jpg'
)
try:
with requests.get(
f'https://store.steampowered.com/api/appdetails?appids={values["appid"]}',
timeout=5,
) as open_file:
open_file.raise_for_status()
content = open_file.json()
except requests.exceptions.RequestException:
task.return_value((values, image_path if image_path.exists() else None))
return
values = update_values_from_data(content, values)
task.return_value((values, image_path if image_path.exists() else None))
def get_games_async(parent_widget, appmanifests, steam_dir, importer):
datatypes = ["appid", "name"]
current_time = int(time())
# Wrap the function in another one as Gio.Task.run_in_thread does not allow for passing args
def create_func(datatypes, current_time, parent_widget, appmanifest, steam_dir):
def wrapper(task, *_unused):
get_game(
task,
datatypes,
current_time,
parent_widget,
appmanifest,
steam_dir,
)
return wrapper
def update_games(_task, result):
final_values = result.propagate_value()[1]
# No need for an if statement as final_value would be None for games we don't want to save
importer.save_game(final_values[0], final_values[1])
for appmanifest in appmanifests:
task = Gio.Task.new(None, None, update_games)
task.run_in_thread(
create_func(datatypes, current_time, parent_widget, appmanifest, steam_dir)
)
def steam_parser(parent_widget):
schema = parent_widget.schema
steam_dir = Path(schema.get_string("steam-location")).expanduser()
def steam_not_found():
if Path("~/.var/app/com.valvesoftware.Steam/data/Steam/").expanduser().exists():
schema.set_string(
"steam-location", "~/.var/app/com.valvesoftware.Steam/data/Steam/"
)
elif Path("~/.steam/steam/").expanduser().exists():
schema.set_string("steam-location", "~/.steam/steam/")
elif (
os.name == "nt"
and (Path(os.getenv("programfiles(x86)")) / "Steam").exists()
):
schema.set_string(
"steam-location", str(Path(os.getenv("programfiles(x86)")) / "Steam")
)
if (steam_dir / "steamapps").exists():
pass
elif (steam_dir / "steam" / "steamapps").exists():
schema.set_string("steam-location", str(steam_dir / "steam"))
elif (steam_dir / "Steam" / "steamapps").exists():
schema.set_string("steam-location", str(steam_dir / "Steam"))
else:
steam_not_found()
steam_parser(parent_widget)
return
steam_dir = Path(schema.get_string("steam-location")).expanduser()
appmanifests = []
steam_dirs = [Path(directory) for directory in schema.get_strv("steam-extra-dirs")]
steam_dirs.append(steam_dir)
for directory in steam_dirs:
if not (directory / "steamapps").exists():
steam_dirs.remove(directory)
for directory in steam_dirs:
for open_file in (directory / "steamapps").iterdir():
if open_file.is_file() and "appmanifest" in open_file.name:
appmanifests.append(open_file)
importer = parent_widget.importer
importer.total_queue += len(appmanifests)
importer.queue += len(appmanifests)
get_games_async(parent_widget, appmanifests, directory, importer)

View File

@@ -35,7 +35,7 @@ class SGDBSave:
)
):
if not self.importer:
self.parent_widget.loading = game[0]
self.parent_widget.games[game[0]].set_loading(1)
url = "https://www.steamgriddb.com/api/v2/"
headers = {
@@ -106,8 +106,6 @@ class SGDBSave:
self.importer.queue -= 1
self.importer.done()
self.importer.sgdb_exception = self.exception
else:
self.parent_widget.loading = None
if self.exception:
create_dialog(