mirror of
https://github.com/nikdoof/dotfiles.git
synced 2025-12-14 02:02:29 +00:00
[bin] Improve Stowage
Switch to using logging, and modernized Python usage.
This commit is contained in:
200
bin/bin/stowage
200
bin/bin/stowage
@@ -27,144 +27,160 @@ SOFTWARE.
|
|||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
import fnmatch
|
import fnmatch
|
||||||
|
import logging
|
||||||
import os
|
import os
|
||||||
from os import path
|
|
||||||
import re
|
import re
|
||||||
import shutil
|
import shutil
|
||||||
import sys
|
import sys
|
||||||
|
from pathlib import Path
|
||||||
|
from typing import Callable, List
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
def add(args):
|
def add(args: argparse.Namespace) -> None:
|
||||||
target = path.realpath(args.target)
|
"""Add a file to a package by moving it and creating a symlink."""
|
||||||
file_path = path.realpath(args.file)
|
target = Path(args.target).resolve()
|
||||||
package = path.realpath(path.join(args.repository, args.packages[0]))
|
file_path = Path(args.file).resolve()
|
||||||
if path.commonprefix([target, file_path]) != target:
|
package = Path(args.repository, args.packages[0]).resolve()
|
||||||
print(f"error: '{args.add}' not under '{args.target}'", file=sys.stderr)
|
|
||||||
|
if not file_path.is_relative_to(target):
|
||||||
|
logger.error("'%s' not under '%s'", args.file, args.target)
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
rest = file_path[len(target) + 1 :]
|
|
||||||
dest_path = path.join(package, rest)
|
rest = file_path.relative_to(target)
|
||||||
dest = path.dirname(dest_path)
|
dest_path = package / rest
|
||||||
if not path.exists(dest):
|
dest_dir = dest_path.parent
|
||||||
if args.verbose:
|
|
||||||
print("DIR", dest)
|
if not dest_dir.exists():
|
||||||
os.makedirs(dest, mode=0o755)
|
logger.info("DIR %s", dest_dir)
|
||||||
if args.verbose:
|
if not args.dry_run:
|
||||||
print("SWAP", dest_path, file_path)
|
dest_dir.mkdir(parents=True, mode=0o755, exist_ok=True)
|
||||||
|
|
||||||
|
logger.info("SWAP %s %s", dest_path, file_path)
|
||||||
if not args.dry_run:
|
if not args.dry_run:
|
||||||
shutil.move(file_path, dest)
|
shutil.move(str(file_path), str(dest_path))
|
||||||
# TODO Should really check if the symlink fails here.
|
try:
|
||||||
os.symlink(dest_path, file_path)
|
file_path.symlink_to(dest_path)
|
||||||
|
except OSError as e:
|
||||||
|
logger.error("failed to create symlink: %s", e)
|
||||||
|
# Attempt to restore the file
|
||||||
|
shutil.move(str(dest_path), str(file_path))
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
def install(args, is_excluded):
|
def install(args: argparse.Namespace, is_excluded: Callable[[str], bool]) -> None:
|
||||||
|
"""Install packages by creating symlinks from repository to target."""
|
||||||
for package in args.packages:
|
for package in args.packages:
|
||||||
package_dir = path.join(args.repository, package)
|
package_dir = Path(args.repository, package)
|
||||||
if not path.isdir(package_dir):
|
if not package_dir.is_dir():
|
||||||
print(f"no such package: {package}; skipping", file=sys.stderr)
|
logger.warning("no such package: %s; skipping", package)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Walk the package
|
# Walk the package
|
||||||
for root, _, files in os.walk(package_dir, followlinks=True):
|
for root, _, files in os.walk(package_dir, followlinks=True):
|
||||||
|
root_path = Path(root)
|
||||||
files = [filename for filename in files if not is_excluded(filename)]
|
files = [filename for filename in files if not is_excluded(filename)]
|
||||||
if len(files) == 0:
|
if not files:
|
||||||
continue
|
continue
|
||||||
rest = root[len(package_dir) + 1 :]
|
|
||||||
dest = path.join(args.target, rest)
|
rest = root_path.relative_to(package_dir)
|
||||||
|
dest = Path(args.target) / rest
|
||||||
|
|
||||||
# Create the directory path
|
# Create the directory path
|
||||||
if rest != "":
|
if rest != Path("."):
|
||||||
# If a non-directory exists with the same name and clobber is enabled, get rid of it.
|
# If a non-directory exists with the same name and clobber is enabled, get rid of it.
|
||||||
if path.exists(dest) and not path.isdir(dest) and args.clobber:
|
if dest.exists() and not dest.is_dir() and args.clobber:
|
||||||
if args.verbose:
|
logger.info("UNLINK %s", dest)
|
||||||
print("UNLINK", dest)
|
|
||||||
if not args.dry_run:
|
if not args.dry_run:
|
||||||
os.unlink(dest)
|
dest.unlink()
|
||||||
|
|
||||||
# Make directory
|
# Make directory
|
||||||
if args.verbose:
|
logger.info("DIR %s", dest)
|
||||||
print("DIR", dest)
|
if not args.dry_run and not dest.exists():
|
||||||
if not args.dry_run and not path.exists(dest):
|
dest.mkdir(parents=True, mode=0o755, exist_ok=True)
|
||||||
os.makedirs(dest, mode=0o755)
|
|
||||||
|
|
||||||
# Process files
|
# Process files
|
||||||
for filename in files:
|
for filename in files:
|
||||||
src_path = path.realpath(path.join(root, filename))
|
src_path = (root_path / filename).resolve()
|
||||||
dest_path = path.join(dest, filename)
|
dest_path = dest / filename
|
||||||
|
|
||||||
# Skip if the file exists and we're not clobbering
|
# Skip if the file exists and we're not clobbering
|
||||||
if path.exists(dest_path) and not args.clobber:
|
if dest_path.exists() and not args.clobber:
|
||||||
if args.verbose:
|
logger.info("SKIP %s", dest_path)
|
||||||
print("SKIP", dest_path)
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Does the file already exist?
|
# Does the file already exist?
|
||||||
if path.isfile(dest_path):
|
if dest_path.is_file():
|
||||||
if args.verbose:
|
logger.info("UNLINK %s", dest_path)
|
||||||
print("UNLINK", dest_path)
|
|
||||||
if not args.dry_run:
|
if not args.dry_run:
|
||||||
os.unlink(dest_path)
|
dest_path.unlink()
|
||||||
|
|
||||||
# Link the file
|
# Link the file
|
||||||
if args.verbose:
|
logger.info("LINK %s %s", src_path, dest_path)
|
||||||
print("LINK", src_path, dest_path)
|
|
||||||
if not args.dry_run:
|
if not args.dry_run:
|
||||||
os.symlink(src_path, dest_path)
|
dest_path.symlink_to(src_path)
|
||||||
|
|
||||||
|
|
||||||
def uninstall(args, is_excluded):
|
def uninstall(args: argparse.Namespace, is_excluded: Callable[[str], bool]) -> None:
|
||||||
dirs = []
|
"""Uninstall packages by removing symlinks."""
|
||||||
|
dirs: List[Path] = []
|
||||||
for package in args.packages:
|
for package in args.packages:
|
||||||
package_dir = path.join(args.repository, package)
|
package_dir = Path(args.repository, package)
|
||||||
if not path.isdir(package_dir):
|
if not package_dir.is_dir():
|
||||||
print(f"no such package: {package}; skipping", file=sys.stderr)
|
logger.warning("no such package: %s; skipping", package)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
for root, _, files in os.walk(package_dir, followlinks=True):
|
for root, _, files in os.walk(package_dir, followlinks=True):
|
||||||
|
root_path = Path(root)
|
||||||
files = [filename for filename in files if not is_excluded(filename)]
|
files = [filename for filename in files if not is_excluded(filename)]
|
||||||
if len(files) == 0:
|
if not files:
|
||||||
continue
|
continue
|
||||||
rest = root[len(package_dir) + 1 :]
|
|
||||||
dest = path.join(args.target, rest)
|
rest = root_path.relative_to(package_dir)
|
||||||
if rest != "":
|
dest = Path(args.target) / rest
|
||||||
|
|
||||||
|
if rest != Path("."):
|
||||||
dirs.append(dest)
|
dirs.append(dest)
|
||||||
|
|
||||||
for filename in files:
|
for filename in files:
|
||||||
dest_path = path.join(dest, filename)
|
dest_path = dest / filename
|
||||||
if path.islink(dest_path):
|
if dest_path.is_symlink():
|
||||||
src_path = path.realpath(path.join(root, filename))
|
src_path = (root_path / filename).resolve()
|
||||||
if path.realpath(dest_path) == src_path:
|
if dest_path.resolve() == src_path:
|
||||||
if args.verbose:
|
logger.info("UNLINK %s", dest_path)
|
||||||
print("UNLINK", dest_path)
|
|
||||||
if not args.dry_run:
|
if not args.dry_run:
|
||||||
os.unlink(dest_path)
|
dest_path.unlink()
|
||||||
elif args.verbose:
|
else:
|
||||||
print("SKIP", dest_path)
|
logger.info("SKIP %s", dest_path)
|
||||||
elif args.verbose:
|
else:
|
||||||
print("SKIP", dest_path)
|
logger.info("SKIP %s", dest_path)
|
||||||
|
|
||||||
# Delete the directories if empty.
|
# Delete the directories if empty.
|
||||||
for dir_path in sorted(dirs, key=len, reverse=True):
|
for dir_path in sorted(dirs, key=lambda p: len(str(p)), reverse=True):
|
||||||
try:
|
try:
|
||||||
if args.verbose:
|
logger.info("RMDIR %s", dir_path)
|
||||||
print("RMDIR", dir_path)
|
|
||||||
if not args.dry_run:
|
if not args.dry_run:
|
||||||
os.rmdir(dir_path)
|
dir_path.rmdir()
|
||||||
except OSError:
|
except OSError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
def make_argparser():
|
def make_argparser() -> argparse.ArgumentParser:
|
||||||
|
"""Create and configure the argument parser."""
|
||||||
parser = argparse.ArgumentParser(description="A dotfile package manager.")
|
parser = argparse.ArgumentParser(description="A dotfile package manager.")
|
||||||
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
|
parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output")
|
||||||
parser.add_argument("--dry-run", "-n", action="store_true", help="Dry run.")
|
parser.add_argument("--dry-run", "-n", action="store_true", help="Dry run.")
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--target",
|
"--target",
|
||||||
"-t",
|
"-t",
|
||||||
default=path.expanduser("~"),
|
default=str(Path.home()),
|
||||||
help="Target directory in which to place symlinks",
|
help="Target directory in which to place symlinks",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--repository",
|
"--repository",
|
||||||
"-r",
|
"-r",
|
||||||
default=path.expanduser("~/.dotfiles"),
|
default=str(Path.home() / ".dotfiles"),
|
||||||
help="The location of the dotfile repository",
|
help="The location of the dotfile repository",
|
||||||
)
|
)
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
@@ -184,7 +200,7 @@ def make_argparser():
|
|||||||
subparsers = parser.add_subparsers(dest="command", help="sub-command help")
|
subparsers = parser.add_subparsers(dest="command", help="sub-command help")
|
||||||
|
|
||||||
# List
|
# List
|
||||||
parser_add = subparsers.add_parser("list", help="List packages in the repository")
|
subparsers.add_parser("list", help="List packages in the repository")
|
||||||
|
|
||||||
# Add
|
# Add
|
||||||
parser_add = subparsers.add_parser("add", help="Add a file to a package")
|
parser_add = subparsers.add_parser("add", help="Add a file to a package")
|
||||||
@@ -208,31 +224,45 @@ def make_argparser():
|
|||||||
return parser
|
return parser
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main() -> None:
|
||||||
|
"""Main entry point for the stowage script."""
|
||||||
parser = make_argparser()
|
parser = make_argparser()
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
if args.dry_run:
|
|
||||||
args.verbose = True
|
# Configure logging
|
||||||
|
log_level = logging.INFO if args.verbose or args.dry_run else logging.WARNING
|
||||||
|
logging.basicConfig(
|
||||||
|
level=log_level,
|
||||||
|
format='%(message)s',
|
||||||
|
stream=sys.stdout
|
||||||
|
)
|
||||||
|
|
||||||
exclude = [re.compile(fnmatch.translate(pattern)) for pattern in args.exclude]
|
exclude = [re.compile(fnmatch.translate(pattern)) for pattern in args.exclude]
|
||||||
|
|
||||||
def is_excluded(filename):
|
def is_excluded(filename: str) -> bool:
|
||||||
|
"""Check if a filename matches any exclusion pattern."""
|
||||||
return any(pattern.match(filename) for pattern in exclude)
|
return any(pattern.match(filename) for pattern in exclude)
|
||||||
|
|
||||||
if args.command == "list":
|
if args.command == "list":
|
||||||
for dir in os.listdir(args.repository):
|
repo_path = Path(args.repository)
|
||||||
if path.isdir(path.join(args.repository, dir)) and dir[0] != ".":
|
for item in sorted(repo_path.iterdir()):
|
||||||
print(dir)
|
if item.is_dir() and not item.name.startswith("."):
|
||||||
|
print(item.name)
|
||||||
elif args.command == "add":
|
elif args.command == "add":
|
||||||
if len(args.packages) > 1:
|
if len(args.packages) > 1:
|
||||||
parser.error("--add only works with a single package")
|
parser.error("add only works with a single package")
|
||||||
args.file = path.normpath(path.join(args.target, args.file))
|
file_path = Path(args.target, args.file)
|
||||||
if not path.isfile(args.file):
|
if not file_path.is_file():
|
||||||
parser.error(f"no such file: {args.file}")
|
parser.error(f"no such file: {args.file}")
|
||||||
|
args.file = str(file_path)
|
||||||
add(args)
|
add(args)
|
||||||
elif args.command == "install":
|
elif args.command == "install":
|
||||||
install(args, is_excluded)
|
install(args, is_excluded)
|
||||||
elif args.command == "uninstall":
|
elif args.command == "uninstall":
|
||||||
uninstall(args, is_excluded)
|
uninstall(args, is_excluded)
|
||||||
|
else:
|
||||||
|
parser.print_help()
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
Reference in New Issue
Block a user