🔥 Remove old importers + rate limiter debug logs

This commit is contained in:
GeoffreyCoulaud
2023-06-20 13:07:49 +02:00
parent 6408e250ee
commit c38a73ea98
7 changed files with 0 additions and 809 deletions

View File

@@ -1,106 +0,0 @@
# bottles_importer.py
#
# Copyright 2022-2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
from pathlib import Path
from time import time
import yaml
from src import shared
from src.utils.check_install import check_install
def bottles_installed(path=None):
location_key = "bottles-location"
check = "library.yml"
locations = (
(path,)
if path
else (
Path(shared.schema.get_string(location_key)).expanduser(),
Path.home() / ".var/app/com.usebottles.bottles/data/bottles",
shared.data_dir / "bottles",
)
)
bottles_dir = check_install(check, locations, (shared.schema, location_key))
return bottles_dir
def bottles_importer():
bottles_dir = bottles_installed()
if not bottles_dir:
return
current_time = int(time())
data = (bottles_dir / "library.yml").read_text("utf-8")
library = yaml.safe_load(data)
importer = shared.importer
importer.total_queue += len(library)
importer.queue += len(library)
for game in library:
game = library[game]
values = {}
values["game_id"] = f'bottles_{game["id"]}'
if (
values["game_id"] in shared.win.games
and not shared.win.games[values["game_id"]].removed
):
importer.save_game()
continue
values["name"] = game["name"]
values["executable"] = [
"xdg-open",
f'bottles:run/{game["bottle"]["name"]}/{game["name"]}',
]
values["hidden"] = False
values["source"] = "bottles"
values["added"] = current_time
# This will not work if both Cartridges and Bottles are installed via Flatpak
# as Cartridges can't access directories picked via Bottles' file picker portal
try:
bottles_location = Path(
yaml.safe_load((bottles_dir / "data.yml").read_text("utf-8"))[
"custom_bottles_path"
]
)
except (FileNotFoundError, KeyError):
bottles_location = bottles_dir / "bottles"
grid_path = (
bottles_location
/ game["bottle"]["path"]
/ "grids"
/ game["thumbnail"].split(":")[1]
)
importer.save_game(
values,
grid_path if game["thumbnail"] and grid_path.is_file() else None,
)

View File

@@ -1,198 +0,0 @@
# heroic_importer.py
#
# Copyright 2022-2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import json
import os
from hashlib import sha256
from pathlib import Path
from time import time
from src import shared
from src.utils.check_install import check_install
def heroic_installed(path=None):
location_key = "heroic-location"
check = "config.json"
locations = (
(path,)
if path
else (
Path(shared.schema.get_string(location_key)).expanduser(),
Path.home() / ".var/app/com.heroicgameslauncher.hgl/config/heroic",
shared.config_dir / "heroic",
)
)
if os.name == "nt" and not path:
locations += (Path(os.getenv("appdata")) / "heroic",)
heroic_dir = check_install(check, locations, (shared.schema, location_key))
return heroic_dir
def heroic_importer():
heroic_dir = heroic_installed()
if not heroic_dir:
return
current_time = int(time())
importer = shared.importer
# Import Epic games
if not shared.schema.get_boolean("heroic-import-epic"):
pass
elif (heroic_dir / "store_cache" / "legendary_library.json").is_file():
library = json.load(
(heroic_dir / "store_cache" / "legendary_library.json").open()
)
try:
for game in library["library"]:
if not game["is_installed"]:
continue
importer.total_queue += 1
importer.queue += 1
values = {}
app_name = game["app_name"]
values["game_id"] = f"heroic_epic_{app_name}"
if (
values["game_id"] in shared.win.games
and not shared.win.games[values["game_id"]].removed
):
importer.save_game()
continue
values["name"] = game["title"]
values["developer"] = game["developer"]
values["executable"] = (
["start", f"heroic://launch/{app_name}"]
if os.name == "nt"
else ["xdg-open", f"heroic://launch/{app_name}"]
)
values["hidden"] = False
values["source"] = "heroic_epic"
values["added"] = current_time
image_path = (
heroic_dir
/ "images-cache"
/ sha256(
(f'{game["art_square"]}?h=400&resize=1&w=300').encode()
).hexdigest()
)
importer.save_game(values, image_path if image_path.is_file() else None)
except KeyError:
pass
# Import GOG games
if not shared.schema.get_boolean("heroic-import-gog"):
pass
elif (heroic_dir / "gog_store" / "installed.json").is_file() and (
heroic_dir / "store_cache" / "gog_library.json"
).is_file():
installed = json.load((heroic_dir / "gog_store" / "installed.json").open())
importer.total_queue += len(installed["installed"])
importer.queue += len(installed["installed"])
for item in installed["installed"]:
values = {}
app_name = item["appName"]
values["game_id"] = f"heroic_gog_{app_name}"
if (
values["game_id"] in shared.win.games
and not shared.win.games[values["game_id"]].removed
):
importer.save_game()
continue
# Get game title and developer from gog_library.json as they are not present in installed.json
library = json.load(
(heroic_dir / "store_cache" / "gog_library.json").open()
)
for game in library["games"]:
if game["app_name"] == app_name:
values["developer"] = game["developer"]
values["name"] = game["title"]
image_path = (
heroic_dir
/ "images-cache"
/ sha256(game["art_square"].encode()).hexdigest()
)
values["executable"] = (
["start", f"heroic://launch/{app_name}"]
if os.name == "nt"
else ["xdg-open", f"heroic://launch/{app_name}"]
)
values["hidden"] = False
values["source"] = "heroic_gog"
values["added"] = current_time
importer.save_game(values, image_path if image_path.is_file() else None)
# Import sideloaded games
if not shared.schema.get_boolean("heroic-import-sideload"):
pass
elif (heroic_dir / "sideload_apps" / "library.json").is_file():
library = json.load((heroic_dir / "sideload_apps" / "library.json").open())
importer.total_queue += len(library["games"])
importer.queue += len(library["games"])
for item in library["games"]:
values = {}
app_name = item["app_name"]
values["game_id"] = f"heroic_sideload_{app_name}"
if (
values["game_id"] in shared.win.games
and not shared.win.games[values["game_id"]].removed
):
importer.save_game()
continue
values["name"] = item["title"]
values["executable"] = (
["start", f"heroic://launch/{app_name}"]
if os.name == "nt"
else ["xdg-open", f"heroic://launch/{app_name}"]
)
values["hidden"] = False
values["source"] = "heroic_sideload"
values["added"] = current_time
image_path = (
heroic_dir
/ "images-cache"
/ sha256(item["art_square"].encode()).hexdigest()
)
importer.save_game(values, image_path if image_path.is_file() else None)

