From e9046a96f1edd2d52594785998d90a14d10a5803 Mon Sep 17 00:00:00 2001 From: huker667 Date: Sun, 10 May 2026 09:51:20 +0300 Subject: init commit v0.7 --- database.py | 177 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 177 insertions(+) create mode 100644 database.py (limited to 'database.py') diff --git a/database.py b/database.py new file mode 100644 index 0000000..2ebe97a --- /dev/null +++ b/database.py @@ -0,0 +1,177 @@ +import os +import sys +import dbm +import ssl +import shutil +import shelve +import urllib.request +import tarfile + +import compression.zstd as zstd +from paths import * +from uzbekdb import database +import urllib.request +import urllib.error +from urllib.parse import urlsplit, urlunsplit, urlparse + + +TIMEOUT = 10 +_NAME = os.path.basename(sys.argv[0]) + + +def get_file_repos(): + if not os.path.exists(REPOS_FILE): + print(f"!! can't get repos database.\n" + f"|- please init Qulay PM with '{_NAME} i'\n" + f"|- and update repos with '{_NAME} u'") + return {} + + try: + with shelve.open(REPOS_FILE, flag="r") as db: + return db["repos"] or {} + except Exception: + print(f"!! can't get repos database.\n" + f"|- please init Qulay PM with '{_NAME} i'\n" + f"|- and update repos with '{_NAME} u'") + return {} + + +def get_repos_urls(): + if not os.path.exists(REPOS_URLS_FILE): + return [] + with open(REPOS_URLS_FILE, "r") as f: + try: + return f.readlines() + except: + return [] + + +def download_repos(urls, dest_dir="/"): + valid_urls = [u.strip() for u in urls if u.strip()] + total = len(valid_urls) + + for idx, url in enumerate(valid_urls, 1): + file_name = os.path.basename(urlparse(url).path) + repo_name = file_name.removesuffix(".tar.zst") + try: + bar_len = 20 + + print(f"\r:: {repo_name:<10} [{'#' * 0}{'-' * bar_len}] {idx-1}/{total}", end="", flush=True) + + with urllib.request.urlopen(url, timeout=10) as response: + with open(f"/tmp/{file_name}", "wb") as f: + shutil.copyfileobj(response, f) + + print(f"\r:: {repo_name:<10} [{'#' * 10}{'-' * 10}] {idx-1}/{total}", end="", flush=True) + + repos_dir = os.path.join(dest_dir, REPOS_DIR.lstrip(os.sep)) + + with zstd.open(f"/tmp/{file_name}", mode='rb') as zf: + with tarfile.open(fileobj=zf, mode='r|') as tar: + tar.extractall(path=repos_dir) + except Exception as e: + print(f"\r:: {repo_name:<10} fail [{'x' * 20}] {idx}/{total}") + else: + print(f"\r:: {repo_name:<10} done [{'#' * 20}] {idx}/{total}") + finally: + if os.path.exists(f"/tmp/{file_name}"): + os.remove(f"/tmp/{file_name}") + + +def _read_installed(dest_dir="/"): + installed_full_path = os.path.join(dest_dir, INSTALLED_FILE.lstrip(os.sep)) + if not os.path.exists(installed_full_path): + return {} + + with open(installed_full_path, "r", encoding="utf-8") as f: + content = f.read().strip() + data = database.loads(content) + if isinstance(data, list) and len(data) == 2: + data = {data[0]: [data[1]]} + return data if data else {} + + +def _write_installed(data: dict, dest_dir="/"): + installed_full_path = os.path.join(dest_dir, INSTALLED_FILE.lstrip(os.sep)) + with open(installed_full_path, "w", encoding="utf-8") as f: + f.write(database.dumps(data)) + + +def is_installed(name, dest_dir="/"): + return name in _read_installed(dest_dir) + + +def install_package(name, version, dest_dir="/"): + data = _read_installed(dest_dir) + + action = "added" + + if isinstance(data, dict): + if name in data: + if data[name][0] == version: + action = "already installed" + else: + action = f"upgraded from {data[name][0]} to {version}" + data[name][0] = version + else: + data[name] = [version] + else: + data = {name: [version]} + + _write_installed(data, dest_dir) + return action + + +def remove_package(name, dest_dir="/"): + data = _read_installed(dest_dir) + + if not isinstance(data, dict): + raise ValueError("!! installed database is corrupted") + + if name not in data: + raise ValueError(f"!! cant remove '{name}': package not installed") + + del data[name] + _write_installed(data, dest_dir) + + +def get_version_package(name, dest_dir="/"): + return _read_installed(dest_dir).get(name) + + +def ensure_database(verbose, ask, dest_dir="/"): + to_create = [INSTALLED_FILE, REPOS_URLS_FILE, LOG_FILE, CACHE_DIR, TMP_DIR, REPOS_DIR] + + if ask: + for path in to_create: + if path.startswith("/"): + path = path.lstrip("/") + path = os.path.join(dest_dir, path.lstrip(os.sep)) + print(f" * {path}") + answer = input("-> Do you want to create the following directories and files? [Y/n]: ").strip().lower() + if not answer in ("", "y", "yes"): + sys.exit(0) + + + for name in to_create: + if name.startswith("/"): + name = name.lstrip("/") + name_full_path = os.path.join(dest_dir, name) + try: + if name.endswith("/"): + os.makedirs(name_full_path, exist_ok=True) + if verbose: + print(f"[+] dir {name_full_path}") + continue + + os.makedirs(os.path.dirname(name_full_path), exist_ok=True) + + if not os.path.exists(name_full_path): + open(name_full_path, "w").close() + + if verbose: + print(f"[+] file {name_full_path}") + + except Exception as e: + if verbose: + print(f"[f] failed to create {name_full_path}: {e}") -- cgit v1.3.1