refactor: Consolidate database layer and eliminate wrapper complexity.

- Merge leggen/database/sqlite.py functionality directly into DatabaseService
- Extract transaction processing logic to separate TransactionProcessor class
- Remove leggen/utils/database.py and leggen/database/ directory entirely
- Update all tests to use new consolidated structure
- Reduce codebase by ~300 lines while maintaining full functionality
- Improve separation of concerns: data processing vs persistence vs CLI

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Elisiário Couto
2025-09-14 20:56:17 +01:00
committed by Elisiário Couto
parent d09cf6d04c
commit 5ae3a51d81
7 changed files with 589 additions and 1266 deletions

View File

@@ -1,18 +1,21 @@
from datetime import datetime
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import sqlite3
import json
from collections import defaultdict
from loguru import logger
from leggen.utils.config import config
import leggen.database.sqlite as sqlite_db
from leggen.utils.paths import path_manager
from leggen.services.transaction_processor import TransactionProcessor
class DatabaseService:
def __init__(self):
    """Read the database settings and create the transaction processor.

    ``sqlite_enabled`` comes from the ``sqlite`` key of the database
    configuration and defaults to ``True`` when the key is absent.
    """
    db_settings = config.database_config
    self.db_config = db_settings
    self.sqlite_enabled = db_settings.get("sqlite", True)
    self.transaction_processor = TransactionProcessor()