View File

@@ -1,188 +0,0 @@
# itch_importer.py
#
# Copyright 2022-2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import os
from pathlib import Path
from shutil import copyfile
from sqlite3 import connect
from time import time
import requests
from gi.repository import GdkPixbuf, Gio
from src import shared
from src.utils.check_install import check_install
from src.utils.save_cover import resize_cover
def get_game(task, current_time, row):
values = {}
values["game_id"] = f"itch_{row[0]}"
if (
values["game_id"] in shared.win.games
and not shared.win.games[values["game_id"]].removed
):
task.return_value((None, None))
return
values["added"] = current_time
values["executable"] = (
["start", f"itch://caves/{row[4]}/launch"]
if os.name == "nt"
else ["xdg-open", f"itch://caves/{row[4]}/launch"]
)
values["hidden"] = False
values["name"] = row[1]
values["source"] = "itch"
if row[3] or row[2]:
tmp_file = Gio.File.new_tmp()[0]
try:
with requests.get(row[3] or row[2], timeout=5) as cover:
cover.raise_for_status()
Path(tmp_file.get_path()).write_bytes(cover.content)
except requests.exceptions.RequestException:
task.return_value((values, None))
return
game_cover = GdkPixbuf.Pixbuf.new_from_stream_at_scale(
tmp_file.read(), 2, 2, False
).scale_simple(*shared.image_size, GdkPixbuf.InterpType.BILINEAR)
itch_pixbuf = GdkPixbuf.Pixbuf.new_from_stream(tmp_file.read())
itch_pixbuf = itch_pixbuf.scale_simple(
shared.image_size[0],
itch_pixbuf.get_height() * (shared.image_size[0] / itch_pixbuf.get_width()),
GdkPixbuf.InterpType.BILINEAR,
)
itch_pixbuf.composite(
game_cover,
0,
(shared.image_size[1] - itch_pixbuf.get_height()) / 2,
itch_pixbuf.get_width(),
itch_pixbuf.get_height(),
0,
(shared.image_size[1] - itch_pixbuf.get_height()) / 2,
1.0,
1.0,
GdkPixbuf.InterpType.BILINEAR,
255,
)
else:
game_cover = None
task.return_value((values, game_cover))
def get_games_async(rows, importer):
current_time = int(time())
# Wrap the function in another one as Gio.Task.run_in_thread does not allow for passing args
def create_func(current_time, row):
def wrapper(task, *_args):
get_game(
task,
current_time,
row,
)
return wrapper
def update_games(_task, result):
final_values = result.propagate_value()[1]
# No need for an if statement as final_value would be None for games we don't want to save
importer.save_game(
final_values[0],
resize_cover(pixbuf=final_values[1]),
)
for row in rows:
task = Gio.Task.new(None, None, update_games)
task.run_in_thread(create_func(current_time, row))
def itch_installed(path=None):
location_key = "itch-location"
check = Path("db") / "butler.db"
locations = (
(path,)
if path
else (
Path(shared.schema.get_string(location_key)).expanduser(),
Path.home() / ".var/app/io.itch.itch/config/itch",
shared.config_dir / "itch",
)
)
if os.name == "nt" and not path:
locations += (Path(os.getenv("appdata")) / "itch",)
itch_dir = check_install(check, locations, (shared.schema, location_key))
return itch_dir
def itch_importer():
itch_dir = itch_installed()
if not itch_dir:
return
database_path = (itch_dir / "db").expanduser()
db_cache_dir = shared.cache_dir / "cartridges" / "itch"
db_cache_dir.mkdir(parents=True, exist_ok=True)
# Copy the file because sqlite3 doesn't like databases in /run/user/
database_tmp_path = db_cache_dir / "butler.db"
for db_file in database_path.glob("butler.db*"):
copyfile(db_file, (db_cache_dir / db_file.name))
db_request = """
SELECT
games.id,
games.title,
games.cover_url,
games.still_cover_url,
caves.id
FROM
'caves'
INNER JOIN
'games'
ON
caves.game_id = games.id
;
"""
connection = connect(database_tmp_path)
cursor = connection.execute(db_request)
rows = cursor.fetchall()
connection.close()
# No need to unlink temp files as they disappear when the connection is closed
database_tmp_path.unlink(missing_ok=True)
importer = shared.importer
importer.total_queue += len(rows)
importer.queue += len(rows)
get_games_async(rows, importer)

