"""Repository and installed-package database helpers for Qulay PM.

The module covers three areas:

* reading the repository database and the list of repository URLs,
* downloading and unpacking repository archives (``.tar.zst``),
* maintaining the "installed packages" database and creating the on-disk
  layout that the other helpers expect.

Every function that touches the file system accepts ``dest_dir`` so the
whole layout can be re-rooted under an alternative directory.
"""

import compression.zstd as zstd
import dbm
import os
import shelve
import shutil
import ssl
import sys
import tarfile
import tempfile
import urllib.error
import urllib.request
from urllib.parse import urlsplit, urlunsplit, urlparse

# Supplies the path constants used below (REPOS_FILE, REPOS_URLS_FILE,
# INSTALLED_FILE, LOG_FILE, CACHE_DIR, TMP_DIR, REPOS_DIR).
from paths import *
from uzbekdb import database

# Network timeout, in seconds, applied to repository downloads.
TIMEOUT = 10

# Name the program was invoked with; used in hints printed to the user.
_NAME = os.path.basename(sys.argv[0])


def _print_repos_db_hint():
    """Tell the user how to (re)create the repository database."""
    print(f"!! can't get repos database.\n"
          f"|- please init Qulay PM with '{_NAME} i'\n"
          f"|- and update repos with '{_NAME} u'")


def get_file_repos():
    """Return the ``"repos"`` entry of the repository shelve database.

    Returns:
        The stored value, or an empty dict when the database file is
        missing, cannot be opened/read, or the stored value is falsy.
        In the failure cases a hint is printed for the user.
    """
    if not os.path.exists(REPOS_FILE):
        _print_repos_db_hint()
        return {}

    try:
        with shelve.open(REPOS_FILE, flag="r") as db:
            return db["repos"] or {}
    except Exception:
        # Deliberately broad: any failure here (missing key, unreadable or
        # corrupt database) is reported to the user the same way.
        _print_repos_db_hint()
        return {}


def get_repos_urls():
    """Return the raw lines of the repository URL list.

    Returns:
        A list of lines (newline characters included), or an empty list
        when the file does not exist or cannot be read/decoded.
    """
    if not os.path.exists(REPOS_URLS_FILE):
        return []

    with open(REPOS_URLS_FILE, "r") as f:
        try:
            return f.readlines()
        except (OSError, UnicodeError):
            # Only I/O and decoding problems are treated as "no URLs";
            # KeyboardInterrupt and friends must still propagate.
            return []


def download_repos(urls, dest_dir="/"):
    """Download every repository archive in *urls* and unpack it.

    Each URL is expected to point at a zstd-compressed tar archive. The
    archive is fetched into a private temporary directory and extracted
    into ``REPOS_DIR`` re-rooted under *dest_dir*. One status line is
    printed per repository; a failing repository does not stop the others.

    Args:
        urls: Iterable of URL strings; blank entries are skipped and
            surrounding whitespace is stripped.
        dest_dir: Root directory under which ``REPOS_DIR`` is resolved.
    """
    valid_urls = [u.strip() for u in urls if u.strip()]
    total = len(valid_urls)
    bar_len = 20
    half_bar = bar_len // 2
    repos_dir = os.path.join(dest_dir, REPOS_DIR.lstrip(os.sep))

    for idx, url in enumerate(valid_urls, 1):
        file_name = os.path.basename(urlparse(url).path)
        repo_name = file_name.removesuffix(".tar.zst")
        try:
            print(f"\r:: {repo_name:<10} [{'-' * bar_len}] {idx-1}/{total}",
                  end="", flush=True)

            # A private temporary directory avoids writing to a predictable
            # path in a shared location and is removed automatically, even
            # when the URL has no usable file name.
            with tempfile.TemporaryDirectory(prefix="qulay-") as tmp_dir:
                archive_path = os.path.join(tmp_dir, "repo.tar.zst")

                with urllib.request.urlopen(url, timeout=TIMEOUT) as response:
                    with open(archive_path, "wb") as f:
                        shutil.copyfileobj(response, f)

                print(f"\r:: {repo_name:<10} "
                      f"[{'#' * half_bar}{'-' * half_bar}] {idx-1}/{total}",
                      end="", flush=True)

                with zstd.open(archive_path, mode="rb") as zf:
                    with tarfile.open(fileobj=zf, mode="r|") as tar:
                        # "data" rejects absolute paths, links escaping the
                        # destination and special files in the archive.
                        tar.extractall(path=repos_dir, filter="data")
        except Exception as e:
            # Best effort: report the failure and go on with the next repo.
            print(f"\r:: {repo_name:<10} fail [{'x' * bar_len}] {idx}/{total}")
            print(f"|- {e}")
        else:
            print(f"\r:: {repo_name:<10} done [{'#' * bar_len}] {idx}/{total}")


