diff --git a/leggen/services/data_processors/__init__.py b/leggen/services/data_processors/__init__.py new file mode 100644 index 0000000..b2dd508 --- /dev/null +++ b/leggen/services/data_processors/__init__.py @@ -0,0 +1,13 @@ +"""Data processing layer for all transformation logic.""" + +from leggen.services.data_processors.account_enricher import AccountEnricher +from leggen.services.data_processors.analytics_processor import AnalyticsProcessor +from leggen.services.data_processors.balance_transformer import BalanceTransformer +from leggen.services.data_processors.transaction_processor import TransactionProcessor + +__all__ = [ + "AccountEnricher", + "AnalyticsProcessor", + "BalanceTransformer", + "TransactionProcessor", +] diff --git a/leggen/services/data_processors/account_enricher.py b/leggen/services/data_processors/account_enricher.py new file mode 100644 index 0000000..d5abdce --- /dev/null +++ b/leggen/services/data_processors/account_enricher.py @@ -0,0 +1,71 @@ +"""Account enrichment processor for adding currency, logos, and metadata.""" + +from typing import Any, Dict + +from loguru import logger + +from leggen.services.gocardless_service import GoCardlessService + + +class AccountEnricher: + """Enriches account details with currency and institution information.""" + + def __init__(self): + self.gocardless = GoCardlessService() + + async def enrich_account_details( + self, + account_details: Dict[str, Any], + balances: Dict[str, Any], + ) -> Dict[str, Any]: + """ + Enrich account details with currency from balances and institution logo. + + Args: + account_details: Raw account details from GoCardless + balances: Balance data containing currency information + + Returns: + Enriched account details with currency and logo added + """ + enriched_account = account_details.copy() + + # Extract currency from first balance + currency = self._extract_currency_from_balances(balances) + if currency: + enriched_account["currency"] = currency + + # Fetch and add institution logo + institution_id = enriched_account.get("institution_id") + if institution_id: + logo = await self._fetch_institution_logo(institution_id) + if logo: + enriched_account["logo"] = logo + + return enriched_account + + def _extract_currency_from_balances(self, balances: Dict[str, Any]) -> str | None: + """Extract currency from the first balance in the balances data.""" + balances_list = balances.get("balances", []) + if not balances_list: + return None + + first_balance = balances_list[0] + balance_amount = first_balance.get("balanceAmount", {}) + return balance_amount.get("currency") + + async def _fetch_institution_logo(self, institution_id: str) -> str | None: + """Fetch institution logo from GoCardless API.""" + try: + institution_details = ( + await self.gocardless.get_institution_details(institution_id) + ) + logo = institution_details.get("logo", "") + if logo: + logger.info(f"Fetched logo for institution {institution_id}: {logo}") + return logo + except Exception as e: + logger.warning( + f"Failed to fetch institution details for {institution_id}: {e}" + ) + return None diff --git a/leggen/services/data_processors/analytics_processor.py b/leggen/services/data_processors/analytics_processor.py new file mode 100644 index 0000000..ee6ac94 --- /dev/null +++ b/leggen/services/data_processors/analytics_processor.py @@ -0,0 +1,201 @@ +"""Analytics processor for calculating historical balances and statistics.""" + +import sqlite3 +from datetime import datetime, timedelta +from pathlib import Path +from typing import Any, Dict, List, Optional + +from loguru import logger + + +class AnalyticsProcessor: + """Calculates historical balances and transaction statistics from database data.""" + + def calculate_historical_balances( + self, + db_path: Path, + account_id: Optional[str] = None, + days: int = 365, + ) -> List[Dict[str, Any]]: + """ + Generate historical balance progression based on transaction history. + + Uses current balances and subtracts future transactions to calculate + balance at each historical point in time. + + Args: + db_path: Path to SQLite database + account_id: Optional account ID to filter by + days: Number of days to look back (default 365) + + Returns: + List of historical balance data points + """ + if not db_path.exists(): + return [] + + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + try: + cutoff_date = (datetime.now() - timedelta(days=days)).date().isoformat() + today_date = datetime.now().date().isoformat() + + # Single SQL query to generate historical balances using window functions + query = """ + WITH RECURSIVE date_series AS ( + -- Generate weekly dates from cutoff_date to today + SELECT date(?) as ref_date + UNION ALL + SELECT date(ref_date, '+7 days') + FROM date_series + WHERE ref_date < date(?) + ), + current_balances AS ( + -- Get current balance for each account/type + SELECT account_id, type, amount, currency + FROM balances b1 + WHERE b1.timestamp = ( + SELECT MAX(b2.timestamp) + FROM balances b2 + WHERE b2.account_id = b1.account_id AND b2.type = b1.type + ) + {account_filter} + AND b1.type = 'closingBooked' -- Focus on closingBooked for charts + ), + historical_points AS ( + -- Calculate balance at each weekly point by subtracting future transactions + SELECT + cb.account_id, + cb.type as balance_type, + cb.currency, + ds.ref_date, + cb.amount - COALESCE( + (SELECT SUM(t.transactionValue) + FROM transactions t + WHERE t.accountId = cb.account_id + AND date(t.transactionDate) > ds.ref_date), 0 + ) as balance_amount + FROM current_balances cb + CROSS JOIN date_series ds + ) + SELECT + account_id || '_' || balance_type || '_' || ref_date as id, + account_id, + balance_amount, + balance_type, + currency, + ref_date as reference_date + FROM historical_points + ORDER BY account_id, ref_date + """ + + # Build parameters and account filter + params = [cutoff_date, today_date] + if account_id: + account_filter = "AND b1.account_id = ?" + params.append(account_id) + else: + account_filter = "" + + # Format the query with conditional filter + formatted_query = query.format(account_filter=account_filter) + + cursor.execute(formatted_query, params) + rows = cursor.fetchall() + + conn.close() + return [dict(row) for row in rows] + + except Exception as e: + conn.close() + logger.error(f"Failed to calculate historical balances: {e}") + raise + + def calculate_monthly_stats( + self, + db_path: Path, + account_id: Optional[str] = None, + date_from: Optional[str] = None, + date_to: Optional[str] = None, + ) -> List[Dict[str, Any]]: + """ + Calculate monthly transaction statistics aggregated from database. + + Sums transactions by month and calculates income, expenses, and net values. + + Args: + db_path: Path to SQLite database + account_id: Optional account ID to filter by + date_from: Optional start date (ISO format) + date_to: Optional end date (ISO format) + + Returns: + List of monthly statistics with income, expenses, and net totals + """ + if not db_path.exists(): + return [] + + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + cursor = conn.cursor() + + try: + # SQL query to aggregate transactions by month + query = """ + SELECT + strftime('%Y-%m', transactionDate) as month, + COALESCE(SUM(CASE WHEN transactionValue > 0 THEN transactionValue ELSE 0 END), 0) as income, + COALESCE(SUM(CASE WHEN transactionValue < 0 THEN ABS(transactionValue) ELSE 0 END), 0) as expenses, + COALESCE(SUM(transactionValue), 0) as net + FROM transactions + WHERE 1=1 + """ + + params = [] + + if account_id: + query += " AND accountId = ?" + params.append(account_id) + + if date_from: + query += " AND transactionDate >= ?" + params.append(date_from) + + if date_to: + query += " AND transactionDate <= ?" + params.append(date_to) + + query += """ + GROUP BY strftime('%Y-%m', transactionDate) + ORDER BY month ASC + """ + + cursor.execute(query, params) + rows = cursor.fetchall() + + # Convert to desired format with proper month display + monthly_stats = [] + for row in rows: + # Convert YYYY-MM to display format like "Mar 2024" + year, month_num = row["month"].split("-") + month_date = datetime.strptime(f"{year}-{month_num}-01", "%Y-%m-%d") + display_month = month_date.strftime("%b %Y") + + monthly_stats.append( + { + "month": display_month, + "income": round(row["income"], 2), + "expenses": round(row["expenses"], 2), + "net": round(row["net"], 2), + } + ) + + conn.close() + return monthly_stats + + except Exception as e: + conn.close() + logger.error(f"Failed to calculate monthly stats: {e}") + raise diff --git a/leggen/services/data_processors/balance_transformer.py b/leggen/services/data_processors/balance_transformer.py new file mode 100644 index 0000000..e125abe --- /dev/null +++ b/leggen/services/data_processors/balance_transformer.py @@ -0,0 +1,69 @@ +"""Balance data transformation processor for format conversions.""" + +from datetime import datetime +from typing import Any, Dict, List, Tuple + + +class BalanceTransformer: + """Transforms balance data between GoCardless and internal database formats.""" + + def merge_account_metadata_into_balances( + self, + balances: Dict[str, Any], + account_details: Dict[str, Any], + ) -> Dict[str, Any]: + """ + Merge account metadata into balance data for proper persistence. + + This adds institution_id, iban, and account_status to the balances + so they can be persisted alongside the balance data. + + Args: + balances: Raw balance data from GoCardless + account_details: Enriched account details containing metadata + + Returns: + Balance data with account metadata merged in + """ + balances_with_metadata = balances.copy() + balances_with_metadata["institution_id"] = account_details.get("institution_id") + balances_with_metadata["iban"] = account_details.get("iban") + balances_with_metadata["account_status"] = account_details.get("status") + return balances_with_metadata + + def transform_to_database_format( + self, + account_id: str, + balance_data: Dict[str, Any], + ) -> List[Tuple[Any, ...]]: + """ + Transform GoCardless balance format to database row format. + + Converts nested GoCardless balance structure into flat tuples + ready for SQLite insertion. + + Args: + account_id: The account ID + balance_data: Balance data with merged account metadata + + Returns: + List of tuples in database row format (account_id, bank, status, ...) + """ + rows = [] + + for balance in balance_data.get("balances", []): + balance_amount = balance.get("balanceAmount", {}) + + row = ( + account_id, + balance_data.get("institution_id", "unknown"), + balance_data.get("account_status"), + balance_data.get("iban", "N/A"), + float(balance_amount.get("amount", 0)), + balance_amount.get("currency"), + balance.get("balanceType"), + datetime.now().isoformat(), + ) + rows.append(row) + + return rows diff --git a/leggen/services/transaction_processor.py b/leggen/services/data_processors/transaction_processor.py similarity index 100% rename from leggen/services/transaction_processor.py rename to leggen/services/data_processors/transaction_processor.py diff --git a/leggen/services/database_service.py b/leggen/services/database_service.py index c8f977b..d1c68c6 100644 --- a/leggen/services/database_service.py +++ b/leggen/services/database_service.py @@ -1,11 +1,15 @@ import json import sqlite3 -from datetime import datetime, timedelta +from datetime import datetime from typing import Any, Dict, List, Optional from loguru import logger -from leggen.services.transaction_processor import TransactionProcessor +from leggen.services.data_processors import ( + AnalyticsProcessor, + BalanceTransformer, + TransactionProcessor, +) from leggen.utils.config import config from leggen.utils.paths import path_manager @@ -14,7 +18,11 @@ class DatabaseService: def __init__(self): self.db_config = config.database_config self.sqlite_enabled = self.db_config.get("sqlite", True) + + # Data processors self.transaction_processor = TransactionProcessor() + self.balance_transformer = BalanceTransformer() + self.analytics_processor = AnalyticsProcessor() async def persist_balance( self, account_id: str, balance_data: Dict[str, Any] @@ -136,7 +144,10 @@ class DatabaseService: return [] try: - balances = self._get_historical_balances(account_id=account_id, days=days) + db_path = path_manager.get_database_path() + balances = self.analytics_processor.calculate_historical_balances( + db_path, account_id=account_id, days=days + ) logger.debug( f"Retrieved {len(balances)} historical balance points from database" ) @@ -753,10 +764,12 @@ class DatabaseService: ON balances(account_id, type, timestamp)""" ) - # Convert GoCardless balance format to our format and persist - for balance in balance_data.get("balances", []): - balance_amount = balance["balanceAmount"] + # Transform and persist balances + balance_rows = self.balance_transformer.transform_to_database_format( + account_id, balance_data + ) + for row in balance_rows: try: cursor.execute( """INSERT INTO balances ( @@ -769,16 +782,7 @@ class DatabaseService: type, timestamp ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", - ( - account_id, - balance_data.get("institution_id", "unknown"), - balance_data.get("account_status"), - balance_data.get("iban", "N/A"), - float(balance_amount["amount"]), - balance_amount["currency"], - balance["balanceType"], - datetime.now().isoformat(), - ), + row, ) except sqlite3.IntegrityError: logger.warning(f"Skipped duplicate balance for {account_id}") @@ -1251,90 +1255,6 @@ class DatabaseService: conn.close() raise e - def _get_historical_balances(self, account_id=None, days=365): - """Get historical balance progression based on transaction history""" - db_path = path_manager.get_database_path() - if not db_path.exists(): - return [] - - conn = sqlite3.connect(str(db_path)) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - - try: - cutoff_date = (datetime.now() - timedelta(days=days)).date().isoformat() - today_date = datetime.now().date().isoformat() - - # Single SQL query to generate historical balances using window functions - query = """ - WITH RECURSIVE date_series AS ( - -- Generate weekly dates from cutoff_date to today - SELECT date(?) as ref_date - UNION ALL - SELECT date(ref_date, '+7 days') - FROM date_series - WHERE ref_date < date(?) - ), - current_balances AS ( - -- Get current balance for each account/type - SELECT account_id, type, amount, currency - FROM balances b1 - WHERE b1.timestamp = ( - SELECT MAX(b2.timestamp) - FROM balances b2 - WHERE b2.account_id = b1.account_id AND b2.type = b1.type - ) - {account_filter} - AND b1.type = 'closingBooked' -- Focus on closingBooked for charts - ), - historical_points AS ( - -- Calculate balance at each weekly point by subtracting future transactions - SELECT - cb.account_id, - cb.type as balance_type, - cb.currency, - ds.ref_date, - cb.amount - COALESCE( - (SELECT SUM(t.transactionValue) - FROM transactions t - WHERE t.accountId = cb.account_id - AND date(t.transactionDate) > ds.ref_date), 0 - ) as balance_amount - FROM current_balances cb - CROSS JOIN date_series ds - ) - SELECT - account_id || '_' || balance_type || '_' || ref_date as id, - account_id, - balance_amount, - balance_type, - currency, - ref_date as reference_date - FROM historical_points - ORDER BY account_id, ref_date - """ - - # Build parameters and account filter - params = [cutoff_date, today_date] - if account_id: - account_filter = "AND b1.account_id = ?" - params.append(account_id) - else: - account_filter = "" - - # Format the query with conditional filter - formatted_query = query.format(account_filter=account_filter) - - cursor.execute(formatted_query, params) - rows = cursor.fetchall() - - conn.close() - return [dict(row) for row in rows] - - except Exception as e: - conn.close() - raise e - async def get_monthly_transaction_stats_from_db( self, account_id: Optional[str] = None, @@ -1347,10 +1267,9 @@ class DatabaseService: return [] try: - monthly_stats = self._get_monthly_transaction_stats( - account_id=account_id, - date_from=date_from, - date_to=date_to, + db_path = path_manager.get_database_path() + monthly_stats = self.analytics_processor.calculate_monthly_stats( + db_path, account_id=account_id, date_from=date_from, date_to=date_to ) logger.debug( f"Retrieved {len(monthly_stats)} monthly stat points from database" @@ -1360,79 +1279,6 @@ class DatabaseService: logger.error(f"Failed to get monthly transaction stats from database: {e}") return [] - def _get_monthly_transaction_stats( - self, - account_id: Optional[str] = None, - date_from: Optional[str] = None, - date_to: Optional[str] = None, - ) -> List[Dict[str, Any]]: - """Get monthly transaction statistics from SQLite database""" - db_path = path_manager.get_database_path() - if not db_path.exists(): - return [] - - conn = sqlite3.connect(str(db_path)) - conn.row_factory = sqlite3.Row - cursor = conn.cursor() - - try: - # SQL query to aggregate transactions by month - query = """ - SELECT - strftime('%Y-%m', transactionDate) as month, - COALESCE(SUM(CASE WHEN transactionValue > 0 THEN transactionValue ELSE 0 END), 0) as income, - COALESCE(SUM(CASE WHEN transactionValue < 0 THEN ABS(transactionValue) ELSE 0 END), 0) as expenses, - COALESCE(SUM(transactionValue), 0) as net - FROM transactions - WHERE 1=1 - """ - - params = [] - - if account_id: - query += " AND accountId = ?" - params.append(account_id) - - if date_from: - query += " AND transactionDate >= ?" - params.append(date_from) - - if date_to: - query += " AND transactionDate <= ?" - params.append(date_to) - - query += """ - GROUP BY strftime('%Y-%m', transactionDate) - ORDER BY month ASC - """ - - cursor.execute(query, params) - rows = cursor.fetchall() - - # Convert to desired format with proper month display - monthly_stats = [] - for row in rows: - # Convert YYYY-MM to display format like "Mar 2024" - year, month_num = row["month"].split("-") - month_date = datetime.strptime(f"{year}-{month_num}-01", "%Y-%m-%d") - display_month = month_date.strftime("%b %Y") - - monthly_stats.append( - { - "month": display_month, - "income": round(row["income"], 2), - "expenses": round(row["expenses"], 2), - "net": round(row["net"], 2), - } - ) - - conn.close() - return monthly_stats - - except Exception as e: - conn.close() - raise e - async def _check_sync_operations_migration_needed(self) -> bool: """Check if sync_operations table needs to be created""" db_path = path_manager.get_database_path() diff --git a/leggen/services/sync_service.py b/leggen/services/sync_service.py index 56b2910..ddef712 100644 --- a/leggen/services/sync_service.py +++ b/leggen/services/sync_service.py @@ -4,6 +4,11 @@ from typing import List from loguru import logger from leggen.api.models.sync import SyncResult, SyncStatus +from leggen.services.data_processors import ( + AccountEnricher, + BalanceTransformer, + TransactionProcessor, +) from leggen.services.database_service import DatabaseService from leggen.services.gocardless_service import GoCardlessService from leggen.services.notification_service import NotificationService @@ -17,8 +22,13 @@ class SyncService: self.gocardless = GoCardlessService() self.database = DatabaseService() self.notifications = NotificationService() + + # Data processors + self.account_enricher = AccountEnricher() + self.balance_transformer = BalanceTransformer() + self.transaction_processor = TransactionProcessor() + self._sync_status = SyncStatus(is_running=False) - self._institution_logos = {} # Cache for institution logos async def get_sync_status(self) -> SyncStatus: """Get current sync status""" @@ -84,54 +94,25 @@ class SyncService: # Get balances to extract currency information balances = await self.gocardless.get_account_balances(account_id) - # Enrich account details with currency and institution logo + # Enrich and persist account details if account_details and balances: - enriched_account_details = account_details.copy() - - # Extract currency from first balance - balances_list = balances.get("balances", []) - if balances_list: - first_balance = balances_list[0] - balance_amount = first_balance.get("balanceAmount", {}) - currency = balance_amount.get("currency") - if currency: - enriched_account_details["currency"] = currency - - # Get institution details to fetch logo - institution_id = enriched_account_details.get("institution_id") - if institution_id: - try: - institution_details = ( - await self.gocardless.get_institution_details( - institution_id - ) - ) - enriched_account_details["logo"] = ( - institution_details.get("logo", "") - ) - logger.info( - f"Fetched logo for institution {institution_id}: {enriched_account_details.get('logo', 'No logo')}" - ) - except Exception as e: - logger.warning( - f"Failed to fetch institution details for {institution_id}: {e}" - ) + # Enrich account with currency and institution logo + enriched_account_details = ( + await self.account_enricher.enrich_account_details( + account_details, balances + ) + ) # Persist enriched account details to database await self.database.persist_account_details( enriched_account_details ) - # Merge account details into balances data for proper persistence - balances_with_account_info = balances.copy() - balances_with_account_info["institution_id"] = ( - enriched_account_details.get("institution_id") - ) - balances_with_account_info["iban"] = ( - enriched_account_details.get("iban") - ) - balances_with_account_info["account_status"] = ( - enriched_account_details.get("status") + # Merge account metadata into balances for persistence + balances_with_account_info = ( + self.balance_transformer.merge_account_metadata_into_balances( + balances, enriched_account_details + ) ) await self.database.persist_balance( account_id, balances_with_account_info @@ -146,8 +127,10 @@ class SyncService: account_id ) if transactions: - processed_transactions = self.database.process_transactions( - account_id, account_details, transactions + processed_transactions = ( + self.transaction_processor.process_transactions( + account_id, account_details, transactions + ) ) new_transactions = await self.database.persist_transactions( account_id, processed_transactions