Files
globaltalk-scraper/gtscraper.py
2025-04-19 18:01:16 +01:00

150 lines
4.1 KiB
Python
Executable File

#!/usr/bin/env python3
"""
GlobalTalk Scraper
A script that uses netatalk's `getzones` and `nbplkup` to discover devices on
the GlobalTalk network, and create some nice visualisations of the network.
Created for MARCHintosh 2025
"""
import argparse
import concurrent.futures
import json
import logging
import os
import re
import shutil
import subprocess
import sys
# Matches one row of `nbplkup` output, which nbplkup() unpacks as three groups:
# the object name, the endpoint type, and a trailing "<digits>.<digits>:<digits>"
# address that is later split at the colon into address and socket. The first
# group is greedy, so it extends to the last ":" that still lets the rest match.
NPBLKUP_RESULTS = re.compile(r"^(.*):(.*)\s(\d*\.\d*:\d*)$")
def getzones() -> list[str]:
    """Run `getzones` and return the AppleTalk zone names it prints.

    Each non-blank line of the command's standard output is treated as one
    zone name, with surrounding whitespace removed.
    """
    completed = subprocess.run(["getzones"], capture_output=True, text=True)
    zones = []
    for line in completed.stdout.split("\n"):
        name = line.strip()
        if name:
            zones.append(name)
    return zones
def nbplkup(zone: str) -> list[dict[str, str]]:
    """Look up the members of an AppleTalk zone using `nbplkup`.

    Args:
        zone: Name of the zone to query. It must be one of the zones
            currently reported by `getzones()`.

    Returns:
        One dict per parsed row of `nbplkup` output, each with the string
        keys "object", "type", "address", "socket" and "zone". An empty list
        is returned (and a warning logged) when the zone is not currently
        known. Output rows that do not match the expected layout are logged
        and skipped.
    """
    if zone not in getzones():
        logging.warning("%s not currently a known zone", zone)
        return []

    # Define the charset for the console as 'mac-roman' to avoid any
    # translation issues with some device names (AsanteTalk and such)
    environ = os.environ.copy()
    environ["ATALK_UNIX_CHARSET"] = "mac-roman"
    cmd = subprocess.run(
        ["nbplkup", f"@{zone}"],
        capture_output=True,
        text=True,
        encoding="mac-roman",
        env=environ,
    )

    zone_results = []
    for line in cmd.stdout.split("\n"):
        row = line.strip()
        if not row:
            continue
        # Use regex to split the row nicely. A row that does not match would
        # otherwise blow up with AttributeError on `None.groups()`, so skip
        # it and keep the rest of the zone's results.
        parsed = NPBLKUP_RESULTS.match(row)
        if parsed is None:
            logging.warning("Ignoring unparseable nbplkup output: %r", row)
            continue
        name, endpoint_type, endpoint = parsed.groups()
        # The regex guarantees exactly one ":" in the trailing address group.
        address, socket = endpoint.split(":")
        zone_results.append(
            {
                "object": name.strip(),
                "type": endpoint_type.strip(),
                "address": address,
                "socket": socket,
                "zone": zone,
            }
        )
    return zone_results
def main():
    """Command-line entry point: scan zones and write the results as JSON.

    Parses the command-line options, checks that `getzones` and `nbplkup` are
    on PATH, looks up every zone (or only those named with --zone) using a
    thread pool, and writes a JSON document with the keys "format", "zones"
    and "nodes" to --output (standard output by default).

    Returns early, after logging an error, when a required command is missing.
    """
    parser = argparse.ArgumentParser("gtscraper")
    parser.add_argument(
        "--zone",
        nargs="*",
        required=False,
        default=None,
        help="Scan selected zones",
    )
    parser.add_argument(
        "--output",
        type=argparse.FileType("w"),
        default=sys.stdout,
        help="Filename to write the resulting JSON to",
    )
    parser.add_argument("--debug", action="store_true", help="Enable debug logging")
    parser.add_argument("--quiet", action="store_true")
    parser.add_argument(
        "--workers",
        type=int,
        default=10,
        help="The number of concurrent zone scans to run",
    )
    args = parser.parse_args()

    if args.debug:
        level = logging.DEBUG
    elif args.quiet:
        level = logging.ERROR
    else:
        level = logging.INFO
    logging.basicConfig(level=level, stream=sys.stderr)
    logging.debug("Arguments: %s", args)

    # Check we have the pre-reqs to actually run
    for cmd in ["getzones", "nbplkup"]:
        if not shutil.which(cmd):
            logging.error(
                "%s is missing from your PATH, is netatalk installed and working?", cmd
            )
            return

    # Init the results dict
    zone_results = {
        "format": "v1",
        "zones": getzones(),
        "nodes": [],
    }

    # --zone collects a *list* of names (nargs="*"), so selection must be a
    # membership test; comparing a zone name to the list itself never matches.
    # An absent or empty --zone means "scan everything".
    if args.zone:
        wanted = set(args.zone)
        zones_to_scan = [zone for zone in zone_results["zones"] if zone in wanted]
        for missing in sorted(wanted.difference(zone_results["zones"])):
            logging.warning("%s not currently a known zone", missing)
    else:
        zones_to_scan = zone_results["zones"]

    # Subfunction to run
    def lookup_zone(zone):
        """Log the scan and return the nbplkup() results for one zone."""
        logging.info("Scanning %s", zone)
        return nbplkup(zone)

    # Use ThreadPoolExecutor for concurrent execution
    with concurrent.futures.ThreadPoolExecutor(max_workers=args.workers) as executor:
        futures = {executor.submit(lookup_zone, zone): zone for zone in zones_to_scan}
        for future in concurrent.futures.as_completed(futures):
            result = future.result()
            if result:
                zone_results["nodes"].extend(result)

    nodes = len(zone_results["nodes"])
    zones = len(zone_results["zones"])
    # logging functions do not accept print()'s `file=` keyword; the stream
    # is already set to stderr by basicConfig() above.
    logging.info("%s zones, %s nodes", zones, nodes)

    # Dump out the resulting JSON to the selected output (stdout by default)
    args.output.write(json.dumps(zone_results))
# Run the scraper only when executed as a script, not when imported.
if __name__ == "__main__":
    main()