Files
dotfiles/bin/.local/bin/stowage

503 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
"""
originally stowage, by Keith Gaughan <https://github.com/kgaughan/>
modified by Andrew Williams <https://github.com/nikdoof/>
A dotfile package manager
Copyright (c) Keith Gaughan, 2017.
Copyright (c) Andrew Williams, 2021-2025.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import argparse
import fnmatch
import logging
import os
import re
import shutil
import sys
from collections.abc import Callable
from pathlib import Path
from typing import IO, List
# Module-level logger; its level and output stream are configured in main().
logger = logging.getLogger(__name__)
class Colours(object):
"""ANSI color codes for terminal output"""
# Reset
RESET = "\033[0m"
# Regular colors
BLACK = "\033[30m"
RED = "\033[31m"
GREEN = "\033[32m"
YELLOW = "\033[33m"
BLUE = "\033[34m"
MAGENTA = "\033[35m"
CYAN = "\033[36m"
WHITE = "\033[37m"
# Copied from _colorize in Python stdlib
def can_colorize(*, file: IO[str] | IO[bytes] | None = None) -> bool:
def _safe_getenv(k: str, fallback: str | None = None) -> str | None:
try:
return os.environ.get(k, fallback)
except Exception:
return fallback
if file is None:
file = sys.stdout
if not sys.flags.ignore_environment:
if _safe_getenv("PYTHON_COLORS") == "0":
return False
if _safe_getenv("PYTHON_COLORS") == "1":
return True
if _safe_getenv("NO_COLOR"):
return False
if _safe_getenv("FORCE_COLOR"):
return True
if _safe_getenv("TERM") == "dumb":
return False
if not hasattr(file, "fileno"):
return False
if sys.platform == "win32":
try:
import nt
if not nt._supports_virtual_terminal():
return False
except (ImportError, AttributeError):
return False
try:
return os.isatty(file.fileno())
except OSError:
return hasattr(file, "isatty") and file.isatty()
@staticmethod
def by_name(name):
if not Colours.can_colorize():
return ""
return getattr(Colours, name.upper(), "")
# Maps an action label, as passed to print_action, to the name of the colour
# it is printed in. Labels missing from this table are printed uncoloured.
ACTION_COLOURS = dict(
    LINK="GREEN",
    UNLINK="RED",
    DIR="GREEN",
    RMDIR="RED",
    SKIP="YELLOW",
)
def print_action(action: str, msg: str) -> None:
    """Write ``<action> <msg>`` to stdout, colouring the action label.

    The colour is looked up in ``ACTION_COLOURS``; labels that are not listed
    there get no colour prefix. The reset code is appended in either case.
    """
    prefix = Colours.by_name(ACTION_COLOURS[action]) if action in ACTION_COLOURS else ""
    suffix = Colours.by_name("RESET")
    print(f"{prefix}{action}{suffix} {msg}")
def add_file_to_package(
    file_path: Path, package_name: str, args: argparse.Namespace
) -> bool:
    """Add a file to a package by moving it and creating a symlink.

    Args:
        file_path: Path of the file to adopt; it must be located under the
            resolved ``args.target`` directory.
        package_name: Name of the package directory inside ``args.repository``.
        args: Parsed command-line options; ``target``, ``repository`` and
            ``dry_run`` are read.

    Returns:
        True when the file was swapped for a symlink (or would have been, in
        dry-run mode). False when a precondition failed or when creating the
        directory, moving the file, or creating the symlink raised an error;
        each failure is logged.
    """
    target = args.target.resolve()
    package = Path(args.repository, package_name).resolve()

    # Check the file is under the target directory
    if not file_path.is_relative_to(target):
        logger.error("'%s' not under '%s'", file_path, args.target)
        return False

    # Fail cleanly instead of letting shutil.move raise on a missing source.
    # A dangling symlink is still accepted because it can be moved.
    if not file_path.exists() and not file_path.is_symlink():
        logger.error("no such file: %s", file_path)
        return False

    # Calculate the relative path in the package folder, and final destination
    # e.g. /home/bin/x -> /home/.dotfiles/package/bin/x
    rest = file_path.relative_to(target)
    dest_path = package / rest
    dest_dir = dest_path.parent

    if dest_path.exists():
        logger.error("file already exists in package: %s", dest_path)
        return False

    if not dest_dir.exists():
        print_action("DIR", str(dest_dir))
        if not args.dry_run:
            try:
                dest_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
            except OSError as e:
                logger.error("failed to create directory %s: %s", dest_dir, e)
                return False

    print_action("SWAP", f"{dest_path} <-> {file_path}")
    if args.dry_run:
        return True

    try:
        shutil.move(str(file_path), str(dest_path))
    except OSError as e:
        # shutil.Error derives from OSError, so this covers it as well.
        logger.error("failed to move %s into package: %s", file_path, e)
        return False

    try:
        file_path.symlink_to(dest_path)
    except OSError as e:
        logger.error("failed to create symlink: %s", e)
        # Attempt to restore the file if symlink creation fails
        try:
            shutil.move(str(dest_path), str(file_path))
        except Exception as restore_error:
            logger.error("failed to restore file: %s", restore_error)
        return False
    return True
def install_package(
    package: str, args: argparse.Namespace, is_excluded: Callable[[str], bool]
) -> bool:
    """Install a package by creating symlinks from repository to target.

    Args:
        package: Name of the package directory inside ``args.repository``.
        args: Parsed command-line options; ``repository``, ``target``,
            ``clobber`` and ``dry_run`` are read.
        is_excluded: Predicate applied to each bare filename; files for which
            it returns True are not linked.

    Returns:
        False when the package directory does not exist or when at least one
        symlink could not be created; True otherwise.
    """
    package_dir = args.repository / package
    if not package_dir.is_dir():
        logger.warning("no such package: %s; skipping", package)
        return False

    all_linked = True

    # Walk the package
    for root, _, files in os.walk(package_dir, followlinks=True):
        root_path = Path(root)
        files = [filename for filename in files if not is_excluded(filename)]
        if not files:
            continue

        rest = root_path.relative_to(package_dir)
        dest = args.target / rest

        # Create the directory path
        if rest != Path("."):
            # If a non-directory exists with the same name and clobber is
            # enabled, get rid of it.
            if dest.exists() and not dest.is_dir() and args.clobber:
                print_action("UNLINK", str(dest))
                if not args.dry_run:
                    dest.unlink()

            # Make directory
            if not dest.exists():
                print_action("DIR", str(dest))
                if not args.dry_run:
                    dest.mkdir(parents=True, mode=0o755, exist_ok=True)

        # Process files
        for filename in files:
            src_path = (root_path / filename).resolve()
            dest_path = dest / filename

            # Skip if the file exists and we're not clobbering
            if dest_path.exists() and not args.clobber:
                print_action("SKIP", str(dest_path))
                continue

            # Remove whatever is in the way. This also catches dangling
            # symlinks, for which exists() above is False.
            if dest_path.is_file() or dest_path.is_symlink():
                print_action("UNLINK", str(dest_path))
                if not args.dry_run:
                    dest_path.unlink()

            # Link the file
            print_action("LINK", f"{src_path} -> {dest_path}")
            if not args.dry_run:
                try:
                    dest_path.symlink_to(src_path)
                except OSError as e:
                    logger.error("failed to create symlink %s: %s", dest_path, e)
                    # Keep going with the remaining files, but report failure.
                    all_linked = False

    return all_linked
def uninstall_package(
    package: str, args: argparse.Namespace, is_excluded: Callable[[str], bool]
) -> bool:
    """Uninstall a package by removing the symlinks that point into it.

    Only symlinks whose resolved target equals the matching file in the
    package are removed; other files are reported as skipped. Target
    directories that held package files are then removed if they are empty.

    Args:
        package: Name of the package directory inside ``args.repository``.
        args: Parsed command-line options; ``repository``, ``target`` and
            ``dry_run`` are read.
        is_excluded: Predicate applied to each bare filename; matching files
            are left alone.

    Returns:
        False when the package directory does not exist, True otherwise.
    """
    dirs: List[Path] = []

    package_dir = args.repository / package
    if not package_dir.is_dir():
        logger.warning("no such package: %s; skipping", package)
        return False

    for root, _, files in os.walk(package_dir, followlinks=True):
        root_path = Path(root)
        files = [filename for filename in files if not is_excluded(filename)]
        if not files:
            continue

        rest = root_path.relative_to(package_dir)
        dest = args.target / rest
        if rest != Path("."):
            dirs.append(dest)

        for filename in files:
            dest_path = dest / filename
            if not dest_path.exists():
                logger.debug("does not exist: %s", dest_path)
                continue

            if not dest_path.is_symlink():
                print_action("SKIP", f"{dest_path} (not a symlink)")
                continue

            src_path = (root_path / filename).resolve()
            try:
                if dest_path.resolve() == src_path:
                    print_action("UNLINK", str(dest_path))
                    if not args.dry_run:
                        dest_path.unlink()
                else:
                    print_action("SKIP", f"{dest_path} (points elsewhere)")
            except (OSError, RuntimeError) as e:
                logger.warning("error checking symlink %s: %s", dest_path, e)

    # Delete the directories if empty, longest path first so that children
    # are attempted before their parents.
    for dir_path in sorted(dirs, key=lambda p: len(str(p)), reverse=True):
        if not dir_path.exists():
            continue
        if args.dry_run:
            # Nothing was unlinked, so emptiness cannot be checked here.
            print_action("RMDIR", str(dir_path))
            continue
        try:
            dir_path.rmdir()
        except OSError:
            logger.debug("directory not empty: %s", dir_path)
        else:
            # Report only directories that were actually removed.
            print_action("RMDIR", str(dir_path))

    return True
def get_packages(repo_path: Path) -> List[str]:
    """Return the sorted names of the package directories in *repo_path*.

    A package is any immediate child directory whose name does not begin
    with a dot.
    """
    names = [
        child.name
        for child in repo_path.iterdir()
        if child.is_dir() and not child.name.startswith(".")
    ]
    names.sort()
    return names
def is_package_file(path: Path, package: str, repo_path: Path) -> bool:
    """
    Check if a file is part of a package.

    This makes a few assumptions:
    - The file is a symlink
    - The symlink points inside the repository
    - Only a single repository is used to link files from

    Args:
        path: Candidate path in the target directory.
        package: Name of the package directory inside *repo_path*.
        repo_path: Root of the dotfile repository.

    Returns:
        True when *path* is a symlink whose resolved target lies inside the
        resolved package directory; False otherwise, including when either
        path cannot be resolved.
    """
    # Is the file a symlink?
    if not path.is_symlink():
        return False

    # Does the symlink point inside the repository? Both sides are resolved
    # so that a repository reached through a symlink still matches.
    try:
        target_path = path.resolve()
        package_root = (repo_path / package).resolve()
    except (OSError, RuntimeError):
        # RuntimeError is what older Python versions raise for symlink loops.
        return False
    return target_path.is_relative_to(package_root)
def is_broken_symlink(path: Path) -> bool:
    """Return True when *path* is a symlink whose target does not exist."""
    if not path.is_symlink():
        return False
    return not path.exists()
def cleanup_package(package: str, args: argparse.Namespace) -> None:
    """Cleanup any broken symlinks from an installed package.

    - discover the directories used in the package
    - iterate the matching target directories and remove any broken symlinks
      that point back to the package

    Args:
        package: Name of the package directory inside ``args.repository``.
        args: Parsed command-line options; ``repository``, ``target`` and
            ``dry_run`` are read.

    Returns nothing; a package directory that does not exist is ignored.
    """
    package_dir = args.repository / package
    if not package_dir.is_dir():
        return

    # Walk the source package folder for its structure
    for root, _, _ in os.walk(package_dir, followlinks=True):
        root_path = Path(root)

        # Calculate the path in the target folder
        rest = root_path.relative_to(package_dir)
        dest = args.target / rest

        # Skip anything that is missing or is not a directory in the target;
        # listing a plain file would raise NotADirectoryError.
        if not dest.is_dir():
            continue

        # Iterate the files in the target folder
        for entry in os.listdir(dest):
            candidate = dest / entry
            if is_package_file(
                candidate, package, args.repository
            ) and is_broken_symlink(candidate):
                print_action("UNLINK", str(candidate))
                if not args.dry_run:
                    candidate.unlink()
def make_argparser() -> argparse.ArgumentParser:
    """Build the command-line parser: global options plus one sub-command each
    for ``list``, ``add``, ``uninstall``, ``install`` and ``cleanup``.

    The chosen sub-command name is stored on the namespace as ``command``.
    """
    home = Path.home()
    parser = argparse.ArgumentParser(description="A dotfile package manager.")

    # Global options, registered in the order they appear in --help.
    parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
    parser.add_argument("--dry-run", "-n", action="store_true", help="Dry run.")
    parser.add_argument(
        "--target",
        "-t",
        type=Path,
        default=home,
        help="Target directory in which to place symlinks",
    )
    parser.add_argument(
        "--repository",
        "-r",
        type=Path,
        default=home / ".dotfiles",
        help="The location of the dotfile repository",
    )
    parser.add_argument(
        "--exclude",
        "-x",
        action="append",
        default=[],
        metavar="GLOB",
        help="Glob pattern of files to exclude",
    )
    parser.add_argument(
        "--clobber",
        action="store_true",
        help="Replace files even if they exist.",
    )

    commands = parser.add_subparsers(dest="command", help="sub-command help")

    # "list" takes no arguments of its own.
    commands.add_parser("list", help="List packages in the repository")

    # "add" takes a package followed by one or more files.
    add_cmd = commands.add_parser("add", help="Add a files to a package")
    add_cmd.add_argument(
        "package", metavar="PACKAGE", help="Package to add the files to"
    )
    add_cmd.add_argument("files", metavar="FILE", nargs="+", help="Files to add")

    # The remaining sub-commands all take one or more package names.
    package_commands = (
        ("uninstall", "Remove a package", "Packages to uninstall"),
        ("install", "Install packages", "Packages to install"),
        ("cleanup", "Cleanup broken symlinks from a package", "Packages to cleanup"),
    )
    for name, summary, packages_help in package_commands:
        sub = commands.add_parser(name, help=summary)
        sub.add_argument(
            "packages", metavar="PACKAGE", nargs="+", help=packages_help
        )

    return parser
def main() -> None:
    """Parse the command line, validate the paths and run the chosen command.

    Exits with status 1 when the repository or target directory is missing,
    when ``list`` is given a repository that is not a directory, or when an
    ``add``, ``install`` or ``uninstall`` step reports failure. With no
    sub-command the help text is printed.
    """
    parser = make_argparser()
    args = parser.parse_args()

    # Verbose and dry-run both raise the log level to INFO.
    chatty = args.verbose or args.dry_run
    logging.basicConfig(
        level=logging.INFO if chatty else logging.WARNING,
        format="%(message)s",
        stream=sys.stdout,
    )

    # Both directories must already exist.
    repo_path = args.repository
    if not repo_path.exists():
        logger.error("repository (%s) does not exist", args.repository)
        sys.exit(1)

    target_path = args.target
    if not target_path.exists():
        logger.error("target directory (%s) does not exist", args.target)
        sys.exit(1)

    # Translate each --exclude glob into a compiled regular expression.
    exclude = [re.compile(fnmatch.translate(glob)) for glob in args.exclude]

    def is_excluded(filename: str) -> bool:
        """Return True when *filename* matches one of the exclusion globs."""
        for regex in exclude:
            if regex.match(filename):
                return True
        return False

    if args.dry_run:
        logger.warning("running in dry-run mode")

    # Set as soon as any step reports failure.
    failed = False
    command = args.command

    if command == "list":
        if not repo_path.is_dir():
            logger.error("repository is not a directory: %s", args.repository)
            sys.exit(1)
        packages = get_packages(repo_path)
        if not packages:
            logger.info("no packages found in repository")
        else:
            print(f"Packages in repository: {repo_path}")
            for package in packages:
                print(
                    f"{Colours.by_name('GREEN')}-{Colours.by_name('RESET')} {package}"
                )
    elif command == "add":
        for fname in args.files:
            file_path = Path(fname)
            # Relative paths are resolved; absolute ones are used as given.
            if not file_path.is_absolute():
                file_path = file_path.resolve()
            if not add_file_to_package(file_path, args.package, args):
                failed = True
    elif command in ("install", "uninstall"):
        handler = install_package if command == "install" else uninstall_package
        for package in args.packages:
            if not handler(package, args, is_excluded):
                failed = True
    elif command == "cleanup":
        for package in args.packages:
            cleanup_package(package, args)
    else:
        parser.print_help()

    if failed:
        sys.exit(1)
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()