mirror of
https://github.com/nikdoof/dotfiles.git
synced 2026-01-30 02:48:15 +00:00
[bin] Add colour support to stowage
This commit is contained in:
@@ -34,17 +34,98 @@ import shutil
|
|||||||
import sys
|
import sys
|
||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import List
|
from typing import IO, List
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class Colours(object):
|
||||||
|
"""ANSI color codes for terminal output"""
|
||||||
|
|
||||||
|
# Reset
|
||||||
|
RESET = "\033[0m"
|
||||||
|
|
||||||
|
# Regular colors
|
||||||
|
BLACK = "\033[30m"
|
||||||
|
RED = "\033[31m"
|
||||||
|
GREEN = "\033[32m"
|
||||||
|
YELLOW = "\033[33m"
|
||||||
|
BLUE = "\033[34m"
|
||||||
|
MAGENTA = "\033[35m"
|
||||||
|
CYAN = "\033[36m"
|
||||||
|
WHITE = "\033[37m"
|
||||||
|
|
||||||
|
# Copied from _colorize in Python stdlib
|
||||||
|
def can_colorize(*, file: IO[str] | IO[bytes] | None = None) -> bool:
|
||||||
|
def _safe_getenv(k: str, fallback: str | None = None) -> str | None:
|
||||||
|
try:
|
||||||
|
return os.environ.get(k, fallback)
|
||||||
|
except Exception:
|
||||||
|
return fallback
|
||||||
|
|
||||||
|
if file is None:
|
||||||
|
file = sys.stdout
|
||||||
|
|
||||||
|
if not sys.flags.ignore_environment:
|
||||||
|
if _safe_getenv("PYTHON_COLORS") == "0":
|
||||||
|
return False
|
||||||
|
if _safe_getenv("PYTHON_COLORS") == "1":
|
||||||
|
return True
|
||||||
|
if _safe_getenv("NO_COLOR"):
|
||||||
|
return False
|
||||||
|
if _safe_getenv("FORCE_COLOR"):
|
||||||
|
return True
|
||||||
|
if _safe_getenv("TERM") == "dumb":
|
||||||
|
return False
|
||||||
|
|
||||||
|
if not hasattr(file, "fileno"):
|
||||||
|
return False
|
||||||
|
|
||||||
|
if sys.platform == "win32":
|
||||||
|
try:
|
||||||
|
import nt
|
||||||
|
|
||||||
|
if not nt._supports_virtual_terminal():
|
||||||
|
return False
|
||||||
|
except (ImportError, AttributeError):
|
||||||
|
return False
|
||||||
|
|
||||||
|
try:
|
||||||
|
return os.isatty(file.fileno())
|
||||||
|
except OSError:
|
||||||
|
return hasattr(file, "isatty") and file.isatty()
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def by_name(name):
|
||||||
|
if not Colours.can_colorize():
|
||||||
|
return ""
|
||||||
|
return getattr(Colours, name.upper(), "")
|
||||||
|
|
||||||
|
|
||||||
|
# Colours to use for each action
|
||||||
|
ACTION_COLOURS = {
|
||||||
|
"LINK": "GREEN",
|
||||||
|
"UNLINK": "RED",
|
||||||
|
"DIR": "GREEN",
|
||||||
|
"RMDIR": "RED",
|
||||||
|
"SKIP": "YELLOW",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def print_action(action: str, msg: str) -> None:
|
||||||
|
"""Print an action message."""
|
||||||
|
colour = ""
|
||||||
|
if action in ACTION_COLOURS:
|
||||||
|
colour = Colours.by_name(ACTION_COLOURS[action])
|
||||||
|
print(f"{colour}{action}{Colours.by_name('RESET')} {msg}")
|
||||||
|
|
||||||
|
|
||||||
def add_file_to_package(
|
def add_file_to_package(
|
||||||
file_path: Path, package: str, args: argparse.Namespace
|
file_path: Path, package_name: str, args: argparse.Namespace
|
||||||
) -> bool:
|
) -> bool:
|
||||||
"""Add a file to a package by moving it and creating a symlink."""
|
"""Add a file to a package by moving it and creating a symlink."""
|
||||||
target = args.target.resolve()
|
target = args.target.resolve()
|
||||||
package = Path(args.repository, package).resolve()
|
package = Path(args.repository, package_name).resolve()
|
||||||
|
|
||||||
# Check the file is under the target directory
|
# Check the file is under the target directory
|
||||||
if not file_path.is_relative_to(target):
|
if not file_path.is_relative_to(target):
|
||||||
@@ -62,11 +143,11 @@ def add_file_to_package(
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
if not dest_dir.exists():
|
if not dest_dir.exists():
|
||||||
logger.info("DIR %s", dest_dir)
|
print_action("DIR", str(dest_dir))
|
||||||
if not args.dry_run:
|
if not args.dry_run:
|
||||||
dest_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
|
dest_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
|
||||||
|
|
||||||
logger.info("SWAP %s <-> %s", dest_path, file_path)
|
print_action("SWAP", f"{dest_path} <-> {file_path}")
|
||||||
if not args.dry_run:
|
if not args.dry_run:
|
||||||
shutil.move(str(file_path), str(dest_path))
|
shutil.move(str(file_path), str(dest_path))
|
||||||
try:
|
try:
|
||||||
@@ -106,13 +187,13 @@ def install_package(
|
|||||||
if rest != Path("."):
|
if rest != Path("."):
|
||||||
# If a non-directory exists with the same name and clobber is enabled, get rid of it.
|
# If a non-directory exists with the same name and clobber is enabled, get rid of it.
|
||||||
if dest.exists() and not dest.is_dir() and args.clobber:
|
if dest.exists() and not dest.is_dir() and args.clobber:
|
||||||
logger.info("UNLINK %s", dest)
|
print_action("UNLINK", dest)
|
||||||
if not args.dry_run:
|
if not args.dry_run:
|
||||||
dest.unlink()
|
dest.unlink()
|
||||||
|
|
||||||
# Make directory
|
# Make directory
|
||||||
if not dest.exists():
|
if not dest.exists():
|
||||||
logger.info("DIR %s", dest)
|
print_action("DIR", dest)
|
||||||
if not args.dry_run:
|
if not args.dry_run:
|
||||||
dest.mkdir(parents=True, mode=0o755, exist_ok=True)
|
dest.mkdir(parents=True, mode=0o755, exist_ok=True)
|
||||||
|
|
||||||
@@ -123,17 +204,17 @@ def install_package(
|
|||||||
|
|
||||||
# Skip if the file exists and we're not clobbering
|
# Skip if the file exists and we're not clobbering
|
||||||
if dest_path.exists() and not args.clobber:
|
if dest_path.exists() and not args.clobber:
|
||||||
logger.info("SKIP %s", dest_path)
|
print_action("SKIP", dest_path)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Does the file already exist?
|
# Does the file already exist?
|
||||||
if dest_path.is_file() or dest_path.is_symlink():
|
if dest_path.is_file() or dest_path.is_symlink():
|
||||||
logger.info("UNLINK %s", dest_path)
|
print_action("UNLINK", dest_path)
|
||||||
if not args.dry_run:
|
if not args.dry_run:
|
||||||
dest_path.unlink()
|
dest_path.unlink()
|
||||||
|
|
||||||
# Link the file
|
# Link the file
|
||||||
logger.info("LINK %s -> %s", src_path, dest_path)
|
print_action("LINK", f"%{src_path} -> %{dest_path}")
|
||||||
if not args.dry_run:
|
if not args.dry_run:
|
||||||
try:
|
try:
|
||||||
dest_path.symlink_to(src_path)
|
dest_path.symlink_to(src_path)
|
||||||
@@ -176,22 +257,22 @@ def uninstall_package(
|
|||||||
src_path = (root_path / filename).resolve()
|
src_path = (root_path / filename).resolve()
|
||||||
try:
|
try:
|
||||||
if dest_path.resolve() == src_path:
|
if dest_path.resolve() == src_path:
|
||||||
logger.info("UNLINK %s", dest_path)
|
print_action("UNLINK", dest_path)
|
||||||
if not args.dry_run:
|
if not args.dry_run:
|
||||||
dest_path.unlink()
|
dest_path.unlink()
|
||||||
else:
|
else:
|
||||||
logger.info("SKIP %s (points elsewhere)", dest_path)
|
print_action("SKIP", f"{dest_path} (points elsewhere)")
|
||||||
except (OSError, RuntimeError) as e:
|
except (OSError, RuntimeError) as e:
|
||||||
logger.warning("error checking symlink %s: %s", dest_path, e)
|
logger.warning("error checking symlink %s: %s", dest_path, e)
|
||||||
else:
|
else:
|
||||||
logger.info("SKIP %s (not a symlink)", dest_path)
|
print_action("SKIP", f"{dest_path} (not a symlink)")
|
||||||
|
|
||||||
# Delete the directories if empty.
|
# Delete the directories if empty.
|
||||||
for dir_path in sorted(dirs, key=lambda p: len(str(p)), reverse=True):
|
for dir_path in sorted(dirs, key=lambda p: len(str(p)), reverse=True):
|
||||||
if not dir_path.exists():
|
if not dir_path.exists():
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
logger.info("RMDIR %s", dir_path)
|
print_action("RMDIR", str(dir_path))
|
||||||
if not args.dry_run:
|
if not args.dry_run:
|
||||||
dir_path.rmdir()
|
dir_path.rmdir()
|
||||||
except OSError:
|
except OSError:
|
||||||
@@ -253,13 +334,17 @@ def cleanup_package(package: str, args: argparse.Namespace) -> None:
|
|||||||
rest = root_path.relative_to(package_dir)
|
rest = root_path.relative_to(package_dir)
|
||||||
dest = args.target / rest
|
dest = args.target / rest
|
||||||
|
|
||||||
|
# If the folder doesn't exist in the target, skip it
|
||||||
|
if not dest.exists():
|
||||||
|
continue
|
||||||
|
|
||||||
# Iterate the files in the target folder
|
# Iterate the files in the target folder
|
||||||
for file in os.listdir(dest):
|
for file in os.listdir(dest):
|
||||||
src_path = dest / file
|
src_path = dest / file
|
||||||
if is_package_file(
|
if is_package_file(
|
||||||
src_path, package, args.repository
|
src_path, package, args.repository
|
||||||
) and is_broken_symlink(src_path):
|
) and is_broken_symlink(src_path):
|
||||||
logger.info("UNLINK %s", src_path)
|
print_action("UNLINK", src_path)
|
||||||
if not args.dry_run:
|
if not args.dry_run:
|
||||||
src_path.unlink()
|
src_path.unlink()
|
||||||
|
|
||||||
@@ -376,7 +461,10 @@ def main() -> None:
|
|||||||
if len(packages):
|
if len(packages):
|
||||||
print(f"Packages in repository: {repo_path}")
|
print(f"Packages in repository: {repo_path}")
|
||||||
for package in packages:
|
for package in packages:
|
||||||
print(f"- {package}")
|
print(
|
||||||
|
f"{Colours.by_name('GREEN')}-{Colours.by_name('RESET')} {package}"
|
||||||
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.info("no packages found in repository")
|
logger.info("no packages found in repository")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user