View File

@@ -1,133 +0,0 @@
# lutris_importer.py
#
# Copyright 2022-2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
from pathlib import Path
from shutil import copyfile
from sqlite3 import connect
from time import time
from src import shared
from src.utils.check_install import check_install
def lutris_installed(path=None):
location_key = "lutris-location"
check = "pga.db"
locations = (
(path,)
if path
else (
Path(shared.schema.get_string(location_key)).expanduser(),
Path.home() / ".var/app/net.lutris.Lutris/data/lutris",
shared.data_dir / "lutris",
)
)
lutris_dir = check_install(check, locations, (shared.schema, location_key))
return lutris_dir
def lutris_cache_exists(path=None):
cache_key = "lutris-cache-location"
cache_check = "coverart"
cache_locations = (
(path,)
if path
else (
Path(shared.schema.get_string(cache_key)).expanduser(),
Path.home() / ".var" / "app" / "net.lutris.Lutris" / "cache" / "lutris",
shared.cache_dir / "lutris",
)
)
cache_dir = check_install(cache_check, cache_locations, (shared.schema, cache_key))
return cache_dir
def lutris_importer():
lutris_dir = lutris_installed()
if not lutris_dir:
return
cache_dir = lutris_cache_exists()
if not cache_dir:
return
db_cache_dir = shared.cache_dir / "cartridges" / "lutris"
db_cache_dir.mkdir(parents=True, exist_ok=True)
# Copy the file because sqlite3 doesn't like databases in /run/user/
database_tmp_path = db_cache_dir / "pga.db"
for db_file in lutris_dir.glob("pga.db*"):
copyfile(db_file, (db_cache_dir / db_file.name))
db_request = """
SELECT
id, name, slug, runner, hidden
FROM
'games'
WHERE
name IS NOT NULL
AND slug IS NOT NULL
AND configPath IS NOT NULL
AND installed IS TRUE
;
"""
connection = connect(database_tmp_path)
cursor = connection.execute(db_request)
rows = cursor.fetchall()
connection.close()
# No need to unlink temp files as they disappear when the connection is closed
database_tmp_path.unlink(missing_ok=True)
if not shared.schema.get_boolean("lutris-import-steam"):
rows = [row for row in rows if not row[3] == "steam"]
current_time = int(time())
importer = shared.importer
importer.total_queue += len(rows)
importer.queue += len(rows)
for row in rows:
values = {}
values["game_id"] = f"lutris_{row[3]}_{row[0]}"
if (
values["game_id"] in shared.win.games
and not shared.win.games[values["game_id"]].removed
):
importer.save_game()
continue
values["added"] = current_time
values["executable"] = ["xdg-open", f"lutris:rungameid/{row[0]}"]
values["hidden"] = row[4] == 1
values["name"] = row[1]
values["source"] = f"lutris_{row[3]}"
image_path = cache_dir / "coverart" / f"{row[2]}.jpg"
importer.save_game(values, image_path if image_path.is_file() else None)

View File