async def persist_balance(
self, account_id: str, balance_data: Dict[str, Any]
@@ -41,79 +44,9 @@ class DatabaseService:
transaction_data: Dict[str, Any],
) -> List[Dict[str, Any]]:
"""Process raw transaction data into standardized format"""
transactions = []
# Process booked transactions
for transaction in transaction_data.get("transactions", {}).get("booked", []):
processed = self._process_single_transaction(
account_id, account_info, transaction, "booked"
)
transactions.append(processed)
# Process pending transactions
for transaction in transaction_data.get("transactions", {}).get("pending", []):
processed = self._process_single_transaction(
account_id, account_info, transaction, "pending"
)
transactions.append(processed)
return transactions
def _process_single_transaction(
self,
account_id: str,
account_info: Dict[str, Any],
transaction: Dict[str, Any],
status: str,
) -> Dict[str, Any]:
"""Process a single transaction into standardized format"""
# Extract dates
booked_date = transaction.get("bookingDateTime") or transaction.get(
"bookingDate"
return self.transaction_processor.process_transactions(
account_id, account_info, transaction_data
)
value_date = transaction.get("valueDateTime") or transaction.get("valueDate")
if booked_date and value_date:
min_date = min(
datetime.fromisoformat(booked_date), datetime.fromisoformat(value_date)
)
else:
date_str = booked_date or value_date
if not date_str:
raise ValueError("No valid date found in transaction")
min_date = datetime.fromisoformat(date_str)
# Extract amount and currency
transaction_amount = transaction.get("transactionAmount", {})
amount = float(transaction_amount.get("amount", 0))
currency = transaction_amount.get("currency", "")
# Extract description
description = transaction.get(
"remittanceInformationUnstructured",
",".join(transaction.get("remittanceInformationUnstructuredArray", [])),
)
# Extract transaction IDs - transactionId is now primary, internalTransactionId is reference
transaction_id = transaction.get("transactionId")
internal_transaction_id = transaction.get("internalTransactionId")
if not transaction_id:
raise ValueError("Transaction missing required transactionId field")
return {
"accountId": account_id,
"transactionId": transaction_id,
"internalTransactionId": internal_transaction_id,
"institutionId": account_info["institution_id"],
"iban": account_info.get("iban", "N/A"),
"transactionDate": min_date,
"description": description,
"transactionValue": amount,
"transactionCurrency": currency,
"transactionStatus": status,
"rawTransaction": transaction,
}
async def get_transactions_from_db(
self,
@@ -132,7 +65,7 @@ class DatabaseService:
return []
try:
transactions = sqlite_db.get_transactions(
transactions = self._get_transactions(
account_id=account_id,
limit=limit, # Pass limit as-is, None means no limit
offset=offset or 0,
@@ -172,7 +105,7 @@ class DatabaseService:
# Remove None values
filters = {k: v for k, v in filters.items() if v is not None}
count = sqlite_db.get_transaction_count(account_id=account_id, **filters)
count = self._get_transaction_count(account_id=account_id, **filters)
logger.debug(f"Total transaction count: {count}")
return count
except Exception as e:
@@ -188,7 +121,7 @@ class DatabaseService:
return []
try:
balances = sqlite_db.get_balances(account_id=account_id)
balances = self._get_balances(account_id=account_id)
logger.debug(f"Retrieved {len(balances)} balances from database")
return balances
except Exception as e:
@@ -204,9 +137,7 @@ class DatabaseService:
return []
try:
balances = sqlite_db.get_historical_balances(
account_id=account_id, days=days
)
balances = self._get_historical_balances(account_id=account_id, days=days)
logger.debug(
f"Retrieved {len(balances)} historical balance points from database"
)
@@ -223,7 +154,7 @@ class DatabaseService:
return None
try:
summary = sqlite_db.get_account_summary(account_id)
summary = self._get_account_summary(account_id)
if summary:
logger.debug(
f"Retrieved account summary from database for {account_id}"
@@ -250,7 +181,7 @@ class DatabaseService:
return []
try:
accounts = sqlite_db.get_accounts(account_ids=account_ids)
accounts = self._get_accounts(account_ids=account_ids)
logger.debug(f"Retrieved {len(accounts)} accounts from database")
return accounts
except Exception as e:
@@ -266,7 +197,7 @@ class DatabaseService:
return None
try:
account = sqlite_db.get_account(account_id)
account = self._get_account(account_id)
if account:
logger.debug(
f"Retrieved account details from database for {account_id}"
@@ -893,7 +824,7 @@ class DatabaseService:
"""Persist account details to SQLite"""
try:
# Delegate to the in-class SQLite helper
sqlite_db.persist_account(account_data)
self._persist_account(account_data)
logger.info(
f"Persisted account details to SQLite for account {account_data['id']}"
@@ -901,3 +832,453 @@ class DatabaseService:
except Exception as e:
logger.error(f"Failed to persist account details to SQLite: {e}")
raise
def _get_transactions(
    self,
    account_id=None,
    limit=100,
    offset=0,
    date_from=None,
    date_to=None,
    min_amount=None,
    max_amount=None,
    search=None,
):
    """Get transactions from the SQLite database with optional filtering.

    Args:
        account_id: Restrict results to this account when truthy.
        limit: Maximum number of rows; a falsy value (``None`` or ``0``)
            means no limit.
        offset: Number of leading rows to skip; a falsy value skips none.
        date_from: Inclusive lower bound on ``transactionDate``.
        date_to: Inclusive upper bound on ``transactionDate``.
        min_amount: Inclusive lower bound on ``transactionValue``.
        max_amount: Inclusive upper bound on ``transactionValue``.
        search: Substring matched against ``description`` with ``LIKE``.

    Returns:
        A list of row dicts ordered by ``transactionDate`` descending, with
        ``rawTransaction`` decoded from JSON when it is non-empty. An empty
        list is returned when the database file does not exist.
    """
    db_path = path_manager.get_database_path()
    if not db_path.exists():
        return []

    # Build the query with only the filters that were supplied.
    query = "SELECT * FROM transactions WHERE 1=1"
    params = []

    if account_id:
        query += " AND accountId = ?"
        params.append(account_id)

    if date_from:
        query += " AND transactionDate >= ?"
        params.append(date_from)

    if date_to:
        query += " AND transactionDate <= ?"
        params.append(date_to)

    # Amount bounds are compared against None so that 0 is a valid bound.
    if min_amount is not None:
        query += " AND transactionValue >= ?"
        params.append(min_amount)

    if max_amount is not None:
        query += " AND transactionValue <= ?"
        params.append(max_amount)

    if search:
        query += " AND description LIKE ?"
        params.append(f"%{search}%")

    query += " ORDER BY transactionDate DESC"

    if limit or offset:
        # SQLite accepts OFFSET only as part of a LIMIT clause, so an
        # offset without a limit needs "LIMIT -1" (negative = unbounded).
        query += " LIMIT ?"
        params.append(limit if limit else -1)
        if offset:
            query += " OFFSET ?"
            params.append(offset)

    conn = sqlite3.connect(str(db_path))
    try:
        conn.row_factory = sqlite3.Row  # Enable dict-like access
        cursor = conn.cursor()
        cursor.execute(query, params)

        # Convert rows to plain dicts and decode the stored JSON payload.
        transactions = []
        for row in cursor.fetchall():
            transaction = dict(row)
            if transaction["rawTransaction"]:
                transaction["rawTransaction"] = json.loads(
                    transaction["rawTransaction"]
                )
            transactions.append(transaction)
        return transactions
    finally:
        # Runs on success and on any error, so the connection never leaks.
        conn.close()
def _get_balances(self, account_id=None):
    """Get the latest balance of every account/type pair from SQLite.

    Args:
        account_id: Restrict results to this account when truthy.

    Returns:
        A list of row dicts ordered by ``account_id`` then ``type``; an
        empty list when the database file does not exist.
    """
    db_path = path_manager.get_database_path()
    if not db_path.exists():
        return []

    # Keep only the rows whose timestamp is the newest one recorded for
    # their (account_id, type) combination.
    query = """
        SELECT * FROM balances b1
        WHERE b1.timestamp = (
            SELECT MAX(b2.timestamp)
            FROM balances b2
            WHERE b2.account_id = b1.account_id AND b2.type = b1.type
        )
    """
    params = []

    if account_id:
        query += " AND b1.account_id = ?"
        params.append(account_id)

    query += " ORDER BY b1.account_id, b1.type"

    conn = sqlite3.connect(str(db_path))
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute(query, params)
        return [dict(row) for row in cursor.fetchall()]
    finally:
        # Single cleanup path for both success and failure.
        conn.close()
def _get_account_summary(self, account_id):
    """Get basic account info from the transactions table.

    Reads ``accountId``, ``institutionId`` and ``iban`` from the most
    recent transaction of the account (avoids a GoCardless API call).

    Args:
        account_id: Account whose summary is requested.

    Returns:
        A dict with the three columns above, or ``None`` when the database
        file does not exist or the account has no transactions.
    """
    db_path = path_manager.get_database_path()
    if not db_path.exists():
        return None

    conn = sqlite3.connect(str(db_path))
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        # Newest transaction first; one row is enough for the summary.
        cursor.execute(
            """
            SELECT DISTINCT accountId, institutionId, iban
            FROM transactions
            WHERE accountId = ?
            ORDER BY transactionDate DESC
            LIMIT 1
            """,
            (account_id,),
        )
        row = cursor.fetchone()
        return dict(row) if row else None
    finally:
        # Single cleanup path for both success and failure.
        conn.close()
def _get_transaction_count(self, account_id=None, **filters):
    """Get the total count of transactions matching the given filters.

    Args:
        account_id: Restrict the count to this account when truthy.
        **filters: Optional ``date_from``, ``date_to``, ``min_amount``,
            ``max_amount`` and ``search`` values, applied exactly like the
            equally named parameters of ``_get_transactions``. Other keys
            are ignored.

    Returns:
        The number of matching rows, or ``0`` when the database file does
        not exist.
    """
    db_path = path_manager.get_database_path()
    if not db_path.exists():
        return 0

    query = "SELECT COUNT(*) FROM transactions WHERE 1=1"
    params = []

    if account_id:
        query += " AND accountId = ?"
        params.append(account_id)

    # Same filters as _get_transactions, so counts match paginated results.
    if filters.get("date_from"):
        query += " AND transactionDate >= ?"
        params.append(filters["date_from"])

    if filters.get("date_to"):
        query += " AND transactionDate <= ?"
        params.append(filters["date_to"])

    # Amount bounds are compared against None so that 0 is a valid bound.
    if filters.get("min_amount") is not None:
        query += " AND transactionValue >= ?"
        params.append(filters["min_amount"])

    if filters.get("max_amount") is not None:
        query += " AND transactionValue <= ?"
        params.append(filters["max_amount"])

    if filters.get("search"):
        query += " AND description LIKE ?"
        params.append(f"%{filters['search']}%")

    conn = sqlite3.connect(str(db_path))
    try:
        cursor = conn.cursor()
        cursor.execute(query, params)
        return cursor.fetchone()[0]
    finally:
        # Single cleanup path for both success and failure.
        conn.close()
def _persist_account(self, account_data: dict):
    """Persist account details to the SQLite database.

    Creates the ``accounts`` table and its indexes when they are missing,
    then inserts the account or replaces the existing row with the same
    ``id``.

    Args:
        account_data: Account fields. ``id``, ``institution_id``,
            ``status`` and ``created`` are required; ``iban``, ``name``,
            ``currency`` and ``last_accessed`` default to ``None`` and
            ``last_updated`` defaults to ``created``.

    Returns:
        The ``account_data`` dict that was passed in.

    Raises:
        KeyError: If a required field is missing from ``account_data``.
    """
    db_path = path_manager.get_database_path()
    conn = sqlite3.connect(str(db_path))
    try:
        cursor = conn.cursor()

        # Schema setup lives inside the try block so that a failing DDL
        # statement still closes the connection.
        cursor.execute(
            """CREATE TABLE IF NOT EXISTS accounts (
                id TEXT PRIMARY KEY,
                institution_id TEXT,
                status TEXT,
                iban TEXT,
                name TEXT,
                currency TEXT,
                created DATETIME,
                last_accessed DATETIME,
                last_updated DATETIME
            )"""
        )

        # Create indexes for accounts table
        cursor.execute(
            """CREATE INDEX IF NOT EXISTS idx_accounts_institution_id
               ON accounts(institution_id)"""
        )
        cursor.execute(
            """CREATE INDEX IF NOT EXISTS idx_accounts_status
               ON accounts(status)"""
        )

        # Insert or replace account data
        cursor.execute(
            """INSERT OR REPLACE INTO accounts (
                id,
                institution_id,
                status,
                iban,
                name,
                currency,
                created,
                last_accessed,
                last_updated
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                account_data["id"],
                account_data["institution_id"],
                account_data["status"],
                account_data.get("iban"),
                account_data.get("name"),
                account_data.get("currency"),
                account_data["created"],
                account_data.get("last_accessed"),
                account_data.get("last_updated", account_data["created"]),
            ),
        )
        conn.commit()
        return account_data
    finally:
        # Closing without a commit discards any uncommitted changes.
        conn.close()
def _get_accounts(self, account_ids=None):
    """Get account details from the SQLite database.

    Args:
        account_ids: Optional collection of account ids; when truthy only
            these accounts are returned, otherwise all accounts are.

    Returns:
        A list of row dicts ordered by ``created`` descending; an empty
        list when the database file does not exist.
    """
    db_path = path_manager.get_database_path()
    if not db_path.exists():
        return []

    query = "SELECT * FROM accounts"
    params = []

    if account_ids:
        # One "?" placeholder per id keeps the IN clause parameterized.
        placeholders = ",".join("?" * len(account_ids))
        query += f" WHERE id IN ({placeholders})"
        params.extend(account_ids)

    query += " ORDER BY created DESC"

    conn = sqlite3.connect(str(db_path))
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute(query, params)
        return [dict(row) for row in cursor.fetchall()]
    finally:
        # Single cleanup path for both success and failure.
        conn.close()
def _get_account(self, account_id: str):
    """Get the details of one account from the SQLite database.

    Args:
        account_id: Value of the ``id`` column to look up.

    Returns:
        The matching row as a dict, or ``None`` when the database file
        does not exist or no row has that id.
    """
    db_path = path_manager.get_database_path()
    if not db_path.exists():
        return None

    conn = sqlite3.connect(str(db_path))
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM accounts WHERE id = ?", (account_id,))
        row = cursor.fetchone()
        return dict(row) if row else None
    finally:
        # Single cleanup path for both success and failure.
        conn.close()
def _get_historical_balances(self, account_id=None, days=365):
    """Reconstruct weekly historical balances from the transaction history.

    The latest stored balance of each account/type pair is taken as the
    present-day value. Transactions from the last ``days`` days are then
    undone newest-first, and one ``closingBooked`` point per account is
    emitted every 7 days, starting today and moving back in time. Each
    point is the balance after all transactions dated on or before its
    ``reference_date``.

    Args:
        account_id: Restrict the calculation to this account when truthy.
        days: Size of the look-back window in days.

    Returns:
        A list of balance-point dicts sorted by ``reference_date``
        ascending; an empty list when the database file does not exist.
    """
    db_path = path_manager.get_database_path()
    if not db_path.exists():
        return []

    conn = sqlite3.connect(str(db_path))
    try:
        conn.row_factory = sqlite3.Row
        cursor = conn.cursor()

        # Latest balance per account/type: the starting point for the
        # backwards walk.
        current_balances_query = """
            SELECT account_id, type, amount, currency
            FROM balances b1
            WHERE b1.timestamp = (
                SELECT MAX(b2.timestamp)
                FROM balances b2
                WHERE b2.account_id = b1.account_id AND b2.type = b1.type
            )
        """
        balance_params = []
        if account_id:
            current_balances_query += " AND b1.account_id = ?"
            balance_params.append(account_id)

        cursor.execute(current_balances_query, balance_params)
        current_balances = {
            (row["account_id"], row["type"]): {
                "amount": row["amount"],
                "currency": row["currency"],
            }
            for row in cursor.fetchall()
        }

        # Transactions inside the look-back window, newest first.
        cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()
        transactions_query = """
            SELECT accountId, transactionDate, transactionValue
            FROM transactions
            WHERE transactionDate >= ?
        """
        transaction_params = [cutoff_date]
        if account_id:
            transactions_query += " AND accountId = ?"
            transaction_params.append(account_id)
        transactions_query += " ORDER BY transactionDate DESC"

        cursor.execute(transactions_query, transaction_params)
        transactions = cursor.fetchall()

        # Running balances start at the current values and are rolled back
        # as older sample dates are reached.
        account_running_balances: dict[str, dict[str, float]] = {}
        for (acc_id, balance_type), balance_info in current_balances.items():
            account_running_balances.setdefault(acc_id, {})[balance_type] = (
                balance_info["amount"]
            )

        # Group transactions by calendar day (first 10 chars of the date).
        transactions_by_date = defaultdict(list)
        for txn in transactions:
            transactions_by_date[txn["transactionDate"][:10]].append(txn)

        historical_balances = []
        balance_type = "closingBooked"  # Focus on closingBooked for the chart
        current_date = datetime.now().date()

        for day_offset in range(0, days, 7):  # Sample every 7 days
            target_date_str = (
                current_date - timedelta(days=day_offset)
            ).isoformat()

            # Undo every transaction dated after the sample day BEFORE the
            # point is recorded. Recording first would label the point with
            # the balance of the previous (newer) sample date.
            newer_dates = [
                date_str
                for date_str in transactions_by_date
                if date_str > target_date_str
            ]
            for date_str in newer_dates:
                # pop() drops the day so it is never subtracted twice.
                for txn in transactions_by_date.pop(date_str):
                    balances = account_running_balances.get(txn["accountId"])
                    if balances is None:
                        continue
                    for running_type in balances:
                        balances[running_type] -= txn["transactionValue"]

            for acc_id, balances in account_running_balances.items():
                if balance_type not in balances:
                    continue
                currency = current_balances.get(
                    (acc_id, balance_type), {}
                ).get("currency", "EUR")
                historical_balances.append(
                    {
                        "id": f"{acc_id}_{balance_type}_{target_date_str}",
                        "account_id": acc_id,
                        "balance_amount": balances[balance_type],
                        "balance_type": balance_type,
                        "currency": currency,
                        "reference_date": target_date_str,
                        "created_at": None,
                        "updated_at": None,
                    }
                )

        # Points were generated newest-first; return them oldest-first.
        historical_balances.sort(key=lambda x: x["reference_date"])
        return historical_balances
    finally:
        # Single cleanup path for both success and failure.
        conn.close()