mirror of
https://github.com/elisiariocouto/leggen.git
synced 2025-12-13 11:22:21 +00:00
Add centralized path management and sample database generator
Co-authored-by: elisiariocouto <818914+elisiariocouto@users.noreply.github.com>
This commit is contained in:
committed by
Elisiário Couto
parent
0c030efef2
commit
e9711339bd
65
leggen/commands/generate_sample_db.py
Normal file
65
leggen/commands/generate_sample_db.py
Normal file
@@ -0,0 +1,65 @@
|
||||
"""Generate sample database command."""
|
||||
|
||||
import click
|
||||
from pathlib import Path
|
||||
|
||||
from leggen.utils.paths import path_manager
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.option(
|
||||
"--database",
|
||||
type=click.Path(path_type=Path),
|
||||
help="Path to database file (default: uses LEGGEN_DATABASE_PATH or ~/.config/leggen/leggen-dev.db)",
|
||||
)
|
||||
@click.option(
|
||||
"--accounts",
|
||||
type=int,
|
||||
default=3,
|
||||
help="Number of sample accounts to generate (default: 3)",
|
||||
)
|
||||
@click.option(
|
||||
"--transactions",
|
||||
type=int,
|
||||
default=50,
|
||||
help="Number of transactions per account (default: 50)",
|
||||
)
|
||||
@click.option(
|
||||
"--force",
|
||||
is_flag=True,
|
||||
help="Overwrite existing database without confirmation",
|
||||
)
|
||||
@click.pass_context
|
||||
def generate_sample_db(ctx: click.Context, database: Path, accounts: int, transactions: int, force: bool):
|
||||
"""Generate a sample database with realistic financial data for testing."""
|
||||
|
||||
# Import here to avoid circular imports
|
||||
import sys
|
||||
import subprocess
|
||||
from pathlib import Path as PathlibPath
|
||||
|
||||
# Get the script path
|
||||
script_path = PathlibPath(__file__).parent.parent.parent / "scripts" / "generate_sample_db.py"
|
||||
|
||||
# Build command arguments
|
||||
cmd = [sys.executable, str(script_path)]
|
||||
|
||||
if database:
|
||||
cmd.extend(["--database", str(database)])
|
||||
|
||||
cmd.extend(["--accounts", str(accounts)])
|
||||
cmd.extend(["--transactions", str(transactions)])
|
||||
|
||||
if force:
|
||||
cmd.append("--force")
|
||||
|
||||
# Execute the script
|
||||
try:
|
||||
subprocess.run(cmd, check=True)
|
||||
except subprocess.CalledProcessError as e:
|
||||
click.echo(f"Error generating sample database: {e}")
|
||||
ctx.exit(1)
|
||||
|
||||
|
||||
# Export the command
|
||||
generate_sample_db = generate_sample_db
|
||||
@@ -5,14 +5,13 @@ from sqlite3 import IntegrityError
|
||||
import click
|
||||
|
||||
from leggen.utils.text import success, warning
|
||||
from leggen.utils.paths import path_manager
|
||||
|
||||
|
||||
def persist_balances(ctx: click.Context, balance: dict):
|
||||
# Connect to SQLite database
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
db_path = path_manager.get_database_path()
|
||||
path_manager.ensure_database_dir_exists()
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
@@ -108,10 +107,8 @@ def persist_balances(ctx: click.Context, balance: dict):
|
||||
|
||||
def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list:
|
||||
# Connect to SQLite database
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
db_path = path_manager.get_database_path()
|
||||
path_manager.ensure_database_dir_exists()
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
@@ -216,9 +213,7 @@ def get_transactions(
|
||||
search=None,
|
||||
):
|
||||
"""Get transactions from SQLite database with optional filtering"""
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
return []
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
@@ -288,9 +283,7 @@ def get_transactions(
|
||||
|
||||
def get_balances(account_id=None):
|
||||
"""Get latest balances from SQLite database"""
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
return []
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
@@ -329,9 +322,7 @@ def get_balances(account_id=None):
|
||||
|
||||
def get_account_summary(account_id):
|
||||
"""Get basic account info from transactions table (avoids GoCardless API call)"""
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
return None
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
@@ -365,9 +356,7 @@ def get_account_summary(account_id):
|
||||
|
||||
def get_transaction_count(account_id=None, **filters):
|
||||
"""Get total count of transactions matching filters"""
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
return 0
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
@@ -414,10 +403,8 @@ def get_transaction_count(account_id=None, **filters):
|
||||
|
||||
def persist_account(account_data: dict):
|
||||
"""Persist account details to SQLite database"""
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
db_path = path_manager.get_database_path()
|
||||
path_manager.ensure_database_dir_exists()
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
@@ -485,9 +472,7 @@ def persist_account(account_data: dict):
|
||||
|
||||
def get_accounts(account_ids=None):
|
||||
"""Get account details from SQLite database"""
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
return []
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
@@ -519,9 +504,7 @@ def get_accounts(account_ids=None):
|
||||
|
||||
def get_account(account_id: str):
|
||||
"""Get specific account details from SQLite database"""
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
return None
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
|
||||
@@ -7,6 +7,7 @@ import click
|
||||
|
||||
from leggen.utils.config import load_config
|
||||
from leggen.utils.text import error
|
||||
from leggen.utils.paths import path_manager
|
||||
|
||||
cmd_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "commands"))
|
||||
|
||||
@@ -77,7 +78,7 @@ class Group(click.Group):
|
||||
"-c",
|
||||
"--config",
|
||||
type=click.Path(dir_okay=False),
|
||||
default=Path.home() / ".config" / "leggen" / "config.toml",
|
||||
default=lambda: str(path_manager.get_config_file_path()),
|
||||
show_default=True,
|
||||
callback=load_config,
|
||||
is_eager=True,
|
||||
@@ -86,6 +87,20 @@ class Group(click.Group):
|
||||
show_envvar=True,
|
||||
help="Path to TOML configuration file",
|
||||
)
|
||||
@click.option(
|
||||
"--config-dir",
|
||||
type=click.Path(exists=False, file_okay=False, path_type=Path),
|
||||
envvar="LEGGEN_CONFIG_DIR",
|
||||
show_envvar=True,
|
||||
help="Directory containing configuration files (default: ~/.config/leggen)",
|
||||
)
|
||||
@click.option(
|
||||
"--database",
|
||||
type=click.Path(dir_okay=False, path_type=Path),
|
||||
envvar="LEGGEN_DATABASE_PATH",
|
||||
show_envvar=True,
|
||||
help="Path to SQLite database file (default: <config-dir>/leggen.db)",
|
||||
)
|
||||
@click.option(
|
||||
"--api-url",
|
||||
type=str,
|
||||
@@ -100,7 +115,7 @@ class Group(click.Group):
|
||||
)
|
||||
@click.version_option(package_name="leggen")
|
||||
@click.pass_context
|
||||
def cli(ctx: click.Context, api_url: str):
|
||||
def cli(ctx: click.Context, config_dir: Path, database: Path, api_url: str):
|
||||
"""
|
||||
Leggen: An Open Banking CLI
|
||||
"""
|
||||
@@ -109,5 +124,11 @@ def cli(ctx: click.Context, api_url: str):
|
||||
if "--help" in sys.argv[1:] or "-h" in sys.argv[1:]:
|
||||
return
|
||||
|
||||
# Set up path manager with user-provided paths
|
||||
if config_dir:
|
||||
path_manager.set_config_dir(config_dir)
|
||||
if database:
|
||||
path_manager.set_database_path(database)
|
||||
|
||||
# Store API URL in context for commands to use
|
||||
ctx.obj["api_url"] = api_url
|
||||
|
||||
67
leggen/utils/paths.py
Normal file
67
leggen/utils/paths.py
Normal file
@@ -0,0 +1,67 @@
|
||||
"""Centralized path management for Leggen."""
|
||||
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class PathManager:
|
||||
"""Manages configurable paths for config and database files."""
|
||||
|
||||
def __init__(self):
|
||||
self._config_dir: Optional[Path] = None
|
||||
self._database_path: Optional[Path] = None
|
||||
|
||||
def get_config_dir(self) -> Path:
|
||||
"""Get the configuration directory."""
|
||||
if self._config_dir is not None:
|
||||
return self._config_dir
|
||||
|
||||
# Check environment variable first
|
||||
config_dir = os.environ.get("LEGGEN_CONFIG_DIR")
|
||||
if config_dir:
|
||||
return Path(config_dir)
|
||||
|
||||
# Default to ~/.config/leggen
|
||||
return Path.home() / ".config" / "leggen"
|
||||
|
||||
def set_config_dir(self, path: Path) -> None:
|
||||
"""Set the configuration directory."""
|
||||
self._config_dir = Path(path)
|
||||
|
||||
def get_config_file_path(self) -> Path:
|
||||
"""Get the configuration file path."""
|
||||
return self.get_config_dir() / "config.toml"
|
||||
|
||||
def get_database_path(self) -> Path:
|
||||
"""Get the database file path."""
|
||||
if self._database_path is not None:
|
||||
return self._database_path
|
||||
|
||||
# Check environment variable first
|
||||
database_path = os.environ.get("LEGGEN_DATABASE_PATH")
|
||||
if database_path:
|
||||
return Path(database_path)
|
||||
|
||||
# Default to config_dir/leggen.db
|
||||
return self.get_config_dir() / "leggen.db"
|
||||
|
||||
def set_database_path(self, path: Path) -> None:
|
||||
"""Set the database file path."""
|
||||
self._database_path = Path(path)
|
||||
|
||||
def get_auth_file_path(self) -> Path:
|
||||
"""Get the authentication file path."""
|
||||
return self.get_config_dir() / "auth.json"
|
||||
|
||||
def ensure_config_dir_exists(self) -> None:
|
||||
"""Ensure the configuration directory exists."""
|
||||
self.get_config_dir().mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def ensure_database_dir_exists(self) -> None:
|
||||
"""Ensure the database directory exists."""
|
||||
self.get_database_path().parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
|
||||
# Global instance for the application
|
||||
path_manager = PathManager()
|
||||
@@ -5,6 +5,7 @@ from pathlib import Path
|
||||
from typing import Dict, Any, Optional
|
||||
|
||||
from loguru import logger
|
||||
from leggen.utils.paths import path_manager
|
||||
|
||||
|
||||
class Config:
|
||||
@@ -23,9 +24,10 @@ class Config:
|
||||
|
||||
if config_path is None:
|
||||
config_path = os.environ.get(
|
||||
"LEGGEN_CONFIG_FILE",
|
||||
str(Path.home() / ".config" / "leggen" / "config.toml"),
|
||||
"LEGGEN_CONFIG_FILE"
|
||||
)
|
||||
if not config_path:
|
||||
config_path = str(path_manager.get_config_file_path())
|
||||
|
||||
self._config_path = config_path
|
||||
|
||||
@@ -53,9 +55,10 @@ class Config:
|
||||
|
||||
if config_path is None:
|
||||
config_path = self._config_path or os.environ.get(
|
||||
"LEGGEN_CONFIG_FILE",
|
||||
str(Path.home() / ".config" / "leggen" / "config.toml"),
|
||||
"LEGGEN_CONFIG_FILE"
|
||||
)
|
||||
if not config_path:
|
||||
config_path = str(path_manager.get_config_file_path())
|
||||
|
||||
if config_path is None:
|
||||
raise ValueError("No config path specified")
|
||||
|
||||
@@ -121,6 +121,8 @@ def create_app() -> FastAPI:
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
from leggen.utils.paths import path_manager
|
||||
|
||||
parser = argparse.ArgumentParser(description="Start the Leggend API service")
|
||||
parser.add_argument(
|
||||
@@ -132,8 +134,24 @@ def main():
|
||||
parser.add_argument(
|
||||
"--port", type=int, default=8000, help="Port to bind to (default: 8000)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--config-dir",
|
||||
type=Path,
|
||||
help="Directory containing configuration files (default: ~/.config/leggen)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--database",
|
||||
type=Path,
|
||||
help="Path to SQLite database file (default: <config-dir>/leggen.db)",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
# Set up path manager with user-provided paths
|
||||
if args.config_dir:
|
||||
path_manager.set_config_dir(args.config_dir)
|
||||
if args.database:
|
||||
path_manager.set_database_path(args.database)
|
||||
|
||||
if args.reload:
|
||||
# Use string import for reload to work properly
|
||||
uvicorn.run(
|
||||
|
||||
@@ -6,6 +6,7 @@ from loguru import logger
|
||||
|
||||
from leggend.config import config
|
||||
import leggen.database.sqlite as sqlite_db
|
||||
from leggen.utils.paths import path_manager
|
||||
|
||||
|
||||
class DatabaseService:
|
||||
@@ -280,9 +281,7 @@ class DatabaseService:
|
||||
|
||||
async def _check_balance_timestamp_migration_needed(self) -> bool:
|
||||
"""Check if balance timestamps need migration"""
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
return False
|
||||
|
||||
@@ -310,9 +309,7 @@ class DatabaseService:
|
||||
|
||||
async def _migrate_balance_timestamps(self):
|
||||
"""Convert all Unix timestamps to datetime strings"""
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
logger.warning("Database file not found, skipping migration")
|
||||
return
|
||||
@@ -399,9 +396,7 @@ class DatabaseService:
|
||||
|
||||
async def _check_null_transaction_ids_migration_needed(self) -> bool:
|
||||
"""Check if null transaction IDs need migration"""
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
return False
|
||||
|
||||
@@ -429,9 +424,8 @@ class DatabaseService:
|
||||
async def _migrate_null_transaction_ids(self):
|
||||
"""Populate null internalTransactionId fields using transactionId from raw data"""
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
logger.warning("Database file not found, skipping migration")
|
||||
return
|
||||
@@ -538,9 +532,7 @@ class DatabaseService:
|
||||
|
||||
async def _check_composite_key_migration_needed(self) -> bool:
|
||||
"""Check if composite key migration is needed"""
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
return False
|
||||
|
||||
@@ -586,9 +578,7 @@ class DatabaseService:
|
||||
|
||||
async def _migrate_to_composite_key(self):
|
||||
"""Migrate transactions table to use composite primary key (accountId, transactionId)"""
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
logger.warning("Database file not found, skipping migration")
|
||||
return
|
||||
@@ -704,10 +694,8 @@ class DatabaseService:
|
||||
try:
|
||||
import sqlite3
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
db_path = path_manager.get_database_path()
|
||||
path_manager.ensure_database_dir_exists()
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
@@ -786,10 +774,8 @@ class DatabaseService:
|
||||
import sqlite3
|
||||
import json
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
db_path = path_manager.get_database_path()
|
||||
path_manager.ensure_database_dir_exists()
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
@@ -888,10 +874,7 @@ class DatabaseService:
|
||||
) -> None:
|
||||
"""Persist account details to SQLite"""
|
||||
try:
|
||||
from pathlib import Path
|
||||
|
||||
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
|
||||
db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path_manager.ensure_database_dir_exists()
|
||||
|
||||
# Use the sqlite_db module function
|
||||
sqlite_db.persist_account(account_data)
|
||||
|
||||
@@ -6,6 +6,7 @@ from typing import Dict, Any, List
|
||||
from loguru import logger
|
||||
|
||||
from leggend.config import config
|
||||
from leggen.utils.paths import path_manager
|
||||
|
||||
|
||||
def _log_rate_limits(response):
|
||||
@@ -39,8 +40,8 @@ class GoCardlessService:
|
||||
if self._token:
|
||||
return self._token
|
||||
|
||||
# Use ~/.config/leggen for consistency with main config
|
||||
auth_file = Path.home() / ".config" / "leggen" / "auth.json"
|
||||
# Use path manager for auth file
|
||||
auth_file = path_manager.get_auth_file_path()
|
||||
|
||||
if auth_file.exists():
|
||||
try:
|
||||
|
||||
426
scripts/generate_sample_db.py
Executable file
426
scripts/generate_sample_db.py
Executable file
@@ -0,0 +1,426 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Sample database generator for Leggen testing and development."""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import random
|
||||
import sqlite3
|
||||
import sys
|
||||
import os
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import List, Dict, Any
|
||||
|
||||
# Add the project root to the Python path
|
||||
project_root = Path(__file__).parent.parent
|
||||
sys.path.insert(0, str(project_root))
|
||||
|
||||
import click
|
||||
from leggen.utils.paths import path_manager
|
||||
|
||||
|
||||
class SampleDataGenerator:
|
||||
"""Generates realistic sample data for testing Leggen."""
|
||||
|
||||
def __init__(self, db_path: Path):
|
||||
self.db_path = db_path
|
||||
self.institutions = [
|
||||
{
|
||||
"id": "REVOLUT_REVOLT21",
|
||||
"name": "Revolut",
|
||||
"bic": "REVOLT21",
|
||||
"country": "LT",
|
||||
},
|
||||
{
|
||||
"id": "BANCOBPI_BBPIPTPL",
|
||||
"name": "Banco BPI",
|
||||
"bic": "BBPIPTPL",
|
||||
"country": "PT",
|
||||
},
|
||||
{
|
||||
"id": "MONZO_MONZGB2L",
|
||||
"name": "Monzo Bank",
|
||||
"bic": "MONZGB2L",
|
||||
"country": "GB",
|
||||
},
|
||||
{
|
||||
"id": "NUBANK_NUPBBR25",
|
||||
"name": "Nu Pagamentos",
|
||||
"bic": "NUPBBR25",
|
||||
"country": "BR",
|
||||
},
|
||||
]
|
||||
|
||||
self.transaction_types = [
|
||||
{"description": "Grocery Store", "amount_range": (-150, -20), "frequency": 0.3},
|
||||
{"description": "Coffee Shop", "amount_range": (-15, -3), "frequency": 0.2},
|
||||
{"description": "Gas Station", "amount_range": (-80, -30), "frequency": 0.1},
|
||||
{"description": "Online Shopping", "amount_range": (-200, -25), "frequency": 0.15},
|
||||
{"description": "Restaurant", "amount_range": (-60, -15), "frequency": 0.15},
|
||||
{"description": "Salary", "amount_range": (2500, 5000), "frequency": 0.02},
|
||||
{"description": "ATM Withdrawal", "amount_range": (-200, -20), "frequency": 0.05},
|
||||
{"description": "Transfer to Savings", "amount_range": (-1000, -100), "frequency": 0.03},
|
||||
]
|
||||
|
||||
def ensure_database_dir(self):
|
||||
"""Ensure database directory exists."""
|
||||
self.db_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def create_tables(self):
|
||||
"""Create database tables."""
|
||||
conn = sqlite3.connect(str(self.db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Create accounts table
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS accounts (
|
||||
id TEXT PRIMARY KEY,
|
||||
institution_id TEXT,
|
||||
status TEXT,
|
||||
iban TEXT,
|
||||
name TEXT,
|
||||
currency TEXT,
|
||||
created DATETIME,
|
||||
last_accessed DATETIME,
|
||||
last_updated DATETIME
|
||||
)
|
||||
""")
|
||||
|
||||
# Create transactions table with composite primary key
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS transactions (
|
||||
accountId TEXT NOT NULL,
|
||||
transactionId TEXT NOT NULL,
|
||||
internalTransactionId TEXT,
|
||||
institutionId TEXT,
|
||||
iban TEXT,
|
||||
transactionDate DATETIME,
|
||||
description TEXT,
|
||||
transactionValue REAL,
|
||||
transactionCurrency TEXT,
|
||||
transactionStatus TEXT,
|
||||
rawTransaction JSON,
|
||||
PRIMARY KEY (accountId, transactionId)
|
||||
)
|
||||
""")
|
||||
|
||||
# Create balances table
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS balances (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
account_id TEXT,
|
||||
bank TEXT,
|
||||
status TEXT,
|
||||
iban TEXT,
|
||||
amount REAL,
|
||||
currency TEXT,
|
||||
type TEXT,
|
||||
timestamp DATETIME
|
||||
)
|
||||
""")
|
||||
|
||||
# Create indexes
|
||||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_transactions_internal_id ON transactions(internalTransactionId)")
|
||||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_transactions_date ON transactions(transactionDate)")
|
||||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_transactions_account_date ON transactions(accountId, transactionDate)")
|
||||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_transactions_amount ON transactions(transactionValue)")
|
||||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_balances_account_id ON balances(account_id)")
|
||||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_balances_timestamp ON balances(timestamp)")
|
||||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_balances_account_type_timestamp ON balances(account_id, type, timestamp)")
|
||||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_accounts_institution_id ON accounts(institution_id)")
|
||||
cursor.execute("CREATE INDEX IF NOT EXISTS idx_accounts_status ON accounts(status)")
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
def generate_iban(self, country_code: str) -> str:
|
||||
"""Generate a realistic IBAN for the given country."""
|
||||
ibans = {
|
||||
"LT": lambda: f"LT{random.randint(10, 99)}{random.randint(10000, 99999)}{random.randint(10000000, 99999999)}",
|
||||
"PT": lambda: f"PT{random.randint(10, 99)}{random.randint(1000, 9999)}{random.randint(1000, 9999)}{random.randint(10000000000, 99999999999)}",
|
||||
"GB": lambda: f"GB{random.randint(10, 99)}MONZ{random.randint(100000, 999999)}{random.randint(100000, 999999)}",
|
||||
"BR": lambda: f"BR{random.randint(10, 99)}{random.randint(10000000, 99999999)}{random.randint(1000, 9999)}{random.randint(10000000, 99999999)}",
|
||||
}
|
||||
return ibans.get(country_code, lambda: f"{country_code}{random.randint(1000000000000000, 9999999999999999)}")()
|
||||
|
||||
def generate_accounts(self, num_accounts: int = 3) -> List[Dict[str, Any]]:
|
||||
"""Generate sample accounts."""
|
||||
accounts = []
|
||||
base_date = datetime.now() - timedelta(days=90)
|
||||
|
||||
for i in range(num_accounts):
|
||||
institution = random.choice(self.institutions)
|
||||
account_id = f"account-{i+1:03d}-{random.randint(1000, 9999)}"
|
||||
|
||||
account = {
|
||||
"id": account_id,
|
||||
"institution_id": institution["id"],
|
||||
"status": "READY",
|
||||
"iban": self.generate_iban(institution["country"]),
|
||||
"name": f"Personal Account {i+1}",
|
||||
"currency": "EUR",
|
||||
"created": (base_date + timedelta(days=random.randint(0, 30))).isoformat(),
|
||||
"last_accessed": (datetime.now() - timedelta(hours=random.randint(1, 48))).isoformat(),
|
||||
"last_updated": datetime.now().isoformat(),
|
||||
}
|
||||
accounts.append(account)
|
||||
|
||||
return accounts
|
||||
|
||||
def generate_transactions(self, accounts: List[Dict[str, Any]], num_transactions_per_account: int = 50) -> List[Dict[str, Any]]:
|
||||
"""Generate sample transactions for accounts."""
|
||||
transactions = []
|
||||
base_date = datetime.now() - timedelta(days=60)
|
||||
|
||||
for account in accounts:
|
||||
account_transactions = []
|
||||
current_balance = random.uniform(500, 3000)
|
||||
|
||||
for i in range(num_transactions_per_account):
|
||||
# Choose transaction type based on frequency weights
|
||||
transaction_type = random.choices(
|
||||
self.transaction_types,
|
||||
weights=[t["frequency"] for t in self.transaction_types]
|
||||
)[0]
|
||||
|
||||
# Generate transaction amount
|
||||
min_amount, max_amount = transaction_type["amount_range"]
|
||||
amount = round(random.uniform(min_amount, max_amount), 2)
|
||||
|
||||
# Generate transaction date (more recent transactions are more likely)
|
||||
days_ago = random.choices(
|
||||
range(60),
|
||||
weights=[1.5 ** (60 - d) for d in range(60)]
|
||||
)[0]
|
||||
transaction_date = base_date + timedelta(days=days_ago, hours=random.randint(6, 22), minutes=random.randint(0, 59))
|
||||
|
||||
# Generate transaction IDs
|
||||
transaction_id = f"bank-txn-{account['id']}-{i+1:04d}"
|
||||
internal_transaction_id = f"int-txn-{random.randint(100000, 999999)}"
|
||||
|
||||
# Create realistic descriptions
|
||||
descriptions = {
|
||||
"Grocery Store": ["TESCO", "SAINSBURY'S", "LIDL", "ALDI", "WALMART", "CARREFOUR"],
|
||||
"Coffee Shop": ["STARBUCKS", "COSTA COFFEE", "PRET A MANGER", "LOCAL CAFE"],
|
||||
"Gas Station": ["BP", "SHELL", "ESSO", "GALP", "PETROBRAS"],
|
||||
"Online Shopping": ["AMAZON", "EBAY", "ZALANDO", "ASOS", "APPLE"],
|
||||
"Restaurant": ["PIZZA HUT", "MCDONALD'S", "BURGER KING", "LOCAL RESTAURANT"],
|
||||
"Salary": ["MONTHLY SALARY", "PAYROLL DEPOSIT", "SALARY PAYMENT"],
|
||||
"ATM Withdrawal": ["ATM WITHDRAWAL", "CASH WITHDRAWAL"],
|
||||
"Transfer to Savings": ["SAVINGS TRANSFER", "INVESTMENT TRANSFER"],
|
||||
}
|
||||
|
||||
specific_descriptions = descriptions.get(transaction_type["description"], [transaction_type["description"]])
|
||||
description = random.choice(specific_descriptions)
|
||||
|
||||
# Create raw transaction (simplified GoCardless format)
|
||||
raw_transaction = {
|
||||
"transactionId": transaction_id,
|
||||
"bookingDate": transaction_date.strftime("%Y-%m-%d"),
|
||||
"valueDate": transaction_date.strftime("%Y-%m-%d"),
|
||||
"transactionAmount": {
|
||||
"amount": str(amount),
|
||||
"currency": account["currency"]
|
||||
},
|
||||
"remittanceInformationUnstructured": description,
|
||||
"bankTransactionCode": "PMNT" if amount < 0 else "RCDT",
|
||||
}
|
||||
|
||||
# Determine status (most are booked, some recent ones might be pending)
|
||||
status = "pending" if days_ago < 2 and random.random() < 0.1 else "booked"
|
||||
|
||||
transaction = {
|
||||
"accountId": account["id"],
|
||||
"transactionId": transaction_id,
|
||||
"internalTransactionId": internal_transaction_id,
|
||||
"institutionId": account["institution_id"],
|
||||
"iban": account["iban"],
|
||||
"transactionDate": transaction_date.isoformat(),
|
||||
"description": description,
|
||||
"transactionValue": amount,
|
||||
"transactionCurrency": account["currency"],
|
||||
"transactionStatus": status,
|
||||
"rawTransaction": raw_transaction,
|
||||
}
|
||||
|
||||
account_transactions.append(transaction)
|
||||
current_balance += amount
|
||||
|
||||
# Sort transactions by date for realistic ordering
|
||||
account_transactions.sort(key=lambda x: x["transactionDate"])
|
||||
transactions.extend(account_transactions)
|
||||
|
||||
return transactions
|
||||
|
||||
def generate_balances(self, accounts: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
|
||||
"""Generate sample balances for accounts."""
|
||||
balances = []
|
||||
|
||||
for account in accounts:
|
||||
# Calculate balance from transactions (simplified)
|
||||
base_balance = random.uniform(500, 2000)
|
||||
|
||||
balance_types = ["interimAvailable", "closingBooked", "authorised"]
|
||||
|
||||
for balance_type in balance_types:
|
||||
# Add some variation to balance types
|
||||
variation = random.uniform(-50, 50) if balance_type != "interimAvailable" else 0
|
||||
balance_amount = base_balance + variation
|
||||
|
||||
balance = {
|
||||
"account_id": account["id"],
|
||||
"bank": account["institution_id"],
|
||||
"status": account["status"],
|
||||
"iban": account["iban"],
|
||||
"amount": round(balance_amount, 2),
|
||||
"currency": account["currency"],
|
||||
"type": balance_type,
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
}
|
||||
balances.append(balance)
|
||||
|
||||
return balances
|
||||
|
||||
def insert_data(self, accounts: List[Dict[str, Any]], transactions: List[Dict[str, Any]], balances: List[Dict[str, Any]]):
|
||||
"""Insert generated data into the database."""
|
||||
conn = sqlite3.connect(str(self.db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Insert accounts
|
||||
for account in accounts:
|
||||
cursor.execute("""
|
||||
INSERT OR REPLACE INTO accounts
|
||||
(id, institution_id, status, iban, name, currency, created, last_accessed, last_updated)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
account["id"], account["institution_id"], account["status"], account["iban"],
|
||||
account["name"], account["currency"], account["created"],
|
||||
account["last_accessed"], account["last_updated"]
|
||||
))
|
||||
|
||||
# Insert transactions
|
||||
for transaction in transactions:
|
||||
cursor.execute("""
|
||||
INSERT OR REPLACE INTO transactions
|
||||
(accountId, transactionId, internalTransactionId, institutionId, iban,
|
||||
transactionDate, description, transactionValue, transactionCurrency,
|
||||
transactionStatus, rawTransaction)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
transaction["accountId"], transaction["transactionId"],
|
||||
transaction["internalTransactionId"], transaction["institutionId"],
|
||||
transaction["iban"], transaction["transactionDate"], transaction["description"],
|
||||
transaction["transactionValue"], transaction["transactionCurrency"],
|
||||
transaction["transactionStatus"], json.dumps(transaction["rawTransaction"])
|
||||
))
|
||||
|
||||
# Insert balances
|
||||
for balance in balances:
|
||||
cursor.execute("""
|
||||
INSERT INTO balances
|
||||
(account_id, bank, status, iban, amount, currency, type, timestamp)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
""", (
|
||||
balance["account_id"], balance["bank"], balance["status"], balance["iban"],
|
||||
balance["amount"], balance["currency"], balance["type"], balance["timestamp"]
|
||||
))
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
def generate_sample_database(self, num_accounts: int = 3, num_transactions_per_account: int = 50):
|
||||
"""Generate complete sample database."""
|
||||
click.echo(f"🗄️ Creating sample database at: {self.db_path}")
|
||||
|
||||
self.ensure_database_dir()
|
||||
self.create_tables()
|
||||
|
||||
click.echo(f"👥 Generating {num_accounts} sample accounts...")
|
||||
accounts = self.generate_accounts(num_accounts)
|
||||
|
||||
click.echo(f"💳 Generating {num_transactions_per_account} transactions per account...")
|
||||
transactions = self.generate_transactions(accounts, num_transactions_per_account)
|
||||
|
||||
click.echo("💰 Generating account balances...")
|
||||
balances = self.generate_balances(accounts)
|
||||
|
||||
click.echo("💾 Inserting data into database...")
|
||||
self.insert_data(accounts, transactions, balances)
|
||||
|
||||
# Print summary
|
||||
click.echo("\n✅ Sample database created successfully!")
|
||||
click.echo(f"📊 Summary:")
|
||||
click.echo(f" - Accounts: {len(accounts)}")
|
||||
click.echo(f" - Transactions: {len(transactions)}")
|
||||
click.echo(f" - Balances: {len(balances)}")
|
||||
click.echo(f" - Database: {self.db_path}")
|
||||
|
||||
# Show account details
|
||||
click.echo(f"\n📋 Sample accounts:")
|
||||
for account in accounts:
|
||||
institution_name = next(inst["name"] for inst in self.institutions if inst["id"] == account["institution_id"])
|
||||
click.echo(f" - {account['id']} ({institution_name}) - {account['iban']}")
|
||||
|
||||
|
||||
@click.command()
|
||||
@click.option(
|
||||
"--database",
|
||||
type=click.Path(path_type=Path),
|
||||
help="Path to database file (default: uses LEGGEN_DATABASE_PATH or ~/.config/leggen/leggen-dev.db)",
|
||||
)
|
||||
@click.option(
|
||||
"--accounts",
|
||||
type=int,
|
||||
default=3,
|
||||
help="Number of sample accounts to generate (default: 3)",
|
||||
)
|
||||
@click.option(
|
||||
"--transactions",
|
||||
type=int,
|
||||
default=50,
|
||||
help="Number of transactions per account (default: 50)",
|
||||
)
|
||||
@click.option(
|
||||
"--force",
|
||||
is_flag=True,
|
||||
help="Overwrite existing database without confirmation",
|
||||
)
|
||||
def main(database: Path, accounts: int, transactions: int, force: bool):
|
||||
"""Generate a sample database with realistic financial data for testing Leggen."""
|
||||
|
||||
# Determine database path
|
||||
if database:
|
||||
db_path = database
|
||||
else:
|
||||
# Use development database by default to avoid overwriting production data
|
||||
import os
|
||||
env_path = os.environ.get("LEGGEN_DATABASE_PATH")
|
||||
if env_path:
|
||||
db_path = Path(env_path)
|
||||
else:
|
||||
# Default to development database in config directory
|
||||
db_path = path_manager.get_config_dir() / "leggen-dev.db"
|
||||
|
||||
# Check if database exists and ask for confirmation
|
||||
if db_path.exists() and not force:
|
||||
click.echo(f"⚠️ Database already exists: {db_path}")
|
||||
if not click.confirm("Do you want to overwrite it?"):
|
||||
click.echo("Aborted.")
|
||||
return
|
||||
|
||||
# Generate the sample database
|
||||
generator = SampleDataGenerator(db_path)
|
||||
generator.generate_sample_database(accounts, transactions)
|
||||
|
||||
# Show usage instructions
|
||||
click.echo(f"\n🚀 Usage instructions:")
|
||||
click.echo(f"To use this sample database with leggen commands:")
|
||||
click.echo(f" export LEGGEN_DATABASE_PATH={db_path}")
|
||||
click.echo(f" leggen transactions")
|
||||
click.echo(f"")
|
||||
click.echo(f"To use this sample database with leggend API:")
|
||||
click.echo(f" leggend --database {db_path}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user