@@ -1,177 +0,0 @@
# steam_importer.py
#
# Copyright 2022-2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import os
import re
from pathlib import Path
from time import time
import requests
from gi.repository import Gio
from src import shared
from src.utils.check_install import check_install
def update_values_from_data(content, values):
basic_data = content[values["appid"]]
if not basic_data["success"]:
values["blacklisted"] = True
else:
data = basic_data["data"]
if data.get("developers"):
values["developer"] = ", ".join(data["developers"])
if data.get("type") not in {"game", "demo"}:
values["blacklisted"] = True
return values
def get_game(task, datatypes, current_time, appmanifest, steam_dir):
values = {}
data = appmanifest.read_text("utf-8")
for datatype in datatypes:
value = re.findall(f'"{datatype}"\t\t"(.*)"\n', data, re.IGNORECASE)
try:
values[datatype] = value[0]
except IndexError:
task.return_value((None, None))
return
values["game_id"] = f'steam_{values["appid"]}'
if (
values["game_id"] in shared.win.games
and not shared.win.games[values["game_id"]].removed
):
task.return_value((None, None))
return
values["executable"] = (
["start", f'steam://rungameid/{values["appid"]}']
if os.name == "nt"
else ["xdg-open", f'steam://rungameid/{values["appid"]}']
)
values["hidden"] = False
values["source"] = "steam"
values["added"] = current_time
image_path = (
steam_dir
/ "appcache"
/ "librarycache"
/ f'{values["appid"]}_library_600x900.jpg'
)
try:
with requests.get(
f'https://store.steampowered.com/api/appdetails?appids={values["appid"]}',
timeout=5,
) as open_file:
open_file.raise_for_status()
content = open_file.json()
except requests.exceptions.RequestException:
task.return_value((values, image_path if image_path.is_file() else None))
return
values = update_values_from_data(content, values)
task.return_value((values, image_path if image_path.is_file() else None))
def get_games_async(appmanifests, steam_dir, importer):
datatypes = ["appid", "name"]
current_time = int(time())
# Wrap the function in another one as Gio.Task.run_in_thread does not allow for passing args
def create_func(datatypes, current_time, appmanifest, steam_dir):
def wrapper(task, *_args):
get_game(
task,
datatypes,
current_time,
appmanifest,
steam_dir,
)
return wrapper
def update_games(_task, result):
final_values = result.propagate_value()[1]
# No need for an if statement as final_value would be None for games we don't want to save
importer.save_game(final_values[0], final_values[1])
for appmanifest in appmanifests:
task = Gio.Task.new(None, None, update_games)
task.run_in_thread(create_func(datatypes, current_time, appmanifest, steam_dir))
def steam_installed(path=None):
location_key = "steam-location"
check = "steamapps"
subdirs = ("steam", "Steam")
locations = (
(path,)
if path
else (
Path(shared.schema.get_string(location_key)).expanduser(),
Path.home() / ".steam",
shared.data_dir / "Steam",
Path.home() / ".var/app/com.valvesoftware.Steam/data/Steam",
)
)
if os.name == "nt":
locations += (Path(os.getenv("programfiles(x86)")) / "Steam",)
steam_dir = check_install(check, locations, (shared.schema, location_key), subdirs)
return steam_dir
def steam_importer():
steam_dir = steam_installed()
if not steam_dir:
return
appmanifests = []
if (lib_file := steam_dir / "steamapps" / "libraryfolders.vdf").is_file():
libraryfolders = lib_file.open().read()
steam_dirs = [
Path(path) for path in re.findall('"path"\t\t"(.*)"\n', libraryfolders)
]
else:
steam_dirs = [steam_dir]
for directory in steam_dirs:
try:
for open_file in (directory / "steamapps").iterdir():
if open_file.is_file() and "appmanifest" in open_file.name:
appmanifests.append(open_file)
except FileNotFoundError:
continue
importer = shared.importer
importer.total_queue += len(appmanifests)
importer.queue += len(appmanifests)
get_games_async(appmanifests, steam_dir, importer)

View File

@@ -9,7 +9,6 @@ configure_file(
) )
install_subdir('importer', install_dir: moduledir) install_subdir('importer', install_dir: moduledir)
install_subdir('importers', install_dir: moduledir)
install_subdir('utils', install_dir: moduledir) install_subdir('utils', install_dir: moduledir)
install_subdir('store', install_dir: moduledir) install_subdir('store', install_dir: moduledir)
install_subdir('logging', install_dir: moduledir) install_subdir('logging', install_dir: moduledir)

View File

@@ -80,12 +80,6 @@ class SteamRateLimiter(RateLimiter):
self.pick_history.remove_old_entries() self.pick_history.remove_old_entries()
super().__init__() super().__init__()
@property
def refill_spacing(self) -> float:
spacing = super().refill_spacing
logging.debug("Next Steam API request token in %f seconds", spacing)
return spacing
def acquire(self): def acquire(self):
"""Get a token from the bucket and store the pick history in the schema""" """Get a token from the bucket and store the pick history in the schema"""
super().acquire() super().acquire()