refactor: Consolidate service layer with dedicated data processors.

Introduces a DataProcessor layer to separate transformation logic from orchestration and persistence concerns:

- Created data_processors/ directory with AccountEnricher, BalanceTransformer, AnalyticsProcessor, and moved TransactionProcessor
- Refactored SyncService to pure orchestrator, removing account/balance enrichment logic
- Refactored DatabaseService to pure CRUD, removing analytics and transformation logic
- Extracted 90+ lines of analytics SQL from DatabaseService to AnalyticsProcessor
- Extracted 80+ lines of balance transformation logic to BalanceTransformer
- Maintained backward compatibility - all 109 tests pass
- No API contract changes

This improves code clarity, testability, and maintainability while maintaining the existing API surface.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
This commit is contained in:
Elisiário Couto
2025-12-07 01:11:54 +00:00
committed by Elisiário Couto
parent 3d5994bf30
commit fbb3eb9e64
7 changed files with 404 additions and 221 deletions

View File

@@ -0,0 +1,13 @@
"""Data processing layer for all transformation logic."""
from leggen.services.data_processors.account_enricher import AccountEnricher
from leggen.services.data_processors.analytics_processor import AnalyticsProcessor
from leggen.services.data_processors.balance_transformer import BalanceTransformer
from leggen.services.data_processors.transaction_processor import TransactionProcessor
__all__ = [
"AccountEnricher",
"AnalyticsProcessor",
"BalanceTransformer",
"TransactionProcessor",
]

View File

@@ -0,0 +1,71 @@
"""Account enrichment processor for adding currency, logos, and metadata."""
from typing import Any, Dict
from loguru import logger
from leggen.services.gocardless_service import GoCardlessService
class AccountEnricher:
"""Enriches account details with currency and institution information."""
def __init__(self):
self.gocardless = GoCardlessService()
async def enrich_account_details(
self,
account_details: Dict[str, Any],
balances: Dict[str, Any],
) -> Dict[str, Any]:
"""
Enrich account details with currency from balances and institution logo.
Args:
account_details: Raw account details from GoCardless
balances: Balance data containing currency information
Returns:
Enriched account details with currency and logo added
"""
enriched_account = account_details.copy()
# Extract currency from first balance
currency = self._extract_currency_from_balances(balances)
if currency:
enriched_account["currency"] = currency
# Fetch and add institution logo
institution_id = enriched_account.get("institution_id")
if institution_id:
logo = await self._fetch_institution_logo(institution_id)
if logo:
enriched_account["logo"] = logo
return enriched_account
def _extract_currency_from_balances(self, balances: Dict[str, Any]) -> str | None:
"""Extract currency from the first balance in the balances data."""
balances_list = balances.get("balances", [])
if not balances_list:
return None
first_balance = balances_list[0]
balance_amount = first_balance.get("balanceAmount", {})
return balance_amount.get("currency")
async def _fetch_institution_logo(self, institution_id: str) -> str | None:
"""Fetch institution logo from GoCardless API."""
try:
institution_details = (
await self.gocardless.get_institution_details(institution_id)
)
logo = institution_details.get("logo", "")
if logo:
logger.info(f"Fetched logo for institution {institution_id}: {logo}")
return logo
except Exception as e:
logger.warning(
f"Failed to fetch institution details for {institution_id}: {e}"
)
return None

View File

