aboutsummaryrefslogtreecommitdiff
path: root/database.py
diff options
context:
space:
mode:
Diffstat (limited to 'database.py')
-rw-r--r--database.py177
1 files changed, 177 insertions, 0 deletions
diff --git a/database.py b/database.py
new file mode 100644
index 0000000..2ebe97a
--- /dev/null
+++ b/database.py
@@ -0,0 +1,177 @@
+import os
+import sys
+import dbm
+import ssl
+import shutil
+import shelve
+import urllib.request
+import tarfile
+
+import compression.zstd as zstd
+from paths import *
+from uzbekdb import database
+import urllib.request
+import urllib.error
+from urllib.parse import urlsplit, urlunsplit, urlparse
+
+
+TIMEOUT = 10
+_NAME = os.path.basename(sys.argv[0])
+
+
+def get_file_repos():
+ if not os.path.exists(REPOS_FILE):
+ print(f"!! can't get repos database.\n"
+ f"|- please init Qulay PM with '{_NAME} i'\n"
+ f"|- and update repos with '{_NAME} u'")
+ return {}
+
+ try:
+ with shelve.open(REPOS_FILE, flag="r") as db:
+ return db["repos"] or {}
+ except Exception:
+ print(f"!! can't get repos database.\n"
+ f"|- please init Qulay PM with '{_NAME} i'\n"
+ f"|- and update repos with '{_NAME} u'")
+ return {}
+
+
+def get_repos_urls():
+ if not os.path.exists(REPOS_URLS_FILE):
+ return []
+ with open(REPOS_URLS_FILE, "r") as f:
+ try:
+ return f.readlines()
+ except:
+ return []
+
+
+def download_repos(urls, dest_dir="/"):
+ valid_urls = [u.strip() for u in urls if u.strip()]
+ total = len(valid_urls)
+
+ for idx, url in enumerate(valid_urls, 1):
+ file_name = os.path.basename(urlparse(url).path)
+ repo_name = file_name.removesuffix(".tar.zst")
+ try:
+ bar_len = 20
+
+ print(f"\r:: {repo_name:<10} [{'#' * 0}{'-' * bar_len}] {idx-1}/{total}", end="", flush=True)
+
+ with urllib.request.urlopen(url, timeout=10) as response:
+ with open(f"/tmp/{file_name}", "wb") as f:
+ shutil.copyfileobj(response, f)
+
+ print(f"\r:: {repo_name:<10} [{'#' * 10}{'-' * 10}] {idx-1}/{total}", end="", flush=True)
+
+ repos_dir = os.path.join(dest_dir, REPOS_DIR.lstrip(os.sep))
+
+ with zstd.open(f"/tmp/{file_name}", mode='rb') as zf:
+ with tarfile.open(fileobj=zf, mode='r|') as tar:
+ tar.extractall(path=repos_dir)
+ except Exception as e:
+ print(f"\r:: {repo_name:<10} fail [{'x' * 20}] {idx}/{total}")
+ else:
+ print(f"\r:: {repo_name:<10} done [{'#' * 20}] {idx}/{total}")
+ finally:
+ if os.path.exists(f"/tmp/{file_name}"):
+ os.remove(f"/tmp/{file_name}")
+
+
+def _read_installed(dest_dir="/"):
+ installed_full_path = os.path.join(dest_dir, INSTALLED_FILE.lstrip(os.sep))
+ if not os.path.exists(installed_full_path):
+ return {}
+
+ with open(installed_full_path, "r", encoding="utf-8") as f:
+ content = f.read().strip()
+ data = database.loads(content)
+ if isinstance(data, list) and len(data) == 2:
+ data = {data[0]: [data[1]]}
+ return data if data else {}
+
+
+def _write_installed(data: dict, dest_dir="/"):
+ installed_full_path = os.path.join(dest_dir, INSTALLED_FILE.lstrip(os.sep))
+ with open(installed_full_path, "w", encoding="utf-8") as f:
+ f.write(database.dumps(data))
+
+
+def is_installed(name, dest_dir="/"):
+ return name in _read_installed(dest_dir)
+
+
+def install_package(name, version, dest_dir="/"):
+ data = _read_installed(dest_dir)
+
+ action = "added"
+
+ if isinstance(data, dict):
+ if name in data:
+ if data[name][0] == version:
+ action = "already installed"
+ else:
+ action = f"upgraded from {data[name][0]} to {version}"
+ data[name][0] = version
+ else:
+ data[name] = [version]
+ else:
+ data = {name: [version]}
+
+ _write_installed(data, dest_dir)
+ return action
+
+
+def remove_package(name, dest_dir="/"):
+ data = _read_installed(dest_dir)
+
+ if not isinstance(data, dict):
+ raise ValueError("!! installed database is corrupted")
+
+ if name not in data:
+ raise ValueError(f"!! cant remove '{name}': package not installed")
+
+ del data[name]
+ _write_installed(data, dest_dir)
+
+
+def get_version_package(name, dest_dir="/"):
+ return _read_installed(dest_dir).get(name)
+
+
+def ensure_database(verbose, ask, dest_dir="/"):
+ to_create = [INSTALLED_FILE, REPOS_URLS_FILE, LOG_FILE, CACHE_DIR, TMP_DIR, REPOS_DIR]
+
+ if ask:
+ for path in to_create:
+ if path.startswith("/"):
+ path = path.lstrip("/")
+ path = os.path.join(dest_dir, path.lstrip(os.sep))
+ print(f" * {path}")
+ answer = input("-> Do you want to create the following directories and files? [Y/n]: ").strip().lower()
+ if not answer in ("", "y", "yes"):
+ sys.exit(0)
+
+
+ for name in to_create:
+ if name.startswith("/"):
+ name = name.lstrip("/")
+ name_full_path = os.path.join(dest_dir, name)
+ try:
+ if name.endswith("/"):
+ os.makedirs(name_full_path, exist_ok=True)
+ if verbose:
+ print(f"[+] dir {name_full_path}")
+ continue
+
+ os.makedirs(os.path.dirname(name_full_path), exist_ok=True)
+
+ if not os.path.exists(name_full_path):
+ open(name_full_path, "w").close()
+
+ if verbose:
+ print(f"[+] file {name_full_path}")
+
+ except Exception as e:
+ if verbose:
+ print(f"[f] failed to create {name_full_path}: {e}")