def _installed_path(dest_dir):
    """Return the installed-database path re-rooted under *dest_dir*."""
    return os.path.join(dest_dir, INSTALLED_FILE.lstrip(os.sep))


def _read_installed(dest_dir="/"):
    """Load the installed-packages database.

    Returns:
        The decoded database. An empty dict is returned when the file is
        missing, empty, or decodes to a falsy value. A decoded two-item
        list is converted to ``{first: [second]}``. Any other decoded
        value is returned as-is, so callers that need a mapping should
        check the type.
    """
    installed_full_path = _installed_path(dest_dir)
    if not os.path.exists(installed_full_path):
        return {}

    with open(installed_full_path, "r", encoding="utf-8") as f:
        content = f.read().strip()

    if not content:
        # ensure_database() creates this file empty; treat that as
        # "nothing installed" instead of handing "" to the decoder.
        return {}

    data = database.loads(content)
    if isinstance(data, list) and len(data) == 2:
        # NOTE(review): presumably a single [name, version] record that the
        # decoder returns as a bare list - confirm against uzbekdb.
        data = {data[0]: [data[1]]}
    return data if data else {}


def _write_installed(data: dict, dest_dir="/"):
    """Serialize *data* and overwrite the installed-packages database."""
    with open(_installed_path(dest_dir), "w", encoding="utf-8") as f:
        f.write(database.dumps(data))


def is_installed(name, dest_dir="/"):
    """Return True when *name* is present in the installed database."""
    return name in _read_installed(dest_dir)


def install_package(name, version, dest_dir="/"):
    """Record *name* at *version* in the installed database.

    The database is rewritten on every call. A database that does not
    decode to a dict is replaced by a fresh one, and an entry that is
    empty or not a sequence is replaced rather than raising.

    Returns:
        ``"added"``, ``"already installed"`` or
        ``"upgraded from <old> to <new>"``.
    """
    data = _read_installed(dest_dir)
    if not isinstance(data, dict):
        data = {}

    entry = data.get(name)
    if isinstance(entry, (list, tuple)) and entry:
        previous = entry[0]
        if previous == version:
            action = "already installed"
        else:
            action = f"upgraded from {previous} to {version}"
            # Replace only the version, keeping any further items stored
            # in the entry.
            data[name] = [version, *entry[1:]]
    else:
        data[name] = [version]
        action = "added"

    _write_installed(data, dest_dir)
    return action


def remove_package(name, dest_dir="/"):
    """Delete *name* from the installed database.

    Raises:
        ValueError: The database did not decode to a dict, or *name* is
            not recorded in it.
    """
    data = _read_installed(dest_dir)
    if not isinstance(data, dict):
        raise ValueError("!! installed database is corrupted")
    if name not in data:
        raise ValueError(f"!! cant remove '{name}': package not installed")

    del data[name]
    _write_installed(data, dest_dir)


def get_version_package(name, dest_dir="/"):
    """Return the stored entry for *name*, or None when it is not recorded.

    Entries written by install_package() are lists whose first item is the
    version, so callers receive that list rather than a bare version.
    """
    return _read_installed(dest_dir).get(name)


def ensure_database(verbose, ask, dest_dir="/"):
    """Create the files and directories the package manager relies on.

    Path constants ending in ``/`` are created as directories; all others
    are created as empty files (existing files are left untouched).
    Failures are not raised; they are reported only when *verbose* is set.

    Args:
        verbose: Print one line per created path and per failure.
        ask: List the paths and ask for confirmation first; any answer
            other than empty, ``y`` or ``yes`` exits the process with
            status 0.
        dest_dir: Root directory under which every path is resolved.
    """
    to_create = [INSTALLED_FILE, REPOS_URLS_FILE, LOG_FILE,
                 CACHE_DIR, TMP_DIR, REPOS_DIR]

    if ask:
        for path in to_create:
            shown = os.path.join(dest_dir, path.lstrip("/").lstrip(os.sep))
            print(f" * {shown}")
        answer = input("-> Do you want to create the following directories and files? [Y/n]: ").strip().lower()
        if answer not in ("", "y", "yes"):
            sys.exit(0)

    for name in to_create:
        # Drop leading slashes so os.path.join() keeps the dest_dir prefix.
        name = name.lstrip("/")
        name_full_path = os.path.join(dest_dir, name)
        try:
            if name.endswith("/"):
                os.makedirs(name_full_path, exist_ok=True)
                if verbose:
                    print(f"[+] dir {name_full_path}")
                continue

            parent = os.path.dirname(name_full_path)
            if parent:
                os.makedirs(parent, exist_ok=True)
            if not os.path.exists(name_full_path):
                # Append mode creates the file but cannot truncate one that
                # appeared after the existence check.
                with open(name_full_path, "a"):
                    pass
            if verbose:
                print(f"[+] file {name_full_path}")
        except Exception as e:
            # Best effort: keep going with the remaining paths.
            if verbose:
                print(f"[f] failed to create {name_full_path}: {e}")