Files
cartridges/src/importers/itch_importer.py
2023-05-24 15:35:18 +02:00

192 lines
5.6 KiB
Python

# itch_importer.py
#
# Copyright 2022-2023 kramo
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# SPDX-License-Identifier: GPL-3.0-or-later
import os
from pathlib import Path
from shutil import copyfile
from sqlite3 import connect
from time import time
import requests
from gi.repository import GdkPixbuf, Gio
import src.shared
from src.utils.check_install import check_install
from src.utils.save_cover import resize_cover
def get_game(task, current_time, row):
values = {}
values["game_id"] = f"itch_{row[0]}"
if (
values["game_id"] in shared.win.games
and not shared.win.games[values["game_id"]].removed
):
task.return_value((None, None))
return
values["added"] = current_time
values["executable"] = (
["start", f"itch://caves/{row[4]}/launch"]
if os.name == "nt"
else ["xdg-open", f"itch://caves/{row[4]}/launch"]
)
values["hidden"] = False
values["name"] = row[1]
values["source"] = "itch"
if row[3] or row[2]:
tmp_file = Gio.File.new_tmp()[0]
try:
with requests.get(row[3] or row[2], timeout=5) as cover:
cover.raise_for_status()
Path(tmp_file.get_path()).write_bytes(cover.content)
except requests.exceptions.RequestException:
task.return_value((values, None))
return
game_cover = GdkPixbuf.Pixbuf.new_from_stream_at_scale(
tmp_file.read(), 2, 2, False
).scale_simple(*shared.image_size, GdkPixbuf.InterpType.BILINEAR)
itch_pixbuf = GdkPixbuf.Pixbuf.new_from_stream(tmp_file.read())
itch_pixbuf = itch_pixbuf.scale_simple(
shared.image_size[0],
itch_pixbuf.get_height() * (shared.image_size[0] / itch_pixbuf.get_width()),
GdkPixbuf.InterpType.BILINEAR,
)
itch_pixbuf.composite(
game_cover,
0,
(shared.image_size[1] - itch_pixbuf.get_height()) / 2,
itch_pixbuf.get_width(),
itch_pixbuf.get_height(),
0,
(shared.image_size[1] - itch_pixbuf.get_height()) / 2,
1.0,
1.0,
GdkPixbuf.InterpType.BILINEAR,
255,
)
else:
game_cover = None
task.return_value((values, game_cover))
def get_games_async(rows, importer):
current_time = int(time())
# Wrap the function in another one as Gio.Task.run_in_thread does not allow for passing args
def create_func(current_time, row):
def wrapper(task, *_args):
get_game(
task,
current_time,
row,
)
return wrapper
def update_games(_task, result):
final_values = result.propagate_value()[1]
# No need for an if statement as final_value would be None for games we don't want to save
importer.save_game(
final_values[0],
resize_cover(pixbuf=final_values[1]),
)
for row in rows:
task = Gio.Task.new(None, None, update_games)
task.run_in_thread(create_func(current_time, row))
def itch_installed(path=None):
location_key = "itch-location"
itch_dir = (
path if path else Path(shared.schema.get_string(location_key)).expanduser()
)
check = Path("db") / "butler.db"
if not (itch_dir / check).is_file():
locations = (
(Path(),)
if path
else (
Path.home() / ".var/app/io.itch.itch/config/itch",
shared.config_dir / "itch",
)
)
if os.name == "nt" and not path:
locations += (Path(os.getenv("appdata")) / "itch",)
itch_dir = check_install(check, locations, (shared.schema, location_key))
return itch_dir
def itch_importer():
itch_dir = itch_installed()
if not itch_dir:
return
database_path = (itch_dir / "db").expanduser()
db_cache_dir = shared.cache_dir / "cartridges" / "itch"
db_cache_dir.mkdir(parents=True, exist_ok=True)
# Copy the file because sqlite3 doesn't like databases in /run/user/
database_tmp_path = db_cache_dir / "butler.db"
for db_file in database_path.glob("butler.db*"):
copyfile(db_file, (db_cache_dir / db_file.name))
db_request = """
SELECT
games.id,
games.title,
games.cover_url,
games.still_cover_url,
caves.id
FROM
'caves'
INNER JOIN
'games'
ON
caves.game_id = games.id
;
"""
connection = connect(database_tmp_path)
cursor = connection.execute(db_request)
rows = cursor.fetchall()
connection.close()
# No need to unlink temp files as they disappear when the connection is closed
database_tmp_path.unlink(missing_ok=True)
importer = shared.importer
importer.total_queue += len(rows)
importer.queue += len(rows)
get_games_async(rows, importer)