diff --git a/leggen/commands/generate_sample_db.py b/leggen/commands/generate_sample_db.py new file mode 100644 index 0000000..3dc9ea0 --- /dev/null +++ b/leggen/commands/generate_sample_db.py @@ -0,0 +1,65 @@ +"""Generate sample database command.""" + +import click +from pathlib import Path + +from leggen.utils.paths import path_manager + + +@click.command() +@click.option( + "--database", + type=click.Path(path_type=Path), + help="Path to database file (default: uses LEGGEN_DATABASE_PATH or ~/.config/leggen/leggen-dev.db)", +) +@click.option( + "--accounts", + type=int, + default=3, + help="Number of sample accounts to generate (default: 3)", +) +@click.option( + "--transactions", + type=int, + default=50, + help="Number of transactions per account (default: 50)", +) +@click.option( + "--force", + is_flag=True, + help="Overwrite existing database without confirmation", +) +@click.pass_context +def generate_sample_db(ctx: click.Context, database: Path, accounts: int, transactions: int, force: bool): + """Generate a sample database with realistic financial data for testing.""" + + # Import here to avoid circular imports + import sys + import subprocess + from pathlib import Path as PathlibPath + + # Get the script path + script_path = PathlibPath(__file__).parent.parent.parent / "scripts" / "generate_sample_db.py" + + # Build command arguments + cmd = [sys.executable, str(script_path)] + + if database: + cmd.extend(["--database", str(database)]) + + cmd.extend(["--accounts", str(accounts)]) + cmd.extend(["--transactions", str(transactions)]) + + if force: + cmd.append("--force") + + # Execute the script + try: + subprocess.run(cmd, check=True) + except subprocess.CalledProcessError as e: + click.echo(f"Error generating sample database: {e}") + ctx.exit(1) + + +# Export the command +generate_sample_db = generate_sample_db \ No newline at end of file diff --git a/leggen/database/sqlite.py b/leggen/database/sqlite.py index d71ad65..744302a 100644 --- a/leggen/database/sqlite.py +++ b/leggen/database/sqlite.py @@ -5,14 +5,13 @@ from sqlite3 import IntegrityError import click from leggen.utils.text import success, warning +from leggen.utils.paths import path_manager def persist_balances(ctx: click.Context, balance: dict): # Connect to SQLite database - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" - db_path.parent.mkdir(parents=True, exist_ok=True) + db_path = path_manager.get_database_path() + path_manager.ensure_database_dir_exists() conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() @@ -108,10 +107,8 @@ def persist_balances(ctx: click.Context, balance: dict): def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list: # Connect to SQLite database - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" - db_path.parent.mkdir(parents=True, exist_ok=True) + db_path = path_manager.get_database_path() + path_manager.ensure_database_dir_exists() conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() @@ -216,9 +213,7 @@ def get_transactions( search=None, ): """Get transactions from SQLite database with optional filtering""" - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" + db_path = path_manager.get_database_path() if not db_path.exists(): return [] conn = sqlite3.connect(str(db_path)) @@ -288,9 +283,7 @@ def get_transactions( def get_balances(account_id=None): """Get latest balances from SQLite database""" - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" + db_path = path_manager.get_database_path() if not db_path.exists(): return [] conn = sqlite3.connect(str(db_path)) @@ -329,9 +322,7 @@ def get_balances(account_id=None): def get_account_summary(account_id): """Get basic account info from transactions table (avoids GoCardless API call)""" - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" + db_path = path_manager.get_database_path() if not db_path.exists(): return None conn = sqlite3.connect(str(db_path)) @@ -365,9 +356,7 @@ def get_account_summary(account_id): def get_transaction_count(account_id=None, **filters): """Get total count of transactions matching filters""" - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" + db_path = path_manager.get_database_path() if not db_path.exists(): return 0 conn = sqlite3.connect(str(db_path)) @@ -414,10 +403,8 @@ def get_transaction_count(account_id=None, **filters): def persist_account(account_data: dict): """Persist account details to SQLite database""" - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" - db_path.parent.mkdir(parents=True, exist_ok=True) + db_path = path_manager.get_database_path() + path_manager.ensure_database_dir_exists() conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() @@ -485,9 +472,7 @@ def persist_account(account_data: dict): def get_accounts(account_ids=None): """Get account details from SQLite database""" - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" + db_path = path_manager.get_database_path() if not db_path.exists(): return [] conn = sqlite3.connect(str(db_path)) @@ -519,9 +504,7 @@ def get_accounts(account_ids=None): def get_account(account_id: str): """Get specific account details from SQLite database""" - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" + db_path = path_manager.get_database_path() if not db_path.exists(): return None conn = sqlite3.connect(str(db_path)) diff --git a/leggen/main.py b/leggen/main.py index de195e5..9bfa692 100644 --- a/leggen/main.py +++ b/leggen/main.py @@ -7,6 +7,7 @@ import click from leggen.utils.config import load_config from leggen.utils.text import error +from leggen.utils.paths import path_manager cmd_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "commands")) @@ -77,7 +78,7 @@ class Group(click.Group): "-c", "--config", type=click.Path(dir_okay=False), - default=Path.home() / ".config" / "leggen" / "config.toml", + default=lambda: str(path_manager.get_config_file_path()), show_default=True, callback=load_config, is_eager=True, @@ -86,6 +87,20 @@ class Group(click.Group): show_envvar=True, help="Path to TOML configuration file", ) +@click.option( + "--config-dir", + type=click.Path(exists=False, file_okay=False, path_type=Path), + envvar="LEGGEN_CONFIG_DIR", + show_envvar=True, + help="Directory containing configuration files (default: ~/.config/leggen)", +) +@click.option( + "--database", + type=click.Path(dir_okay=False, path_type=Path), + envvar="LEGGEN_DATABASE_PATH", + show_envvar=True, + help="Path to SQLite database file (default: /leggen.db)", +) @click.option( "--api-url", type=str, @@ -100,7 +115,7 @@ class Group(click.Group): ) @click.version_option(package_name="leggen") @click.pass_context -def cli(ctx: click.Context, api_url: str): +def cli(ctx: click.Context, config_dir: Path, database: Path, api_url: str): """ Leggen: An Open Banking CLI """ @@ -109,5 +124,11 @@ def cli(ctx: click.Context, api_url: str): if "--help" in sys.argv[1:] or "-h" in sys.argv[1:]: return + # Set up path manager with user-provided paths + if config_dir: + path_manager.set_config_dir(config_dir) + if database: + path_manager.set_database_path(database) + # Store API URL in context for commands to use ctx.obj["api_url"] = api_url diff --git a/leggen/utils/paths.py b/leggen/utils/paths.py new file mode 100644 index 0000000..40241eb --- /dev/null +++ b/leggen/utils/paths.py @@ -0,0 +1,67 @@ +"""Centralized path management for Leggen.""" + +import os +from pathlib import Path +from typing import Optional + + +class PathManager: + """Manages configurable paths for config and database files.""" + + def __init__(self): + self._config_dir: Optional[Path] = None + self._database_path: Optional[Path] = None + + def get_config_dir(self) -> Path: + """Get the configuration directory.""" + if self._config_dir is not None: + return self._config_dir + + # Check environment variable first + config_dir = os.environ.get("LEGGEN_CONFIG_DIR") + if config_dir: + return Path(config_dir) + + # Default to ~/.config/leggen + return Path.home() / ".config" / "leggen" + + def set_config_dir(self, path: Path) -> None: + """Set the configuration directory.""" + self._config_dir = Path(path) + + def get_config_file_path(self) -> Path: + """Get the configuration file path.""" + return self.get_config_dir() / "config.toml" + + def get_database_path(self) -> Path: + """Get the database file path.""" + if self._database_path is not None: + return self._database_path + + # Check environment variable first + database_path = os.environ.get("LEGGEN_DATABASE_PATH") + if database_path: + return Path(database_path) + + # Default to config_dir/leggen.db + return self.get_config_dir() / "leggen.db" + + def set_database_path(self, path: Path) -> None: + """Set the database file path.""" + self._database_path = Path(path) + + def get_auth_file_path(self) -> Path: + """Get the authentication file path.""" + return self.get_config_dir() / "auth.json" + + def ensure_config_dir_exists(self) -> None: + """Ensure the configuration directory exists.""" + self.get_config_dir().mkdir(parents=True, exist_ok=True) + + def ensure_database_dir_exists(self) -> None: + """Ensure the database directory exists.""" + self.get_database_path().parent.mkdir(parents=True, exist_ok=True) + + +# Global instance for the application +path_manager = PathManager() \ No newline at end of file diff --git a/leggend/config.py b/leggend/config.py index 336434e..662c0ef 100644 --- a/leggend/config.py +++ b/leggend/config.py @@ -5,6 +5,7 @@ from pathlib import Path from typing import Dict, Any, Optional from loguru import logger +from leggen.utils.paths import path_manager class Config: @@ -23,9 +24,10 @@ class Config: if config_path is None: config_path = os.environ.get( - "LEGGEN_CONFIG_FILE", - str(Path.home() / ".config" / "leggen" / "config.toml"), + "LEGGEN_CONFIG_FILE" ) + if not config_path: + config_path = str(path_manager.get_config_file_path()) self._config_path = config_path @@ -53,9 +55,10 @@ class Config: if config_path is None: config_path = self._config_path or os.environ.get( - "LEGGEN_CONFIG_FILE", - str(Path.home() / ".config" / "leggen" / "config.toml"), + "LEGGEN_CONFIG_FILE" ) + if not config_path: + config_path = str(path_manager.get_config_file_path()) if config_path is None: raise ValueError("No config path specified") diff --git a/leggend/main.py b/leggend/main.py index 30b2a2c..33c3544 100644 --- a/leggend/main.py +++ b/leggend/main.py @@ -121,6 +121,8 @@ def create_app() -> FastAPI: def main(): import argparse + from pathlib import Path + from leggen.utils.paths import path_manager parser = argparse.ArgumentParser(description="Start the Leggend API service") parser.add_argument( @@ -132,8 +134,24 @@ def main(): parser.add_argument( "--port", type=int, default=8000, help="Port to bind to (default: 8000)" ) + parser.add_argument( + "--config-dir", + type=Path, + help="Directory containing configuration files (default: ~/.config/leggen)", + ) + parser.add_argument( + "--database", + type=Path, + help="Path to SQLite database file (default: /leggen.db)", + ) args = parser.parse_args() + # Set up path manager with user-provided paths + if args.config_dir: + path_manager.set_config_dir(args.config_dir) + if args.database: + path_manager.set_database_path(args.database) + if args.reload: # Use string import for reload to work properly uvicorn.run( diff --git a/leggend/services/database_service.py b/leggend/services/database_service.py index df0bd17..4edf192 100644 --- a/leggend/services/database_service.py +++ b/leggend/services/database_service.py @@ -6,6 +6,7 @@ from loguru import logger from leggend.config import config import leggen.database.sqlite as sqlite_db +from leggen.utils.paths import path_manager class DatabaseService: @@ -280,9 +281,7 @@ class DatabaseService: async def _check_balance_timestamp_migration_needed(self) -> bool: """Check if balance timestamps need migration""" - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" + db_path = path_manager.get_database_path() if not db_path.exists(): return False @@ -310,9 +309,7 @@ class DatabaseService: async def _migrate_balance_timestamps(self): """Convert all Unix timestamps to datetime strings""" - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" + db_path = path_manager.get_database_path() if not db_path.exists(): logger.warning("Database file not found, skipping migration") return @@ -399,9 +396,7 @@ class DatabaseService: async def _check_null_transaction_ids_migration_needed(self) -> bool: """Check if null transaction IDs need migration""" - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" + db_path = path_manager.get_database_path() if not db_path.exists(): return False @@ -429,9 +424,8 @@ class DatabaseService: async def _migrate_null_transaction_ids(self): """Populate null internalTransactionId fields using transactionId from raw data""" import uuid - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" + + db_path = path_manager.get_database_path() if not db_path.exists(): logger.warning("Database file not found, skipping migration") return @@ -538,9 +532,7 @@ class DatabaseService: async def _check_composite_key_migration_needed(self) -> bool: """Check if composite key migration is needed""" - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" + db_path = path_manager.get_database_path() if not db_path.exists(): return False @@ -586,9 +578,7 @@ class DatabaseService: async def _migrate_to_composite_key(self): """Migrate transactions table to use composite primary key (accountId, transactionId)""" - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" + db_path = path_manager.get_database_path() if not db_path.exists(): logger.warning("Database file not found, skipping migration") return @@ -704,10 +694,8 @@ class DatabaseService: try: import sqlite3 - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" - db_path.parent.mkdir(parents=True, exist_ok=True) + db_path = path_manager.get_database_path() + path_manager.ensure_database_dir_exists() conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() @@ -786,10 +774,8 @@ class DatabaseService: import sqlite3 import json - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" - db_path.parent.mkdir(parents=True, exist_ok=True) + db_path = path_manager.get_database_path() + path_manager.ensure_database_dir_exists() conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() @@ -888,10 +874,7 @@ class DatabaseService: ) -> None: """Persist account details to SQLite""" try: - from pathlib import Path - - db_path = Path.home() / ".config" / "leggen" / "leggen.db" - db_path.parent.mkdir(parents=True, exist_ok=True) + path_manager.ensure_database_dir_exists() # Use the sqlite_db module function sqlite_db.persist_account(account_data) diff --git a/leggend/services/gocardless_service.py b/leggend/services/gocardless_service.py index 4382f61..58afecb 100644 --- a/leggend/services/gocardless_service.py +++ b/leggend/services/gocardless_service.py @@ -6,6 +6,7 @@ from typing import Dict, Any, List from loguru import logger from leggend.config import config +from leggen.utils.paths import path_manager def _log_rate_limits(response): @@ -39,8 +40,8 @@ class GoCardlessService: if self._token: return self._token - # Use ~/.config/leggen for consistency with main config - auth_file = Path.home() / ".config" / "leggen" / "auth.json" + # Use path manager for auth file + auth_file = path_manager.get_auth_file_path() if auth_file.exists(): try: diff --git a/scripts/generate_sample_db.py b/scripts/generate_sample_db.py new file mode 100755 index 0000000..51bfba3 --- /dev/null +++ b/scripts/generate_sample_db.py @@ -0,0 +1,426 @@ +#!/usr/bin/env python3 +"""Sample database generator for Leggen testing and development.""" + +import argparse +import json +import random +import sqlite3 +import sys +import os +from datetime import datetime, timedelta +from pathlib import Path +from typing import List, Dict, Any + +# Add the project root to the Python path +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +import click +from leggen.utils.paths import path_manager + + +class SampleDataGenerator: + """Generates realistic sample data for testing Leggen.""" + + def __init__(self, db_path: Path): + self.db_path = db_path + self.institutions = [ + { + "id": "REVOLUT_REVOLT21", + "name": "Revolut", + "bic": "REVOLT21", + "country": "LT", + }, + { + "id": "BANCOBPI_BBPIPTPL", + "name": "Banco BPI", + "bic": "BBPIPTPL", + "country": "PT", + }, + { + "id": "MONZO_MONZGB2L", + "name": "Monzo Bank", + "bic": "MONZGB2L", + "country": "GB", + }, + { + "id": "NUBANK_NUPBBR25", + "name": "Nu Pagamentos", + "bic": "NUPBBR25", + "country": "BR", + }, + ] + + self.transaction_types = [ + {"description": "Grocery Store", "amount_range": (-150, -20), "frequency": 0.3}, + {"description": "Coffee Shop", "amount_range": (-15, -3), "frequency": 0.2}, + {"description": "Gas Station", "amount_range": (-80, -30), "frequency": 0.1}, + {"description": "Online Shopping", "amount_range": (-200, -25), "frequency": 0.15}, + {"description": "Restaurant", "amount_range": (-60, -15), "frequency": 0.15}, + {"description": "Salary", "amount_range": (2500, 5000), "frequency": 0.02}, + {"description": "ATM Withdrawal", "amount_range": (-200, -20), "frequency": 0.05}, + {"description": "Transfer to Savings", "amount_range": (-1000, -100), "frequency": 0.03}, + ] + + def ensure_database_dir(self): + """Ensure database directory exists.""" + self.db_path.parent.mkdir(parents=True, exist_ok=True) + + def create_tables(self): + """Create database tables.""" + conn = sqlite3.connect(str(self.db_path)) + cursor = conn.cursor() + + # Create accounts table + cursor.execute(""" + CREATE TABLE IF NOT EXISTS accounts ( + id TEXT PRIMARY KEY, + institution_id TEXT, + status TEXT, + iban TEXT, + name TEXT, + currency TEXT, + created DATETIME, + last_accessed DATETIME, + last_updated DATETIME + ) + """) + + # Create transactions table with composite primary key + cursor.execute(""" + CREATE TABLE IF NOT EXISTS transactions ( + accountId TEXT NOT NULL, + transactionId TEXT NOT NULL, + internalTransactionId TEXT, + institutionId TEXT, + iban TEXT, + transactionDate DATETIME, + description TEXT, + transactionValue REAL, + transactionCurrency TEXT, + transactionStatus TEXT, + rawTransaction JSON, + PRIMARY KEY (accountId, transactionId) + ) + """) + + # Create balances table + cursor.execute(""" + CREATE TABLE IF NOT EXISTS balances ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + account_id TEXT, + bank TEXT, + status TEXT, + iban TEXT, + amount REAL, + currency TEXT, + type TEXT, + timestamp DATETIME + ) + """) + + # Create indexes + cursor.execute("CREATE INDEX IF NOT EXISTS idx_transactions_internal_id ON transactions(internalTransactionId)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_transactions_date ON transactions(transactionDate)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_transactions_account_date ON transactions(accountId, transactionDate)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_transactions_amount ON transactions(transactionValue)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_balances_account_id ON balances(account_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_balances_timestamp ON balances(timestamp)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_balances_account_type_timestamp ON balances(account_id, type, timestamp)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_accounts_institution_id ON accounts(institution_id)") + cursor.execute("CREATE INDEX IF NOT EXISTS idx_accounts_status ON accounts(status)") + + conn.commit() + conn.close() + + def generate_iban(self, country_code: str) -> str: + """Generate a realistic IBAN for the given country.""" + ibans = { + "LT": lambda: f"LT{random.randint(10, 99)}{random.randint(10000, 99999)}{random.randint(10000000, 99999999)}", + "PT": lambda: f"PT{random.randint(10, 99)}{random.randint(1000, 9999)}{random.randint(1000, 9999)}{random.randint(10000000000, 99999999999)}", + "GB": lambda: f"GB{random.randint(10, 99)}MONZ{random.randint(100000, 999999)}{random.randint(100000, 999999)}", + "BR": lambda: f"BR{random.randint(10, 99)}{random.randint(10000000, 99999999)}{random.randint(1000, 9999)}{random.randint(10000000, 99999999)}", + } + return ibans.get(country_code, lambda: f"{country_code}{random.randint(1000000000000000, 9999999999999999)}")() + + def generate_accounts(self, num_accounts: int = 3) -> List[Dict[str, Any]]: + """Generate sample accounts.""" + accounts = [] + base_date = datetime.now() - timedelta(days=90) + + for i in range(num_accounts): + institution = random.choice(self.institutions) + account_id = f"account-{i+1:03d}-{random.randint(1000, 9999)}" + + account = { + "id": account_id, + "institution_id": institution["id"], + "status": "READY", + "iban": self.generate_iban(institution["country"]), + "name": f"Personal Account {i+1}", + "currency": "EUR", + "created": (base_date + timedelta(days=random.randint(0, 30))).isoformat(), + "last_accessed": (datetime.now() - timedelta(hours=random.randint(1, 48))).isoformat(), + "last_updated": datetime.now().isoformat(), + } + accounts.append(account) + + return accounts + + def generate_transactions(self, accounts: List[Dict[str, Any]], num_transactions_per_account: int = 50) -> List[Dict[str, Any]]: + """Generate sample transactions for accounts.""" + transactions = [] + base_date = datetime.now() - timedelta(days=60) + + for account in accounts: + account_transactions = [] + current_balance = random.uniform(500, 3000) + + for i in range(num_transactions_per_account): + # Choose transaction type based on frequency weights + transaction_type = random.choices( + self.transaction_types, + weights=[t["frequency"] for t in self.transaction_types] + )[0] + + # Generate transaction amount + min_amount, max_amount = transaction_type["amount_range"] + amount = round(random.uniform(min_amount, max_amount), 2) + + # Generate transaction date (more recent transactions are more likely) + days_ago = random.choices( + range(60), + weights=[1.5 ** (60 - d) for d in range(60)] + )[0] + transaction_date = base_date + timedelta(days=days_ago, hours=random.randint(6, 22), minutes=random.randint(0, 59)) + + # Generate transaction IDs + transaction_id = f"bank-txn-{account['id']}-{i+1:04d}" + internal_transaction_id = f"int-txn-{random.randint(100000, 999999)}" + + # Create realistic descriptions + descriptions = { + "Grocery Store": ["TESCO", "SAINSBURY'S", "LIDL", "ALDI", "WALMART", "CARREFOUR"], + "Coffee Shop": ["STARBUCKS", "COSTA COFFEE", "PRET A MANGER", "LOCAL CAFE"], + "Gas Station": ["BP", "SHELL", "ESSO", "GALP", "PETROBRAS"], + "Online Shopping": ["AMAZON", "EBAY", "ZALANDO", "ASOS", "APPLE"], + "Restaurant": ["PIZZA HUT", "MCDONALD'S", "BURGER KING", "LOCAL RESTAURANT"], + "Salary": ["MONTHLY SALARY", "PAYROLL DEPOSIT", "SALARY PAYMENT"], + "ATM Withdrawal": ["ATM WITHDRAWAL", "CASH WITHDRAWAL"], + "Transfer to Savings": ["SAVINGS TRANSFER", "INVESTMENT TRANSFER"], + } + + specific_descriptions = descriptions.get(transaction_type["description"], [transaction_type["description"]]) + description = random.choice(specific_descriptions) + + # Create raw transaction (simplified GoCardless format) + raw_transaction = { + "transactionId": transaction_id, + "bookingDate": transaction_date.strftime("%Y-%m-%d"), + "valueDate": transaction_date.strftime("%Y-%m-%d"), + "transactionAmount": { + "amount": str(amount), + "currency": account["currency"] + }, + "remittanceInformationUnstructured": description, + "bankTransactionCode": "PMNT" if amount < 0 else "RCDT", + } + + # Determine status (most are booked, some recent ones might be pending) + status = "pending" if days_ago < 2 and random.random() < 0.1 else "booked" + + transaction = { + "accountId": account["id"], + "transactionId": transaction_id, + "internalTransactionId": internal_transaction_id, + "institutionId": account["institution_id"], + "iban": account["iban"], + "transactionDate": transaction_date.isoformat(), + "description": description, + "transactionValue": amount, + "transactionCurrency": account["currency"], + "transactionStatus": status, + "rawTransaction": raw_transaction, + } + + account_transactions.append(transaction) + current_balance += amount + + # Sort transactions by date for realistic ordering + account_transactions.sort(key=lambda x: x["transactionDate"]) + transactions.extend(account_transactions) + + return transactions + + def generate_balances(self, accounts: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Generate sample balances for accounts.""" + balances = [] + + for account in accounts: + # Calculate balance from transactions (simplified) + base_balance = random.uniform(500, 2000) + + balance_types = ["interimAvailable", "closingBooked", "authorised"] + + for balance_type in balance_types: + # Add some variation to balance types + variation = random.uniform(-50, 50) if balance_type != "interimAvailable" else 0 + balance_amount = base_balance + variation + + balance = { + "account_id": account["id"], + "bank": account["institution_id"], + "status": account["status"], + "iban": account["iban"], + "amount": round(balance_amount, 2), + "currency": account["currency"], + "type": balance_type, + "timestamp": datetime.now().isoformat(), + } + balances.append(balance) + + return balances + + def insert_data(self, accounts: List[Dict[str, Any]], transactions: List[Dict[str, Any]], balances: List[Dict[str, Any]]): + """Insert generated data into the database.""" + conn = sqlite3.connect(str(self.db_path)) + cursor = conn.cursor() + + # Insert accounts + for account in accounts: + cursor.execute(""" + INSERT OR REPLACE INTO accounts + (id, institution_id, status, iban, name, currency, created, last_accessed, last_updated) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + account["id"], account["institution_id"], account["status"], account["iban"], + account["name"], account["currency"], account["created"], + account["last_accessed"], account["last_updated"] + )) + + # Insert transactions + for transaction in transactions: + cursor.execute(""" + INSERT OR REPLACE INTO transactions + (accountId, transactionId, internalTransactionId, institutionId, iban, + transactionDate, description, transactionValue, transactionCurrency, + transactionStatus, rawTransaction) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, ( + transaction["accountId"], transaction["transactionId"], + transaction["internalTransactionId"], transaction["institutionId"], + transaction["iban"], transaction["transactionDate"], transaction["description"], + transaction["transactionValue"], transaction["transactionCurrency"], + transaction["transactionStatus"], json.dumps(transaction["rawTransaction"]) + )) + + # Insert balances + for balance in balances: + cursor.execute(""" + INSERT INTO balances + (account_id, bank, status, iban, amount, currency, type, timestamp) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """, ( + balance["account_id"], balance["bank"], balance["status"], balance["iban"], + balance["amount"], balance["currency"], balance["type"], balance["timestamp"] + )) + + conn.commit() + conn.close() + + def generate_sample_database(self, num_accounts: int = 3, num_transactions_per_account: int = 50): + """Generate complete sample database.""" + click.echo(f"šŸ—„ļø Creating sample database at: {self.db_path}") + + self.ensure_database_dir() + self.create_tables() + + click.echo(f"šŸ‘„ Generating {num_accounts} sample accounts...") + accounts = self.generate_accounts(num_accounts) + + click.echo(f"šŸ’³ Generating {num_transactions_per_account} transactions per account...") + transactions = self.generate_transactions(accounts, num_transactions_per_account) + + click.echo("šŸ’° Generating account balances...") + balances = self.generate_balances(accounts) + + click.echo("šŸ’¾ Inserting data into database...") + self.insert_data(accounts, transactions, balances) + + # Print summary + click.echo("\nāœ… Sample database created successfully!") + click.echo(f"šŸ“Š Summary:") + click.echo(f" - Accounts: {len(accounts)}") + click.echo(f" - Transactions: {len(transactions)}") + click.echo(f" - Balances: {len(balances)}") + click.echo(f" - Database: {self.db_path}") + + # Show account details + click.echo(f"\nšŸ“‹ Sample accounts:") + for account in accounts: + institution_name = next(inst["name"] for inst in self.institutions if inst["id"] == account["institution_id"]) + click.echo(f" - {account['id']} ({institution_name}) - {account['iban']}") + + +@click.command() +@click.option( + "--database", + type=click.Path(path_type=Path), + help="Path to database file (default: uses LEGGEN_DATABASE_PATH or ~/.config/leggen/leggen-dev.db)", +) +@click.option( + "--accounts", + type=int, + default=3, + help="Number of sample accounts to generate (default: 3)", +) +@click.option( + "--transactions", + type=int, + default=50, + help="Number of transactions per account (default: 50)", +) +@click.option( + "--force", + is_flag=True, + help="Overwrite existing database without confirmation", +) +def main(database: Path, accounts: int, transactions: int, force: bool): + """Generate a sample database with realistic financial data for testing Leggen.""" + + # Determine database path + if database: + db_path = database + else: + # Use development database by default to avoid overwriting production data + import os + env_path = os.environ.get("LEGGEN_DATABASE_PATH") + if env_path: + db_path = Path(env_path) + else: + # Default to development database in config directory + db_path = path_manager.get_config_dir() / "leggen-dev.db" + + # Check if database exists and ask for confirmation + if db_path.exists() and not force: + click.echo(f"āš ļø Database already exists: {db_path}") + if not click.confirm("Do you want to overwrite it?"): + click.echo("Aborted.") + return + + # Generate the sample database + generator = SampleDataGenerator(db_path) + generator.generate_sample_database(accounts, transactions) + + # Show usage instructions + click.echo(f"\nšŸš€ Usage instructions:") + click.echo(f"To use this sample database with leggen commands:") + click.echo(f" export LEGGEN_DATABASE_PATH={db_path}") + click.echo(f" leggen transactions") + click.echo(f"") + click.echo(f"To use this sample database with leggend API:") + click.echo(f" leggend --database {db_path}") + + +if __name__ == "__main__": + main() \ No newline at end of file