"""Helpers for locating, installing and removing packages from local repos."""

import os
import sys
import tarfile
import shutil
import tempfile
import subprocess
import urllib.request
from pathlib import Path
from urllib.parse import urlsplit, urlunsplit

from logs import info, warn, error
from paths import TMP_DIR, CACHE_DIR, REPOS_DIR
from uzbekdb import database
from database import (
    download_repos,
    get_repos_urls,
    install_package,
    remove_package,
    is_installed,
)

_NAME = os.path.basename(sys.argv[0])


def _random_tmp_dir(tmp_dir=TMP_DIR):
    """Create a fresh ``qulay_``-prefixed directory under *tmp_dir* and return its path."""
    return tempfile.mkdtemp(prefix="qulay_", dir=tmp_dir)


def _download(url, dest):
    """Download *url* to *dest*.

    Returns ``1`` on success, or the caught exception object on failure
    (the exception is returned, not raised).
    """
    try:
        urllib.request.urlretrieve(url, dest)
    except Exception as e:
        return e
    return 1


def _extract(archive, dest):
    """Extract every member of the tar *archive* into *dest*."""
    with tarfile.open(archive) as tar:
        # NOTE(review): extractall() without an extraction filter trusts the
        # member paths stored in the archive. If archives can come from an
        # untrusted source, consider passing filter="data" (see tarfile docs).
        tar.extractall(dest)


def _read_depends(path):
    """Return the non-empty, stripped lines of ``depends.uz`` inside *path*.

    Returns an empty list when the file does not exist.
    """
    depends_file = os.path.join(path, "depends.uz")
    if not os.path.exists(depends_file):
        return []
    # utf-8 to match how the other ``.uz`` files in this module are read.
    with open(depends_file, encoding="utf-8") as f:
        return [line.strip() for line in f if line.strip()]


def _check_pkg_structure(path):
    """Report an error for each required script missing from *path*."""
    for script in ("install.sh", "remove.sh"):
        if not os.path.exists(os.path.join(path, script)):
            error("!! package is corrupted", True)


def _iter_repo_indexes(packages_path):
    """Yield ``(repo_name, pkgs_data)`` for every repo with a readable index.

    A repo is a sub-directory of *packages_path* containing a ``pkgs.uz``
    file whose parsed content is a dict. Repos whose index is missing,
    unreadable or not a dict are skipped.
    """
    for repo_name in os.listdir(packages_path):
        repo_path = os.path.join(packages_path, repo_name)
        if not os.path.isdir(repo_path):
            continue
        pkgs_file = os.path.join(repo_path, "pkgs.uz")
        if not os.path.exists(pkgs_file):
            continue
        try:
            with open(pkgs_file, "r", encoding="utf-8") as f:
                pkgs_data = database.loads(f.read())
        except Exception:
            # Best effort: one broken index must not hide the other repos.
            continue
        if isinstance(pkgs_data, dict):
            yield repo_name, pkgs_data


def _get_packages(dest_dir="/"):
    """Return every package of every repo under *dest_dir*.

    Each entry is a dict with the keys ``"name"``, ``"repo"`` and ``"data"``.
    Returns an empty list when the repos directory does not exist.
    """
    packages_path = os.path.join(dest_dir, REPOS_DIR.lstrip(os.sep))
    if not os.path.isdir(packages_path):
        return []
    return [
        {"name": name, "repo": repo_name, "data": data}
        for repo_name, pkgs_data in _iter_repo_indexes(packages_path)
        for name, data in pkgs_data.items()
    ]


def _find_package(name, dest_dir="/"):
    """Look up *name* in every repo index under *dest_dir*.

    Returns:
        ``"-n"`` when the repos directory does not exist, ``None`` when no
        repo lists the package, otherwise a list of dicts with the keys
        ``"name"``, ``"repo"`` and ``"data"`` (one per repo that lists it).
    """
    packages_path = os.path.join(dest_dir, REPOS_DIR.lstrip(os.sep))
    if not os.path.isdir(packages_path):
        return "-n"
    found = [
        {"name": name, "repo": repo_name, "data": pkgs_data[name]}
        for repo_name, pkgs_data in _iter_repo_indexes(packages_path)
        if name in pkgs_data
    ]
    return found or None


def _find_repo_package(result, repo, name):
    """Return the entry of *result* that belongs to *repo*, or ``None``.

    *result* is the value returned by ``_find_package``; its non-list
    outcomes (``None`` and ``"-n"``) are treated as "nothing found".
    *name* is accepted for call compatibility and is not used for matching.
    """
    if not isinstance(result, list):
        return None
    for pkg in result:
        if pkg["repo"] == repo:
            return pkg
    return None


