From 53dc8a090c5e194ccf6cd4ec59fd30a65d7e1ee2 Mon Sep 17 00:00:00 2001 From: Andrew Williams Date: Thu, 8 Jan 2026 12:22:53 +0000 Subject: [PATCH] [bin] Improvements to stowage --- bin/.local/bin/stowage | 360 ++++++++++++++++++++++++++--------------- 1 file changed, 231 insertions(+), 129 deletions(-) diff --git a/bin/.local/bin/stowage b/bin/.local/bin/stowage index 2983ac9..80ee065 100755 --- a/bin/.local/bin/stowage +++ b/bin/.local/bin/stowage @@ -6,7 +6,7 @@ modified by Andrew Williams A dotfile package manager Copyright (c) Keith Gaughan, 2017. -Copyright (c) Andrew Williams, 2021. +Copyright (c) Andrew Williams, 2021-2025. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in @@ -32,147 +32,159 @@ import os import re import shutil import sys +from collections.abc import Callable from pathlib import Path -from typing import Callable, List +from typing import List logger = logging.getLogger(__name__) -def add(args: argparse.Namespace) -> None: +def add_file_to_package( + file_path: Path, package: str, args: argparse.Namespace +) -> bool: """Add a file to a package by moving it and creating a symlink.""" - target = Path(args.target).resolve() - file_path = Path(args.file).resolve() - package = Path(args.repository, args.packages[0]).resolve() + target = args.target.resolve() + package = Path(args.repository, package).resolve() + # Check the file is under the target directory if not file_path.is_relative_to(target): - logger.error("'%s' not under '%s'", args.file, args.target) - sys.exit(1) + logger.error("'%s' not under '%s'", file_path, args.target) + return False + # Calculate the relative path in the package folder, and final destination + # e.g. /home/bin/x -> /home/.dotfiles/package/bin/x rest = file_path.relative_to(target) dest_path = package / rest dest_dir = dest_path.parent if dest_path.exists(): logger.error("file already exists in package: %s", dest_path) - sys.exit(1) + return False if not dest_dir.exists(): logger.info("DIR %s", dest_dir) if not args.dry_run: dest_dir.mkdir(parents=True, mode=0o755, exist_ok=True) - logger.info("SWAP %s %s", dest_path, file_path) + logger.info("SWAP %s <-> %s", dest_path, file_path) if not args.dry_run: shutil.move(str(file_path), str(dest_path)) try: file_path.symlink_to(dest_path) except OSError as e: logger.error("failed to create symlink: %s", e) - # Attempt to restore the file + # Attempt to restore the file if symlink creation fails try: shutil.move(str(dest_path), str(file_path)) except Exception as restore_error: logger.error("failed to restore file: %s", restore_error) - sys.exit(1) + return False + + return True -def install(args: argparse.Namespace, is_excluded: Callable[[str], bool]) -> None: - """Install packages by creating symlinks from repository to target.""" - for package in args.packages: - package_dir = Path(args.repository, package) - if not package_dir.is_dir(): - logger.warning("no such package: %s; skipping", package) +def install_package( + package: str, args: argparse.Namespace, is_excluded: Callable[[str], bool] +) -> bool: + """Install a package by creating symlinks from repository to target.""" + package_dir = args.repository / package + if not package_dir.is_dir(): + logger.warning("no such package: %s; skipping", package) + return False + + # Walk the package + for root, _, files in os.walk(package_dir, followlinks=True): + root_path = Path(root) + files = [filename for filename in files if not is_excluded(filename)] + if not files: continue - # Walk the package - for root, _, files in os.walk(package_dir, followlinks=True): - root_path = Path(root) - files = [filename for filename in files if not is_excluded(filename)] - if not files: - continue + rest = root_path.relative_to(package_dir) + dest = args.target / rest - rest = root_path.relative_to(package_dir) - dest = Path(args.target) / rest - - # Create the directory path - if rest != Path("."): - # If a non-directory exists with the same name and clobber is enabled, get rid of it. - if dest.exists() and not dest.is_dir() and args.clobber: - logger.info("UNLINK %s", dest) - if not args.dry_run: - dest.unlink() - - # Make directory - if not dest.exists(): - logger.info("DIR %s", dest) - if not args.dry_run: - dest.mkdir(parents=True, mode=0o755, exist_ok=True) - - # Process files - for filename in files: - src_path = (root_path / filename).resolve() - dest_path = dest / filename - - # Skip if the file exists and we're not clobbering - if dest_path.exists() and not args.clobber: - logger.info("SKIP %s", dest_path) - continue - - # Does the file already exist? - if dest_path.is_file() or dest_path.is_symlink(): - logger.info("UNLINK %s", dest_path) - if not args.dry_run: - dest_path.unlink() - - # Link the file - logger.info("LINK %s %s", src_path, dest_path) + # Create the directory path + if rest != Path("."): + # If a non-directory exists with the same name and clobber is enabled, get rid of it. + if dest.exists() and not dest.is_dir() and args.clobber: + logger.info("UNLINK %s", dest) if not args.dry_run: - try: - dest_path.symlink_to(src_path) - except OSError as e: - logger.error("failed to create symlink %s: %s", dest_path, e) + dest.unlink() + # Make directory + if not dest.exists(): + logger.info("DIR %s", dest) + if not args.dry_run: + dest.mkdir(parents=True, mode=0o755, exist_ok=True) -def uninstall(args: argparse.Namespace, is_excluded: Callable[[str], bool]) -> None: - """Uninstall packages by removing symlinks.""" - dirs: List[Path] = [] - for package in args.packages: - package_dir = Path(args.repository, package) - if not package_dir.is_dir(): - logger.warning("no such package: %s; skipping", package) - continue + # Process files + for filename in files: + src_path = (root_path / filename).resolve() + dest_path = dest / filename - for root, _, files in os.walk(package_dir, followlinks=True): - root_path = Path(root) - files = [filename for filename in files if not is_excluded(filename)] - if not files: + # Skip if the file exists and we're not clobbering + if dest_path.exists() and not args.clobber: + logger.info("SKIP %s", dest_path) continue - rest = root_path.relative_to(package_dir) - dest = Path(args.target) / rest + # Does the file already exist? + if dest_path.is_file() or dest_path.is_symlink(): + logger.info("UNLINK %s", dest_path) + if not args.dry_run: + dest_path.unlink() - if rest != Path("."): - dirs.append(dest) + # Link the file + logger.info("LINK %s -> %s", src_path, dest_path) + if not args.dry_run: + try: + dest_path.symlink_to(src_path) + except OSError as e: + logger.error("failed to create symlink %s: %s", dest_path, e) - for filename in files: - dest_path = dest / filename - if not dest_path.exists(): - logger.debug("does not exist: %s", dest_path) - continue + return True - if dest_path.is_symlink(): - src_path = (root_path / filename).resolve() - try: - if dest_path.resolve() == src_path: - logger.info("UNLINK %s", dest_path) - if not args.dry_run: - dest_path.unlink() - else: - logger.info("SKIP %s (points elsewhere)", dest_path) - except (OSError, RuntimeError) as e: - logger.warning("error checking symlink %s: %s", dest_path, e) - else: - logger.info("SKIP %s (not a symlink)", dest_path) + +def uninstall_package( + package: str, args: argparse.Namespace, is_excluded: Callable[[str], bool] +) -> bool: + """Uninstalls a package by removing symlinks.""" + dirs: List[Path] = [] + + package_dir = args.repository / package + if not package_dir.is_dir(): + logger.warning("no such package: %s; skipping", package) + return False + + for root, _, files in os.walk(package_dir, followlinks=True): + root_path = Path(root) + files = [filename for filename in files if not is_excluded(filename)] + if not files: + continue + + rest = root_path.relative_to(package_dir) + dest = args.target / rest + + if rest != Path("."): + dirs.append(dest) + + for filename in files: + dest_path = dest / filename + if not dest_path.exists(): + logger.debug("does not exist: %s", dest_path) + continue + + if dest_path.is_symlink(): + src_path = (root_path / filename).resolve() + try: + if dest_path.resolve() == src_path: + logger.info("UNLINK %s", dest_path) + if not args.dry_run: + dest_path.unlink() + else: + logger.info("SKIP %s (points elsewhere)", dest_path) + except (OSError, RuntimeError) as e: + logger.warning("error checking symlink %s: %s", dest_path, e) + else: + logger.info("SKIP %s (not a symlink)", dest_path) # Delete the directories if empty. for dir_path in sorted(dirs, key=lambda p: len(str(p)), reverse=True): @@ -184,6 +196,72 @@ def uninstall(args: argparse.Namespace, is_excluded: Callable[[str], bool]) -> N dir_path.rmdir() except OSError: logger.debug("directory not empty: %s", dir_path) + pass + + return True + + +def get_packages(repo_path: Path) -> List[str]: + """Get a list of packages in the repository.""" + packages = [] + for entry in repo_path.iterdir(): + if entry.is_dir() and not entry.name.startswith("."): + packages.append(entry.name) + return sorted(packages) + + +def is_package_file(path: Path, package: str, repo_path: Path) -> bool: + """ + Check if a file is part of a package. + + This makes a few assumptions: + - The file is a symlink + - The symlink points inside the repository + - Only a single repository is used to link files from + """ + # Is the file a symlink? + if not path.is_symlink(): + return False + # Does the symlink point inside the repository? + try: + target_path = path.resolve() + except OSError: + return False + return target_path.is_relative_to(repo_path / package) + + +def is_broken_symlink(path: Path) -> bool: + """Check if a path is a broken symlink.""" + return path.is_symlink() and not path.exists() + + +def cleanup_package(package: str, args: argparse.Namespace) -> None: + """Cleanup any broken symlinks from a installed package. + + - discover the directories used in the package + - iterate the directories and remove any broken symlinks that point back to the package + """ + package_dir = args.repository / package + if not package_dir.is_dir(): + return + + # Walk the source package folder for its structure + for root, _, _ in os.walk(package_dir, followlinks=True): + root_path = Path(root) + + # Calculate the path in the target folder + rest = root_path.relative_to(package_dir) + dest = args.target / rest + + # Iterate the files in the target folder + for file in os.listdir(dest): + src_path = dest / file + if is_package_file( + src_path, package, args.repository + ) and is_broken_symlink(src_path): + logger.info("UNLINK %s", src_path) + if not args.dry_run: + src_path.unlink() def make_argparser() -> argparse.ArgumentParser: @@ -194,13 +272,15 @@ def make_argparser() -> argparse.ArgumentParser: parser.add_argument( "--target", "-t", - default=str(Path.home()), + default=Path.home(), + type=Path, help="Target directory in which to place symlinks", ) parser.add_argument( "--repository", "-r", - default=str(Path.home() / ".dotfiles"), + default=Path.home() / ".dotfiles", + type=Path, help="The location of the dotfile repository", ) parser.add_argument( @@ -223,11 +303,11 @@ def make_argparser() -> argparse.ArgumentParser: subparsers.add_parser("list", help="List packages in the repository") # Add - parser_add = subparsers.add_parser("add", help="Add a file to a package") - parser_add.add_argument("file", metavar="FILE", help="File to stow") + parser_add = subparsers.add_parser("add", help="Add a files to a package") parser_add.add_argument( - "packages", metavar="PACKAGE", nargs="+", help="Packages to install" + "package", metavar="PACKAGE", help="Package to add the files to" ) + parser_add.add_argument("files", metavar="FILE", nargs="+", help="Files to add") # Uninstall parser_uninstall = subparsers.add_parser("uninstall", help="Remove a package") @@ -241,6 +321,14 @@ def make_argparser() -> argparse.ArgumentParser: "packages", metavar="PACKAGE", nargs="+", help="Packages to install" ) + # Cleanup + parser_cleanup = subparsers.add_parser( + "cleanup", help="Cleanup broken symlinks from a package" + ) + parser_cleanup.add_argument( + "packages", metavar="PACKAGE", nargs="+", help="Packages to cleanup" + ) + return parser @@ -251,60 +339,74 @@ def main() -> None: # Configure logging log_level = logging.INFO if args.verbose or args.dry_run else logging.WARNING - logging.basicConfig( - level=log_level, - format='%(message)s', - stream=sys.stdout - ) + logging.basicConfig(level=log_level, format="%(message)s", stream=sys.stdout) # Validate repository exists - repo_path = Path(args.repository) + repo_path = args.repository if not repo_path.exists(): - logger.error("repository does not exist: %s", args.repository) + logger.error("repository (%s) does not exist", args.repository) sys.exit(1) # Validate target exists - target_path = Path(args.target) + target_path = args.target if not target_path.exists(): - logger.error("target directory does not exist: %s", args.target) + logger.error("target directory (%s) does not exist", args.target) sys.exit(1) + # Compile exclusion patterns exclude = [re.compile(fnmatch.translate(pattern)) for pattern in args.exclude] def is_excluded(filename: str) -> bool: """Check if a filename matches any exclusion pattern.""" return any(pattern.match(filename) for pattern in exclude) + # Log that we're running in dry-run mode + if args.dry_run: + logger.warning("running in dry-run mode") + + # True indicates a successful execution + command_successful = True + if args.command == "list": if not repo_path.is_dir(): logger.error("repository is not a directory: %s", args.repository) sys.exit(1) - packages = sorted([ - item.name for item in repo_path.iterdir() - if item.is_dir() and not item.name.startswith(".") - ]) - if packages: + + packages = get_packages(repo_path) + if len(packages): + print(f"Packages in repository: {repo_path}") for package in packages: - print(package) + print(f"- {package}") else: logger.info("no packages found in repository") + elif args.command == "add": - if len(args.packages) > 1: - parser.error("add only works with a single package") - file_path = Path(args.file) - # Handle both absolute and relative paths - if not file_path.is_absolute(): - file_path = Path(args.target) / file_path - if not file_path.is_file(): - parser.error(f"no such file: {args.file}") - args.file = str(file_path) - add(args) + for fname in args.files: + file_path = Path(fname) + # Handle both absolute and relative paths + if not file_path.is_absolute(): + file_path = file_path.resolve() + if not add_file_to_package(file_path, args.package, args): + command_successful = False + elif args.command == "install": - install(args, is_excluded) + for package in args.packages: + if not install_package(package, args, is_excluded): + command_successful = False + elif args.command == "uninstall": - uninstall(args, is_excluded) + for package in args.packages: + if not uninstall_package(package, args, is_excluded): + command_successful = False + + elif args.command == "cleanup": + for package in args.packages: + cleanup_package(package, args) + else: parser.print_help() + + if not command_successful: sys.exit(1)