@@ -0,0 +1,201 @@
"""Analytics processor for calculating historical balances and statistics."""
import sqlite3
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional
from loguru import logger
class AnalyticsProcessor:
"""Calculates historical balances and transaction statistics from database data."""
def calculate_historical_balances(
self,
db_path: Path,
account_id: Optional[str] = None,
days: int = 365,
) -> List[Dict[str, Any]]:
"""
Generate historical balance progression based on transaction history.
Uses current balances and subtracts future transactions to calculate
balance at each historical point in time.
Args:
db_path: Path to SQLite database
account_id: Optional account ID to filter by
days: Number of days to look back (default 365)
Returns:
List of historical balance data points
"""
if not db_path.exists():
return []
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
try:
cutoff_date = (datetime.now() - timedelta(days=days)).date().isoformat()
today_date = datetime.now().date().isoformat()
# Single SQL query to generate historical balances using window functions
query = """
WITH RECURSIVE date_series AS (
-- Generate weekly dates from cutoff_date to today
SELECT date(?) as ref_date
UNION ALL
SELECT date(ref_date, '+7 days')
FROM date_series
WHERE ref_date < date(?)
),
current_balances AS (
-- Get current balance for each account/type
SELECT account_id, type, amount, currency
FROM balances b1
WHERE b1.timestamp = (
SELECT MAX(b2.timestamp)
FROM balances b2
WHERE b2.account_id = b1.account_id AND b2.type = b1.type
)
{account_filter}
AND b1.type = 'closingBooked' -- Focus on closingBooked for charts
),
historical_points AS (
-- Calculate balance at each weekly point by subtracting future transactions
SELECT
cb.account_id,
cb.type as balance_type,
cb.currency,
ds.ref_date,
cb.amount - COALESCE(
(SELECT SUM(t.transactionValue)
FROM transactions t
WHERE t.accountId = cb.account_id
AND date(t.transactionDate) > ds.ref_date), 0
) as balance_amount
FROM current_balances cb
CROSS JOIN date_series ds
)
SELECT
account_id || '_' || balance_type || '_' || ref_date as id,
account_id,
balance_amount,
balance_type,
currency,
ref_date as reference_date
FROM historical_points
ORDER BY account_id, ref_date
"""
# Build parameters and account filter
params = [cutoff_date, today_date]
if account_id:
account_filter = "AND b1.account_id = ?"
params.append(account_id)
else:
account_filter = ""
# Format the query with conditional filter
formatted_query = query.format(account_filter=account_filter)
cursor.execute(formatted_query, params)
rows = cursor.fetchall()
conn.close()
return [dict(row) for row in rows]
except Exception as e:
conn.close()
logger.error(f"Failed to calculate historical balances: {e}")
raise
def calculate_monthly_stats(
self,
db_path: Path,
account_id: Optional[str] = None,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""
Calculate monthly transaction statistics aggregated from database.
Sums transactions by month and calculates income, expenses, and net values.
Args:
db_path: Path to SQLite database
account_id: Optional account ID to filter by
date_from: Optional start date (ISO format)
date_to: Optional end date (ISO format)
Returns:
List of monthly statistics with income, expenses, and net totals
"""
if not db_path.exists():
return []
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
try:
# SQL query to aggregate transactions by month
query = """
SELECT
strftime('%Y-%m', transactionDate) as month,
COALESCE(SUM(CASE WHEN transactionValue > 0 THEN transactionValue ELSE 0 END), 0) as income,
COALESCE(SUM(CASE WHEN transactionValue < 0 THEN ABS(transactionValue) ELSE 0 END), 0) as expenses,
COALESCE(SUM(transactionValue), 0) as net
FROM transactions
WHERE 1=1
"""
params = []
if account_id:
query += " AND accountId = ?"
params.append(account_id)
if date_from:
query += " AND transactionDate >= ?"
params.append(date_from)
if date_to:
query += " AND transactionDate <= ?"
params.append(date_to)
query += """
GROUP BY strftime('%Y-%m', transactionDate)
ORDER BY month ASC
"""
cursor.execute(query, params)
rows = cursor.fetchall()
# Convert to desired format with proper month display
monthly_stats = []
for row in rows:
# Convert YYYY-MM to display format like "Mar 2024"
year, month_num = row["month"].split("-")
month_date = datetime.strptime(f"{year}-{month_num}-01", "%Y-%m-%d")
display_month = month_date.strftime("%b %Y")
monthly_stats.append(
{
"month": display_month,
"income": round(row["income"], 2),
"expenses": round(row["expenses"], 2),
"net": round(row["net"], 2),
}
)
conn.close()
return monthly_stats
except Exception as e:
conn.close()
logger.error(f"Failed to calculate monthly stats: {e}")
raise

View File

@@ -0,0 +1,69 @@
"""Balance data transformation processor for format conversions."""
from datetime import datetime
from typing import Any, Dict, List, Tuple
class BalanceTransformer:
"""Transforms balance data between GoCardless and internal database formats."""
def merge_account_metadata_into_balances(
self,
balances: Dict[str, Any],
account_details: Dict[str, Any],
) -> Dict[str, Any]:
"""
Merge account metadata into balance data for proper persistence.
This adds institution_id, iban, and account_status to the balances
so they can be persisted alongside the balance data.
Args:
balances: Raw balance data from GoCardless
account_details: Enriched account details containing metadata
Returns:
Balance data with account metadata merged in
"""
balances_with_metadata = balances.copy()
balances_with_metadata["institution_id"] = account_details.get("institution_id")
balances_with_metadata["iban"] = account_details.get("iban")
balances_with_metadata["account_status"] = account_details.get("status")
return balances_with_metadata
def transform_to_database_format(
self,
account_id: str,
balance_data: Dict[str, Any],
) -> List[Tuple[Any, ...]]:
"""
Transform GoCardless balance format to database row format.
Converts nested GoCardless balance structure into flat tuples
ready for SQLite insertion.
Args:
account_id: The account ID
balance_data: Balance data with merged account metadata
Returns:
List of tuples in database row format (account_id, bank, status, ...)
"""
rows = []
for balance in balance_data.get("balances", []):
balance_amount = balance.get("balanceAmount", {})
row = (
account_id,
balance_data.get("institution_id", "unknown"),
balance_data.get("account_status"),
balance_data.get("iban", "N/A"),
float(balance_amount.get("amount", 0)),
balance_amount.get("currency"),
balance.get("balanceType"),
datetime.now().isoformat(),
)
rows.append(row)
return rows

View File

@@ -1,11 +1,15 @@
import json
import sqlite3
from datetime import datetime, timedelta
from datetime import datetime
from typing import Any, Dict, List, Optional
from loguru import logger
from leggen.services.transaction_processor import TransactionProcessor
from leggen.services.data_processors import (
AnalyticsProcessor,
BalanceTransformer,
TransactionProcessor,
)
from leggen.utils.config import config
from leggen.utils.paths import path_manager
@@ -14,7 +18,11 @@ class DatabaseService:
def __init__(self):
self.db_config = config.database_config
self.sqlite_enabled = self.db_config.get("sqlite", True)
# Data processors
self.transaction_processor = TransactionProcessor()
self.balance_transformer = BalanceTransformer()
self.analytics_processor = AnalyticsProcessor()
async def persist_balance(
self, account_id: str, balance_data: Dict[str, Any]
@@ -136,7 +144,10 @@ class DatabaseService:
return []
try:
balances = self._get_historical_balances(account_id=account_id, days=days)
db_path = path_manager.get_database_path()
balances = self.analytics_processor.calculate_historical_balances(
db_path, account_id=account_id, days=days
)
logger.debug(
f"Retrieved {len(balances)} historical balance points from database"
)
@@ -753,10 +764,12 @@ class DatabaseService:
ON balances(account_id, type, timestamp)"""
)
# Convert GoCardless balance format to our format and persist
for balance in balance_data.get("balances", []):
balance_amount = balance["balanceAmount"]
# Transform and persist balances
balance_rows = self.balance_transformer.transform_to_database_format(
account_id, balance_data
)
for row in balance_rows:
try:
cursor.execute(
"""INSERT INTO balances (
@@ -769,16 +782,7 @@ class DatabaseService:
type,
timestamp
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
(
account_id,
balance_data.get("institution_id", "unknown"),
balance_data.get("account_status"),
balance_data.get("iban", "N/A"),
float(balance_amount["amount"]),
balance_amount["currency"],
balance["balanceType"],
datetime.now().isoformat(),
),
row,
)
except sqlite3.IntegrityError:
logger.warning(f"Skipped duplicate balance for {account_id}")
@@ -1251,90 +1255,6 @@ class DatabaseService:
conn.close()
raise e
def _get_historical_balances(self, account_id=None, days=365):
"""Get historical balance progression based on transaction history"""
db_path = path_manager.get_database_path()
if not db_path.exists():
return []
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
try:
cutoff_date = (datetime.now() - timedelta(days=days)).date().isoformat()
today_date = datetime.now().date().isoformat()
# Single SQL query to generate historical balances using window functions
query = """
WITH RECURSIVE date_series AS (
-- Generate weekly dates from cutoff_date to today
SELECT date(?) as ref_date
UNION ALL
SELECT date(ref_date, '+7 days')
FROM date_series
WHERE ref_date < date(?)
),
current_balances AS (
-- Get current balance for each account/type
SELECT account_id, type, amount, currency
FROM balances b1
WHERE b1.timestamp = (
SELECT MAX(b2.timestamp)
FROM balances b2
WHERE b2.account_id = b1.account_id AND b2.type = b1.type
)
{account_filter}
AND b1.type = 'closingBooked' -- Focus on closingBooked for charts
),
historical_points AS (
-- Calculate balance at each weekly point by subtracting future transactions
SELECT
cb.account_id,
cb.type as balance_type,
cb.currency,
ds.ref_date,
cb.amount - COALESCE(
(SELECT SUM(t.transactionValue)
FROM transactions t
WHERE t.accountId = cb.account_id
AND date(t.transactionDate) > ds.ref_date), 0
) as balance_amount
FROM current_balances cb
CROSS JOIN date_series ds
)
SELECT
account_id || '_' || balance_type || '_' || ref_date as id,
account_id,
balance_amount,
balance_type,
currency,
ref_date as reference_date
FROM historical_points
ORDER BY account_id, ref_date
"""
# Build parameters and account filter
params = [cutoff_date, today_date]
if account_id:
account_filter = "AND b1.account_id = ?"
params.append(account_id)
else:
account_filter = ""
# Format the query with conditional filter
formatted_query = query.format(account_filter=account_filter)
cursor.execute(formatted_query, params)
rows = cursor.fetchall()
conn.close()
return [dict(row) for row in rows]
except Exception as e:
conn.close()
raise e
async def get_monthly_transaction_stats_from_db(
self,
account_id: Optional[str] = None,
@@ -1347,10 +1267,9 @@ class DatabaseService:
return []
try:
monthly_stats = self._get_monthly_transaction_stats(
account_id=account_id,
date_from=date_from,
date_to=date_to,
db_path = path_manager.get_database_path()
monthly_stats = self.analytics_processor.calculate_monthly_stats(
db_path, account_id=account_id, date_from=date_from, date_to=date_to
)
logger.debug(
f"Retrieved {len(monthly_stats)} monthly stat points from database"
@@ -1360,79 +1279,6 @@ class DatabaseService:
logger.error(f"Failed to get monthly transaction stats from database: {e}")
return []
def _get_monthly_transaction_stats(
self,
account_id: Optional[str] = None,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Get monthly transaction statistics from SQLite database"""
db_path = path_manager.get_database_path()
if not db_path.exists():
return []
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
try:
# SQL query to aggregate transactions by month
query = """
SELECT
strftime('%Y-%m', transactionDate) as month,
COALESCE(SUM(CASE WHEN transactionValue > 0 THEN transactionValue ELSE 0 END), 0) as income,
COALESCE(SUM(CASE WHEN transactionValue < 0 THEN ABS(transactionValue) ELSE 0 END), 0) as expenses,
COALESCE(SUM(transactionValue), 0) as net
FROM transactions
WHERE 1=1
"""
params = []
if account_id:
query += " AND accountId = ?"
params.append(account_id)
if date_from:
query += " AND transactionDate >= ?"
params.append(date_from)
if date_to:
query += " AND transactionDate <= ?"
params.append(date_to)
query += """
GROUP BY strftime('%Y-%m', transactionDate)
ORDER BY month ASC
"""
cursor.execute(query, params)
rows = cursor.fetchall()
# Convert to desired format with proper month display
monthly_stats = []
for row in rows:
# Convert YYYY-MM to display format like "Mar 2024"
year, month_num = row["month"].split("-")
month_date = datetime.strptime(f"{year}-{month_num}-01", "%Y-%m-%d")
display_month = month_date.strftime("%b %Y")
monthly_stats.append(
{
"month": display_month,
"income": round(row["income"], 2),
"expenses": round(row["expenses"], 2),
"net": round(row["net"], 2),
}
)
conn.close()
return monthly_stats
except Exception as e:
conn.close()
raise e
async def _check_sync_operations_migration_needed(self) -> bool:
"""Check if sync_operations table needs to be created"""
db_path = path_manager.get_database_path()

View File

@@ -4,6 +4,11 @@ from typing import List
from loguru import logger
from leggen.api.models.sync import SyncResult, SyncStatus
from leggen.services.data_processors import (
AccountEnricher,
BalanceTransformer,
TransactionProcessor,
)
from leggen.services.database_service import DatabaseService
from leggen.services.gocardless_service import GoCardlessService
from leggen.services.notification_service import NotificationService
@@ -17,8 +22,13 @@ class SyncService:
self.gocardless = GoCardlessService()
self.database = DatabaseService()
self.notifications = NotificationService()
# Data processors
self.account_enricher = AccountEnricher()
self.balance_transformer = BalanceTransformer()
self.transaction_processor = TransactionProcessor()
self._sync_status = SyncStatus(is_running=False)
self._institution_logos = {} # Cache for institution logos
async def get_sync_status(self) -> SyncStatus:
"""Get current sync status"""
@@ -84,54 +94,25 @@ class SyncService:
# Get balances to extract currency information
balances = await self.gocardless.get_account_balances(account_id)
# Enrich account details with currency and institution logo
# Enrich and persist account details
if account_details and balances:
enriched_account_details = account_details.copy()
# Extract currency from first balance
balances_list = balances.get("balances", [])
if balances_list:
first_balance = balances_list[0]
balance_amount = first_balance.get("balanceAmount", {})
currency = balance_amount.get("currency")
if currency:
enriched_account_details["currency"] = currency
# Get institution details to fetch logo
institution_id = enriched_account_details.get("institution_id")
if institution_id:
try:
institution_details = (
await self.gocardless.get_institution_details(
institution_id
)
)
enriched_account_details["logo"] = (
institution_details.get("logo", "")
)
logger.info(
f"Fetched logo for institution {institution_id}: {enriched_account_details.get('logo', 'No logo')}"
)
except Exception as e:
logger.warning(
f"Failed to fetch institution details for {institution_id}: {e}"
)
# Enrich account with currency and institution logo
enriched_account_details = (
await self.account_enricher.enrich_account_details(
account_details, balances
)
)
# Persist enriched account details to database
await self.database.persist_account_details(
enriched_account_details
)
# Merge account details into balances data for proper persistence
balances_with_account_info = balances.copy()
balances_with_account_info["institution_id"] = (
enriched_account_details.get("institution_id")
)
balances_with_account_info["iban"] = (
enriched_account_details.get("iban")
)
balances_with_account_info["account_status"] = (
enriched_account_details.get("status")
# Merge account metadata into balances for persistence
balances_with_account_info = (
self.balance_transformer.merge_account_metadata_into_balances(
balances, enriched_account_details
)
)
await self.database.persist_balance(
account_id, balances_with_account_info
@@ -146,8 +127,10 @@ class SyncService:
account_id
)
if transactions:
processed_transactions = self.database.process_transactions(
account_id, account_details, transactions
processed_transactions = (
self.transaction_processor.process_transactions(
account_id, account_details, transactions
)
)
new_transactions = await self.database.persist_transactions(
account_id, processed_transactions