def _resolve_package(name, dest_dir, disable_logs):
    """Resolve *name* (``"pkg"`` or ``"repo/pkg"``) to a single package.

    Returns ``(repo_name, name, pkg_data)`` on success. When the package is
    missing or ambiguous the problem is reported and ``None`` is returned.
    """
    if "/" in name:
        repo_name, name = name.split("/", 1)
        pkg = _find_repo_package(_find_package(name, dest_dir), repo_name, name)
        if not pkg:
            error(f"!! package {name} not found in {repo_name}", True, disable_logs)
            return None
        return repo_name, name, pkg["data"]

    result = _find_package(name, dest_dir)
    # A non-list result is the "-n"/None "nothing found" outcome.
    if not isinstance(result, list) or not result:
        error(f"!! package {name} not found", True, disable_logs)
        return None
    if len(result) > 1:
        error(f"!! {len(result)} same pkgs found in different repos:", True, disable_logs)
        for pkg in result:
            print(f" {pkg['repo']}/{pkg['name']} - {pkg['data'][0]} - {pkg['data'][1]}")
        return None
    return result[0]["repo"], name, result[0]["data"]


def install(name, args=None, dest_dir="/"):
    """Install package *name* by running its ``install.sh``.

    Args:
        name: ``"pkg"`` or ``"repo/pkg"``.
        args: Iterable of flag strings. ``":a"`` asks for confirmation,
            ``":r"`` also installs dependencies that are already installed,
            ``":nl"`` is forwarded to the log helpers and skips recording the
            package via ``install_package``, ``":nd"`` skips dependencies.
            ``":v"`` is accepted but currently has no effect.
        dest_dir: Root directory under which the repos live.

    Returns:
        ``1`` when the package cannot be resolved, ``0`` when ``install.sh``
        fails, ``None`` on success.
        NOTE(review): ``0`` for a failed script looks inverted but is kept
        because callers may depend on it.
    """
    args = [] if args is None else args
    ask = ":a" in args
    reinstall = ":r" in args
    disable_logs = ":nl" in args
    disable_deps = ":nd" in args

    resolved = _resolve_package(name, dest_dir, disable_logs)
    if resolved is None:
        return 1
    repo_name, name, pkg_data = resolved
    version = pkg_data[1]

    pkg_path = os.path.join(
        dest_dir, REPOS_DIR.lstrip(os.sep), f"{repo_name}/packages/{name}"
    )

    if ask:
        download_answer = input(
            f"-> confirm installing '{name}' with version {version} [Y/n]: "
        ).strip().lower()
        if download_answer not in ("y", "yes", ""):
            sys.exit(0)

    # -- install depends --
    if not disable_deps:
        for dep in _read_depends(pkg_path):
            if not is_installed(dep) or reinstall:
                # -- copy args to dep install --
                install(dep, args, dest_dir=dest_dir)

    # -- run install.sh --
    try:
        subprocess.run(
            ["sh", "install.sh"],
            cwd=pkg_path,
            check=True,
            env=os.environ,
        )
    except (OSError, subprocess.SubprocessError) as e:
        error(f"!! {e}", True, disable_logs)
        return 0

    if not disable_logs:
        install_package(f"{repo_name}/{name}", version, dest_dir=dest_dir)
    info(f":: {name} installed successfully", True, disable_logs)


def remove(name, args=None, dest_dir="/"):
    """Remove package *name* by running its ``remove.sh``.

    Args:
        name: ``"pkg"`` or ``"repo/pkg"``.
        args: Iterable of flag strings. ``":nl"`` is forwarded to the log
            helpers and skips the ``remove_package`` bookkeeping call. Other
            flags are accepted but currently have no effect here.
        dest_dir: Root directory under which the repos live.

    Returns:
        ``1`` when the package cannot be resolved, ``0`` when ``remove.sh``
        fails, ``None`` on success.
        NOTE(review): ``0`` for a failed script looks inverted but is kept
        because callers may depend on it.
    """
    args = [] if args is None else args
    disable_logs = ":nl" in args

    resolved = _resolve_package(name, dest_dir, disable_logs)
    if resolved is None:
        return 1
    repo_name, name, _pkg_data = resolved

    pkg_path = os.path.join(
        dest_dir, REPOS_DIR.lstrip(os.sep), f"{repo_name}/packages/{name}"
    )

    # -- run remove.sh --
    try:
        subprocess.run(
            ["sh", "remove.sh"],
            cwd=pkg_path,
            check=True,
            env=os.environ,
        )
    except (OSError, subprocess.SubprocessError) as e:
        error(f"!! {e}", True, disable_logs)
        return 0

    if not disable_logs:
        remove_package(f"{repo_name}/{name}", dest_dir=dest_dir)
    info(f":: {name} removed successfully", True, disable_logs)