refactor(api): Split DatabaseService into repository pattern.

Split the monolithic DatabaseService (1,492 lines) into focused repository
modules using the repository pattern for better maintainability and
separation of concerns.

Changes:
- Create new repositories/ directory with 5 focused repositories:
  - TransactionRepository: transaction data operations (264 lines)
  - AccountRepository: account data operations (128 lines)
  - BalanceRepository: balance data operations (107 lines)
  - MigrationRepository: all database migrations (629 lines)
  - SyncRepository: sync operation tracking (132 lines)
  - BaseRepository: shared database connection logic (28 lines)

- Refactor DatabaseService into a clean facade (287 lines):
  - Delegates data access to repositories
  - Maintains public API (no breaking changes)
  - Keeps data processors in service layer
  - Preserves require_sqlite decorator

- Update tests to mock repository methods instead of private methods
- Fix test references to internal methods (_persist_*, _get_*)

Benefits:
- Clear separation of concerns (one repository per domain)
- Easier maintenance (changes isolated to specific repositories)
- Better testability (repositories can be mocked individually)
- Improved code organization (from 1 file to 7 focused files)

All 114 tests passing.
This commit is contained in:
Elisiário Couto
2025-12-08 23:21:55 +00:00
parent 267db8ac63
commit 5f87991076
10 changed files with 1466 additions and 1360 deletions

View File

@@ -0,0 +1,13 @@
from leggen.repositories.account_repository import AccountRepository
from leggen.repositories.balance_repository import BalanceRepository
from leggen.repositories.migration_repository import MigrationRepository
from leggen.repositories.sync_repository import SyncRepository
from leggen.repositories.transaction_repository import TransactionRepository
__all__ = [
"AccountRepository",
"BalanceRepository",
"MigrationRepository",
"SyncRepository",
"TransactionRepository",
]

View File

@@ -0,0 +1,128 @@
from typing import Any, Dict, List, Optional
from leggen.repositories.base_repository import BaseRepository
class AccountRepository(BaseRepository):
"""Repository for account data operations"""
def create_table(self):
"""Create accounts table with indexes"""
with self._get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""CREATE TABLE IF NOT EXISTS accounts (
id TEXT PRIMARY KEY,
institution_id TEXT,
status TEXT,
iban TEXT,
name TEXT,
currency TEXT,
created DATETIME,
last_accessed DATETIME,
last_updated DATETIME,
display_name TEXT,
logo TEXT
)"""
)
cursor.execute(
"""CREATE INDEX IF NOT EXISTS idx_accounts_institution_id
ON accounts(institution_id)"""
)
cursor.execute(
"""CREATE INDEX IF NOT EXISTS idx_accounts_status
ON accounts(status)"""
)
conn.commit()
def persist(self, account_data: Dict[str, Any]) -> Dict[str, Any]:
"""Persist account details to database"""
self.create_table()
with self._get_db_connection() as conn:
cursor = conn.cursor()
# Check if account exists and preserve display_name
cursor.execute(
"SELECT display_name FROM accounts WHERE id = ?", (account_data["id"],)
)
existing_row = cursor.fetchone()
existing_display_name = existing_row[0] if existing_row else None
# Use existing display_name if not provided in account_data
display_name = account_data.get("display_name", existing_display_name)
cursor.execute(
"""INSERT OR REPLACE INTO accounts (
id,
institution_id,
status,
iban,
name,
currency,
created,
last_accessed,
last_updated,
display_name,
logo
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
account_data["id"],
account_data["institution_id"],
account_data["status"],
account_data.get("iban"),
account_data.get("name"),
account_data.get("currency"),
account_data["created"],
account_data.get("last_accessed"),
account_data.get("last_updated", account_data["created"]),
display_name,
account_data.get("logo"),
),
)
conn.commit()
return account_data
def get_accounts(
self, account_ids: Optional[List[str]] = None
) -> List[Dict[str, Any]]:
"""Get account details from database"""
if not self._db_exists():
return []
with self._get_db_connection(row_factory=True) as conn:
cursor = conn.cursor()
query = "SELECT * FROM accounts"
params = []
if account_ids:
placeholders = ",".join("?" * len(account_ids))
query += f" WHERE id IN ({placeholders})"
params.extend(account_ids)
query += " ORDER BY created DESC"
cursor.execute(query, params)
rows = cursor.fetchall()
return [dict(row) for row in rows]
def get_account(self, account_id: str) -> Optional[Dict[str, Any]]:
"""Get specific account details from database"""
if not self._db_exists():
return None
with self._get_db_connection(row_factory=True) as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM accounts WHERE id = ?", (account_id,))
row = cursor.fetchone()
if row:
return dict(row)
return None

View File

@@ -0,0 +1,107 @@
import sqlite3
from typing import Any, Dict, List, Optional
from loguru import logger
from leggen.repositories.base_repository import BaseRepository
class BalanceRepository(BaseRepository):
"""Repository for balance data operations"""
def create_table(self):
"""Create balances table with indexes"""
with self._get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""CREATE TABLE IF NOT EXISTS balances (
id INTEGER PRIMARY KEY AUTOINCREMENT,
account_id TEXT,
bank TEXT,
status TEXT,
iban TEXT,
amount REAL,
currency TEXT,
type TEXT,
timestamp DATETIME
)"""
)
cursor.execute(
"""CREATE INDEX IF NOT EXISTS idx_balances_account_id
ON balances(account_id)"""
)
cursor.execute(
"""CREATE INDEX IF NOT EXISTS idx_balances_timestamp
ON balances(timestamp)"""
)
cursor.execute(
"""CREATE INDEX IF NOT EXISTS idx_balances_account_type_timestamp
ON balances(account_id, type, timestamp)"""
)
conn.commit()
def persist(self, account_id: str, balance_rows: List[tuple]) -> None:
"""Persist balance rows to database"""
try:
self.create_table()
with self._get_db_connection() as conn:
cursor = conn.cursor()
for row in balance_rows:
try:
cursor.execute(
"""INSERT INTO balances (
account_id,
bank,
status,
iban,
amount,
currency,
type,
timestamp
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
row,
)
except sqlite3.IntegrityError:
logger.warning(f"Skipped duplicate balance for {account_id}")
conn.commit()
logger.info(f"Persisted balances for account {account_id}")
except Exception as e:
logger.error(f"Failed to persist balances: {e}")
raise
def get_balances(self, account_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get latest balances from database"""
if not self._db_exists():
return []
with self._get_db_connection(row_factory=True) as conn:
cursor = conn.cursor()
# Get latest balance for each account_id and type combination
query = """
SELECT * FROM balances b1
WHERE b1.timestamp = (
SELECT MAX(b2.timestamp)
FROM balances b2
WHERE b2.account_id = b1.account_id AND b2.type = b1.type
)
"""
params = []
if account_id:
query += " AND b1.account_id = ?"
params.append(account_id)
query += " ORDER BY b1.account_id, b1.type"
cursor.execute(query, params)
rows = cursor.fetchall()
return [dict(row) for row in rows]

View File

@@ -0,0 +1,28 @@
import sqlite3
from contextlib import contextmanager
from leggen.utils.paths import path_manager
class BaseRepository:
"""Base repository with shared database connection logic"""
@contextmanager
def _get_db_connection(self, row_factory: bool = False):
"""Context manager for database connections with proper cleanup"""
db_path = path_manager.get_database_path()
conn = sqlite3.connect(str(db_path))
if row_factory:
conn.row_factory = sqlite3.Row
try:
yield conn
except Exception as e:
conn.rollback()
raise e
finally:
conn.close()
def _db_exists(self) -> bool:
"""Check if database file exists"""
db_path = path_manager.get_database_path()
return db_path.exists()

View File

@@ -0,0 +1,626 @@
import sqlite3
import uuid
from datetime import datetime
from loguru import logger
from leggen.repositories.base_repository import BaseRepository
from leggen.utils.paths import path_manager
class MigrationRepository(BaseRepository):
"""Repository for database migrations"""
async def run_all_migrations(self):
"""Run all necessary database migrations"""
await self.migrate_balance_timestamps_if_needed()
await self.migrate_null_transaction_ids_if_needed()
await self.migrate_to_composite_key_if_needed()
await self.migrate_add_display_name_if_needed()
await self.migrate_add_sync_operations_if_needed()
await self.migrate_add_logo_if_needed()
# Balance timestamp migration methods
async def migrate_balance_timestamps_if_needed(self):
"""Check and migrate balance timestamps if needed"""
try:
if await self._check_balance_timestamp_migration_needed():
logger.info("Balance timestamp migration needed, starting...")
await self._migrate_balance_timestamps()
logger.info("Balance timestamp migration completed")
else:
logger.info("Balance timestamps are already consistent")
except Exception as e:
logger.error(f"Balance timestamp migration failed: {e}")
raise
async def _check_balance_timestamp_migration_needed(self) -> bool:
"""Check if balance timestamps need migration"""
db_path = path_manager.get_database_path()
if not db_path.exists():
return False
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
cursor.execute("""
SELECT typeof(timestamp) as type, COUNT(*) as count
FROM balances
GROUP BY typeof(timestamp)
""")
types = cursor.fetchall()
conn.close()
type_names = [row[0] for row in types]
return "real" in type_names and "text" in type_names
except Exception as e:
logger.error(f"Failed to check migration status: {e}")
return False
async def _migrate_balance_timestamps(self):
"""Convert all Unix timestamps to datetime strings"""
db_path = path_manager.get_database_path()
if not db_path.exists():
logger.warning("Database file not found, skipping migration")
return
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
cursor.execute("""
SELECT id, timestamp
FROM balances
WHERE typeof(timestamp) = 'real'
ORDER BY id
""")
unix_records = cursor.fetchall()
total_records = len(unix_records)
if total_records == 0:
logger.info("No Unix timestamps found to migrate")
conn.close()
return
logger.info(
f"Migrating {total_records} balance records from Unix to datetime format"
)
batch_size = 100
migrated_count = 0
for i in range(0, total_records, batch_size):
batch = unix_records[i : i + batch_size]
for record_id, unix_timestamp in batch:
try:
dt_string = self._unix_to_datetime_string(float(unix_timestamp))
cursor.execute(
"""
UPDATE balances
SET timestamp = ?
WHERE id = ?
""",
(dt_string, record_id),
)
migrated_count += 1
if migrated_count % 100 == 0:
logger.info(
f"Migrated {migrated_count}/{total_records} balance records"
)
except Exception as e:
logger.error(f"Failed to migrate record {record_id}: {e}")
continue
conn.commit()
conn.close()
logger.info(f"Successfully migrated {migrated_count} balance records")
except Exception as e:
logger.error(f"Balance timestamp migration failed: {e}")
raise
def _unix_to_datetime_string(self, unix_timestamp: float) -> str:
"""Convert Unix timestamp to datetime string"""
dt = datetime.fromtimestamp(unix_timestamp)
return dt.isoformat()
# Null transaction IDs migration methods
async def migrate_null_transaction_ids_if_needed(self):
"""Check and migrate null transaction IDs if needed"""
try:
if await self._check_null_transaction_ids_migration_needed():
logger.info("Null transaction IDs migration needed, starting...")
await self._migrate_null_transaction_ids()
logger.info("Null transaction IDs migration completed")
else:
logger.info("No null transaction IDs found to migrate")
except Exception as e:
logger.error(f"Null transaction IDs migration failed: {e}")
raise
async def _check_null_transaction_ids_migration_needed(self) -> bool:
"""Check if null transaction IDs need migration"""
db_path = path_manager.get_database_path()
if not db_path.exists():
return False
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
cursor.execute("""
SELECT COUNT(*)
FROM transactions
WHERE (internalTransactionId IS NULL OR internalTransactionId = '')
AND json_extract(rawTransaction, '$.transactionId') IS NOT NULL
""")
count = cursor.fetchone()[0]
conn.close()
return count > 0
except Exception as e:
logger.error(f"Failed to check null transaction IDs migration status: {e}")
return False
async def _migrate_null_transaction_ids(self):
"""Populate null internalTransactionId fields using transactionId from raw data"""
db_path = path_manager.get_database_path()
if not db_path.exists():
logger.warning("Database file not found, skipping migration")
return
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
cursor.execute("""
SELECT rowid, json_extract(rawTransaction, '$.transactionId') as transactionId
FROM transactions
WHERE (internalTransactionId IS NULL OR internalTransactionId = '')
AND json_extract(rawTransaction, '$.transactionId') IS NOT NULL
ORDER BY rowid
""")
null_records = cursor.fetchall()
total_records = len(null_records)
if total_records == 0:
logger.info("No null transaction IDs found to migrate")
conn.close()
return
logger.info(
f"Migrating {total_records} transaction records with null internalTransactionId"
)
batch_size = 100
migrated_count = 0
for i in range(0, total_records, batch_size):
batch = null_records[i : i + batch_size]
for rowid, transaction_id in batch:
try:
cursor.execute(
"SELECT COUNT(*) FROM transactions WHERE internalTransactionId = ?",
(str(transaction_id),),
)
existing_count = cursor.fetchone()[0]
if existing_count > 0:
unique_id = f"{str(transaction_id)}_{uuid.uuid4().hex[:8]}"
logger.debug(
f"Generated unique ID for duplicate transactionId: {unique_id}"
)
else:
unique_id = str(transaction_id)
cursor.execute(
"""
UPDATE transactions
SET internalTransactionId = ?
WHERE rowid = ?
""",
(unique_id, rowid),
)
migrated_count += 1
if migrated_count % 100 == 0:
logger.info(
f"Migrated {migrated_count}/{total_records} transaction records"
)
except Exception as e:
logger.error(f"Failed to migrate record {rowid}: {e}")
continue
conn.commit()
conn.close()
logger.info(f"Successfully migrated {migrated_count} transaction records")
except Exception as e:
logger.error(f"Null transaction IDs migration failed: {e}")
raise
# Composite key migration methods
async def migrate_to_composite_key_if_needed(self):
"""Check and migrate to composite primary key if needed"""
try:
if await self._check_composite_key_migration_needed():
logger.info("Composite key migration needed, starting...")
await self._migrate_to_composite_key()
logger.info("Composite key migration completed")
else:
logger.info("Composite key migration not needed")
except Exception as e:
logger.error(f"Composite key migration failed: {e}")
raise
async def _check_composite_key_migration_needed(self) -> bool:
"""Check if composite key migration is needed"""
db_path = path_manager.get_database_path()
if not db_path.exists():
return False
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='transactions'"
)
if not cursor.fetchone():
conn.close()
return False
cursor.execute("PRAGMA table_info(transactions)")
columns = cursor.fetchall()
internal_transaction_id_is_pk = any(
col[1] == "internalTransactionId" and col[5] == 1 for col in columns
)
has_composite_key = any(
col[1] in ["accountId", "transactionId"] and col[5] == 1
for col in columns
)
conn.close()
return internal_transaction_id_is_pk or not has_composite_key
except Exception as e:
logger.error(f"Failed to check composite key migration status: {e}")
return False
async def _migrate_to_composite_key(self):
"""Migrate transactions table to use composite primary key (accountId, transactionId)"""
db_path = path_manager.get_database_path()
if not db_path.exists():
logger.warning("Database file not found, skipping migration")
return
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
logger.info("Starting composite key migration...")
logger.info("Creating temporary table with composite primary key...")
cursor.execute("DROP TABLE IF EXISTS transactions_temp")
cursor.execute("""
CREATE TABLE transactions_temp (
accountId TEXT NOT NULL,
transactionId TEXT NOT NULL,
internalTransactionId TEXT,
institutionId TEXT,
iban TEXT,
transactionDate DATETIME,
description TEXT,
transactionValue REAL,
transactionCurrency TEXT,
transactionStatus TEXT,
rawTransaction JSON,
PRIMARY KEY (accountId, transactionId)
)
""")
logger.info("Inserting deduplicated data...")
cursor.execute("""
INSERT INTO transactions_temp
SELECT
accountId,
json_extract(rawTransaction, '$.transactionId') as transactionId,
internalTransactionId,
institutionId,
iban,
transactionDate,
description,
transactionValue,
transactionCurrency,
transactionStatus,
rawTransaction
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY accountId, json_extract(rawTransaction, '$.transactionId')
ORDER BY transactionDate DESC
) as rn
FROM transactions
WHERE json_extract(rawTransaction, '$.transactionId') IS NOT NULL
)
WHERE rn = 1
""")
rows_migrated = cursor.rowcount
logger.info(f"Migrated {rows_migrated} unique transactions")
logger.info("Replacing old table...")
cursor.execute("DROP TABLE transactions")
cursor.execute("ALTER TABLE transactions_temp RENAME TO transactions")
logger.info("Recreating indexes...")
cursor.execute(
"""CREATE INDEX IF NOT EXISTS idx_transactions_internal_id
ON transactions(internalTransactionId)"""
)
cursor.execute(
"""CREATE INDEX IF NOT EXISTS idx_transactions_date
ON transactions(transactionDate)"""
)
cursor.execute(
"""CREATE INDEX IF NOT EXISTS idx_transactions_account_date
ON transactions(accountId, transactionDate)"""
)
cursor.execute(
"""CREATE INDEX IF NOT EXISTS idx_transactions_amount
ON transactions(transactionValue)"""
)
conn.commit()
conn.close()
logger.info("Composite key migration completed successfully")
except Exception as e:
logger.error(f"Composite key migration failed: {e}")
raise
# Display name migration methods
async def migrate_add_display_name_if_needed(self):
"""Check and add display_name column if needed"""
try:
if await self._check_display_name_migration_needed():
logger.info("Display name column migration needed, starting...")
await self._migrate_add_display_name()
logger.info("Display name column migration completed")
else:
logger.info("Display name column already exists")
except Exception as e:
logger.error(f"Display name column migration failed: {e}")
raise
async def _check_display_name_migration_needed(self) -> bool:
"""Check if display_name column needs to be added"""
db_path = path_manager.get_database_path()
if not db_path.exists():
return False
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='accounts'"
)
if not cursor.fetchone():
conn.close()
return False
cursor.execute("PRAGMA table_info(accounts)")
columns = cursor.fetchall()
has_display_name = any(col[1] == "display_name" for col in columns)
conn.close()
return not has_display_name
except Exception as e:
logger.error(f"Failed to check display_name migration status: {e}")
return False
async def _migrate_add_display_name(self):
"""Add display_name column to accounts table"""
db_path = path_manager.get_database_path()
if not db_path.exists():
logger.warning("Database file not found, skipping migration")
return
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
logger.info("Adding display_name column to accounts table...")
cursor.execute("""
ALTER TABLE accounts
ADD COLUMN display_name TEXT
""")
conn.commit()
conn.close()
logger.info("Display name column migration completed successfully")
except Exception as e:
logger.error(f"Display name column migration failed: {e}")
raise
# Sync operations migration methods
async def migrate_add_sync_operations_if_needed(self):
"""Check and add sync_operations table if needed"""
try:
if await self._check_sync_operations_migration_needed():
logger.info("Sync operations table migration needed, starting...")
await self._migrate_add_sync_operations()
logger.info("Sync operations table migration completed")
else:
logger.info("Sync operations table already exists")
except Exception as e:
logger.error(f"Sync operations table migration failed: {e}")
raise
async def _check_sync_operations_migration_needed(self) -> bool:
"""Check if sync_operations table needs to be created"""
db_path = path_manager.get_database_path()
if not db_path.exists():
return False
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='sync_operations'"
)
table_exists = cursor.fetchone() is not None
conn.close()
return not table_exists
except Exception as e:
logger.error(f"Failed to check sync_operations migration status: {e}")
return False
async def _migrate_add_sync_operations(self):
"""Add sync_operations table"""
db_path = path_manager.get_database_path()
if not db_path.exists():
logger.warning("Database file not found, skipping migration")
return
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
logger.info("Creating sync_operations table...")
cursor.execute("""
CREATE TABLE sync_operations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
started_at DATETIME NOT NULL,
completed_at DATETIME,
success BOOLEAN,
accounts_processed INTEGER DEFAULT 0,
transactions_added INTEGER DEFAULT 0,
transactions_updated INTEGER DEFAULT 0,
balances_updated INTEGER DEFAULT 0,
duration_seconds REAL,
errors TEXT,
logs TEXT,
trigger_type TEXT DEFAULT 'manual'
)
""")
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sync_operations_started_at ON sync_operations(started_at)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sync_operations_success ON sync_operations(success)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sync_operations_trigger_type ON sync_operations(trigger_type)"
)
conn.commit()
conn.close()
logger.info("Sync operations table migration completed successfully")
except Exception as e:
logger.error(f"Sync operations table migration failed: {e}")
raise
# Logo migration methods
async def migrate_add_logo_if_needed(self):
"""Check and add logo column to accounts table if needed"""
try:
if await self._check_logo_migration_needed():
logger.info("Logo column migration needed, starting...")
await self._migrate_add_logo()
logger.info("Logo column migration completed")
else:
logger.info("Logo column already exists")
except Exception as e:
logger.error(f"Logo column migration failed: {e}")
raise
async def _check_logo_migration_needed(self) -> bool:
"""Check if logo column needs to be added to accounts table"""
db_path = path_manager.get_database_path()
if not db_path.exists():
return False
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
cursor.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='accounts'"
)
if not cursor.fetchone():
conn.close()
return False
cursor.execute("PRAGMA table_info(accounts)")
columns = cursor.fetchall()
has_logo = any(col[1] == "logo" for col in columns)
conn.close()
return not has_logo
except Exception as e:
logger.error(f"Failed to check logo migration status: {e}")
return False
async def _migrate_add_logo(self):
"""Add logo column to accounts table"""
db_path = path_manager.get_database_path()
if not db_path.exists():
logger.warning("Database file not found, skipping migration")
return
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
logger.info("Adding logo column to accounts table...")
cursor.execute("""
ALTER TABLE accounts
ADD COLUMN logo TEXT
""")
conn.commit()
conn.close()
logger.info("Logo column migration completed successfully")
except Exception as e:
logger.error(f"Logo column migration failed: {e}")
raise

View File

@@ -0,0 +1,132 @@
import json
import sqlite3
from typing import Any, Dict, List
from loguru import logger
from leggen.repositories.base_repository import BaseRepository
from leggen.utils.paths import path_manager
class SyncRepository(BaseRepository):
"""Repository for sync operation data"""
def create_table(self):
"""Create sync_operations table with indexes"""
with self._get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS sync_operations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
started_at DATETIME NOT NULL,
completed_at DATETIME,
success BOOLEAN,
accounts_processed INTEGER DEFAULT 0,
transactions_added INTEGER DEFAULT 0,
transactions_updated INTEGER DEFAULT 0,
balances_updated INTEGER DEFAULT 0,
duration_seconds REAL,
errors TEXT,
logs TEXT,
trigger_type TEXT DEFAULT 'manual'
)
""")
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sync_operations_started_at ON sync_operations(started_at)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sync_operations_success ON sync_operations(success)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sync_operations_trigger_type ON sync_operations(trigger_type)"
)
conn.commit()
def persist(self, sync_operation: Dict[str, Any]) -> int:
"""Persist sync operation to database and return the ID"""
try:
self.create_table()
db_path = path_manager.get_database_path()
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
cursor.execute(
"""INSERT INTO sync_operations (
started_at, completed_at, success, accounts_processed,
transactions_added, transactions_updated, balances_updated,
duration_seconds, errors, logs, trigger_type
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
sync_operation.get("started_at"),
sync_operation.get("completed_at"),
sync_operation.get("success"),
sync_operation.get("accounts_processed", 0),
sync_operation.get("transactions_added", 0),
sync_operation.get("transactions_updated", 0),
sync_operation.get("balances_updated", 0),
sync_operation.get("duration_seconds"),
json.dumps(sync_operation.get("errors", [])),
json.dumps(sync_operation.get("logs", [])),
sync_operation.get("trigger_type", "manual"),
),
)
operation_id = cursor.lastrowid
if operation_id is None:
raise ValueError("Failed to get operation ID after insert")
conn.commit()
conn.close()
logger.debug(f"Persisted sync operation with ID: {operation_id}")
return operation_id
except Exception as e:
logger.error(f"Failed to persist sync operation: {e}")
raise
def get_operations(self, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]:
"""Get sync operations from database"""
try:
db_path = path_manager.get_database_path()
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
cursor.execute(
"""SELECT id, started_at, completed_at, success, accounts_processed,
transactions_added, transactions_updated, balances_updated,
duration_seconds, errors, logs, trigger_type
FROM sync_operations
ORDER BY started_at DESC
LIMIT ? OFFSET ?""",
(limit, offset),
)
operations = []
for row in cursor.fetchall():
operation = {
"id": row[0],
"started_at": row[1],
"completed_at": row[2],
"success": bool(row[3]) if row[3] is not None else None,
"accounts_processed": row[4],
"transactions_added": row[5],
"transactions_updated": row[6],
"balances_updated": row[7],
"duration_seconds": row[8],
"errors": json.loads(row[9]) if row[9] else [],
"logs": json.loads(row[10]) if row[10] else [],
"trigger_type": row[11],
}
operations.append(operation)
conn.close()
return operations
except Exception as e:
logger.error(f"Failed to get sync operations: {e}")
return []

View File

@@ -0,0 +1,264 @@
import json
import sqlite3
from typing import Any, Dict, List, Optional, Union
from loguru import logger
from leggen.repositories.base_repository import BaseRepository
class TransactionRepository(BaseRepository):
"""Repository for transaction data operations"""
def create_table(self):
"""Create transactions table with indexes"""
with self._get_db_connection() as conn:
cursor = conn.cursor()
cursor.execute(
"""CREATE TABLE IF NOT EXISTS transactions (
accountId TEXT NOT NULL,
transactionId TEXT NOT NULL,
internalTransactionId TEXT,
institutionId TEXT,
iban TEXT,
transactionDate DATETIME,
description TEXT,
transactionValue REAL,
transactionCurrency TEXT,
transactionStatus TEXT,
rawTransaction JSON,
PRIMARY KEY (accountId, transactionId)
)"""
)
# Create indexes for better performance
cursor.execute(
"""CREATE INDEX IF NOT EXISTS idx_transactions_internal_id
ON transactions(internalTransactionId)"""
)
cursor.execute(
"""CREATE INDEX IF NOT EXISTS idx_transactions_date
ON transactions(transactionDate)"""
)
cursor.execute(
"""CREATE INDEX IF NOT EXISTS idx_transactions_account_date
ON transactions(accountId, transactionDate)"""
)
cursor.execute(
"""CREATE INDEX IF NOT EXISTS idx_transactions_amount
ON transactions(transactionValue)"""
)
conn.commit()
def persist(
self, account_id: str, transactions: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""Persist transactions to database, return new ones"""
try:
self.create_table()
with self._get_db_connection() as conn:
cursor = conn.cursor()
insert_sql = """INSERT OR REPLACE INTO transactions (
accountId,
transactionId,
internalTransactionId,
institutionId,
iban,
transactionDate,
description,
transactionValue,
transactionCurrency,
transactionStatus,
rawTransaction
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""
new_transactions = []
for transaction in transactions:
try:
# Check if transaction already exists
cursor.execute(
"""SELECT COUNT(*) FROM transactions
WHERE accountId = ? AND transactionId = ?""",
(transaction["accountId"], transaction["transactionId"]),
)
exists = cursor.fetchone()[0] > 0
cursor.execute(
insert_sql,
(
transaction["accountId"],
transaction["transactionId"],
transaction.get("internalTransactionId"),
transaction["institutionId"],
transaction["iban"],
transaction["transactionDate"],
transaction["description"],
transaction["transactionValue"],
transaction["transactionCurrency"],
transaction["transactionStatus"],
json.dumps(transaction["rawTransaction"]),
),
)
if not exists:
new_transactions.append(transaction)
except sqlite3.IntegrityError as e:
logger.warning(
f"Failed to insert transaction {transaction.get('transactionId')}: {e}"
)
continue
conn.commit()
logger.info(
f"Persisted {len(new_transactions)} new transactions for account {account_id}"
)
return new_transactions
except Exception as e:
logger.error(f"Failed to persist transactions: {e}")
raise
def get_transactions(
self,
account_id: Optional[str] = None,
limit: Optional[int] = 100,
offset: int = 0,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
min_amount: Optional[float] = None,
max_amount: Optional[float] = None,
search: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Get transactions with optional filtering"""
if not self._db_exists():
return []
with self._get_db_connection(row_factory=True) as conn:
cursor = conn.cursor()
query = "SELECT * FROM transactions WHERE 1=1"
params: List[Union[str, int, float]] = []
if account_id:
query += " AND accountId = ?"
params.append(account_id)
if date_from:
query += " AND transactionDate >= ?"
params.append(date_from)
if date_to:
query += " AND transactionDate <= ?"
params.append(date_to)
if min_amount is not None:
query += " AND transactionValue >= ?"
params.append(min_amount)
if max_amount is not None:
query += " AND transactionValue <= ?"
params.append(max_amount)
if search:
query += " AND description LIKE ?"
params.append(f"%{search}%")
query += " ORDER BY transactionDate DESC"
if limit:
query += " LIMIT ?"
params.append(limit)
if offset:
query += " OFFSET ?"
params.append(offset)
cursor.execute(query, params)
rows = cursor.fetchall()
transactions = []
for row in rows:
transaction = dict(row)
if transaction["rawTransaction"]:
transaction["rawTransaction"] = json.loads(
transaction["rawTransaction"]
)
transactions.append(transaction)
return transactions
def get_count(
self,
account_id: Optional[str] = None,
date_from: Optional[str] = None,
date_to: Optional[str] = None,
min_amount: Optional[float] = None,
max_amount: Optional[float] = None,
search: Optional[str] = None,
) -> int:
"""Get total count of transactions matching filters"""
if not self._db_exists():
return 0
with self._get_db_connection() as conn:
cursor = conn.cursor()
query = "SELECT COUNT(*) FROM transactions WHERE 1=1"
params: List[Union[str, float]] = []
if account_id:
query += " AND accountId = ?"
params.append(account_id)
if date_from:
query += " AND transactionDate >= ?"
params.append(date_from)
if date_to:
query += " AND transactionDate <= ?"
params.append(date_to)
if min_amount is not None:
query += " AND transactionValue >= ?"
params.append(min_amount)
if max_amount is not None:
query += " AND transactionValue <= ?"
params.append(max_amount)
if search:
query += " AND description LIKE ?"
params.append(f"%{search}%")
cursor.execute(query, params)
return cursor.fetchone()[0]
def get_account_summary(self, account_id: str) -> Optional[Dict[str, Any]]:
"""Get basic account info from transactions table"""
if not self._db_exists():
return None
with self._get_db_connection(row_factory=True) as conn:
cursor = conn.cursor()
cursor.execute(
"""
SELECT DISTINCT accountId, institutionId, iban
FROM transactions
WHERE accountId = ?
ORDER BY transactionDate DESC
LIMIT 1
""",
(account_id,),
)
row = cursor.fetchone()
if row:
return dict(row)
return None

File diff suppressed because it is too large Load Diff

View File

@@ -120,12 +120,10 @@ class TestConfigurablePaths:
"iban": "TEST_IBAN",
}
# Use the internal balance persistence method since the test needs direct database access
# Use the public balance persistence method
import asyncio
asyncio.run(
database_service._persist_balance_sqlite("test-account", balance_data)
)
asyncio.run(database_service.persist_balance("test-account", balance_data))
# Retrieve balances
balances = asyncio.run(

View File

@@ -85,7 +85,7 @@ class TestDatabaseService:
):
"""Test successful retrieval of transactions from database."""
with patch.object(
database_service, "_get_transactions"
database_service.transactions, "get_transactions"
) as mock_get_transactions:
mock_get_transactions.return_value = sample_transactions_db_format
@@ -111,7 +111,7 @@ class TestDatabaseService:
):
"""Test retrieving transactions with filters."""
with patch.object(
database_service, "_get_transactions"
database_service.transactions, "get_transactions"
) as mock_get_transactions:
mock_get_transactions.return_value = sample_transactions_db_format
@@ -149,7 +149,7 @@ class TestDatabaseService:
async def test_get_transactions_from_db_error(self, database_service):
"""Test handling error when getting transactions."""
with patch.object(
database_service, "_get_transactions"
database_service.transactions, "get_transactions"
) as mock_get_transactions:
mock_get_transactions.side_effect = Exception("Database error")
@@ -159,7 +159,7 @@ class TestDatabaseService:
async def test_get_transaction_count_from_db_success(self, database_service):
"""Test successful retrieval of transaction count."""
with patch.object(database_service, "_get_transaction_count") as mock_get_count:
with patch.object(database_service.transactions, "get_count") as mock_get_count:
mock_get_count.return_value = 42
result = await database_service.get_transaction_count_from_db(
@@ -167,11 +167,18 @@ class TestDatabaseService:
)
assert result == 42
mock_get_count.assert_called_once_with(account_id="test-account-123")
mock_get_count.assert_called_once_with(
account_id="test-account-123",
date_from=None,
date_to=None,
min_amount=None,
max_amount=None,
search=None,
)
async def test_get_transaction_count_from_db_with_filters(self, database_service):
"""Test getting transaction count with filters."""
with patch.object(database_service, "_get_transaction_count") as mock_get_count:
with patch.object(database_service.transactions, "get_count") as mock_get_count:
mock_get_count.return_value = 15
result = await database_service.get_transaction_count_from_db(
@@ -185,7 +192,9 @@ class TestDatabaseService:
mock_get_count.assert_called_once_with(
account_id="test-account-123",
date_from="2025-09-01",
date_to=None,
min_amount=-100.0,
max_amount=None,
search="Coffee",
)
@@ -201,7 +210,7 @@ class TestDatabaseService:
async def test_get_transaction_count_from_db_error(self, database_service):
"""Test handling error when getting count."""
with patch.object(database_service, "_get_transaction_count") as mock_get_count:
with patch.object(database_service.transactions, "get_count") as mock_get_count:
mock_get_count.side_effect = Exception("Database error")
result = await database_service.get_transaction_count_from_db()
@@ -212,7 +221,9 @@ class TestDatabaseService:
self, database_service, sample_balances_db_format
):
"""Test successful retrieval of balances from database."""
with patch.object(database_service, "_get_balances") as mock_get_balances:
with patch.object(
database_service.balances, "get_balances"
) as mock_get_balances:
mock_get_balances.return_value = sample_balances_db_format
result = await database_service.get_balances_from_db(
@@ -234,7 +245,9 @@ class TestDatabaseService:
async def test_get_balances_from_db_error(self, database_service):
"""Test handling error when getting balances."""
with patch.object(database_service, "_get_balances") as mock_get_balances:
with patch.object(
database_service.balances, "get_balances"
) as mock_get_balances:
mock_get_balances.side_effect = Exception("Database error")
result = await database_service.get_balances_from_db()
@@ -249,7 +262,9 @@ class TestDatabaseService:
"iban": "LT313250081177977789",
}
with patch.object(database_service, "_get_account_summary") as mock_get_summary:
with patch.object(
database_service.transactions, "get_account_summary"
) as mock_get_summary:
mock_get_summary.return_value = mock_summary
result = await database_service.get_account_summary_from_db(
@@ -269,7 +284,9 @@ class TestDatabaseService:
async def test_get_account_summary_from_db_error(self, database_service):
"""Test handling error when getting summary."""
with patch.object(database_service, "_get_account_summary") as mock_get_summary:
with patch.object(
database_service.transactions, "get_account_summary"
) as mock_get_summary:
mock_get_summary.side_effect = Exception("Database error")
result = await database_service.get_account_summary_from_db(
@@ -291,87 +308,87 @@ class TestDatabaseService:
],
}
with patch("sqlite3.connect") as mock_connect:
mock_conn = mock_connect.return_value
mock_cursor = mock_conn.cursor.return_value
with (
patch.object(database_service.balances, "persist") as mock_persist,
patch.object(
database_service.balance_transformer, "transform_to_database_format"
) as mock_transform,
):
mock_transform.return_value = [
(
"test-account-123",
"REVOLUT_REVOLT21",
"active",
"LT313250081177977789",
1000.0,
"EUR",
"interimAvailable",
"2025-09-01T10:00:00",
)
]
await database_service._persist_balance_sqlite(
"test-account-123", balance_data
)
await database_service.persist_balance("test-account-123", balance_data)
# Verify database operations
mock_connect.assert_called()
mock_cursor.execute.assert_called() # Table creation and insert
mock_conn.commit.assert_called_once()
mock_conn.close.assert_called_once()
# Verify transformation and persistence were called
mock_transform.assert_called_once_with("test-account-123", balance_data)
mock_persist.assert_called_once()
async def test_persist_balance_sqlite_error(self, database_service):
"""Test handling error during balance persistence."""
balance_data = {"balances": []}
with patch("sqlite3.connect") as mock_connect:
mock_connect.side_effect = Exception("Database error")
with (
patch.object(database_service.balances, "persist") as mock_persist,
patch.object(
database_service.balance_transformer, "transform_to_database_format"
) as mock_transform,
):
mock_persist.side_effect = Exception("Database error")
mock_transform.return_value = []
with pytest.raises(Exception, match="Database error"):
await database_service._persist_balance_sqlite(
"test-account-123", balance_data
)
await database_service.persist_balance("test-account-123", balance_data)
async def test_persist_transactions_sqlite_success(
self, database_service, sample_transactions_db_format
):
"""Test successful transaction persistence."""
with patch("sqlite3.connect") as mock_connect:
mock_conn = mock_connect.return_value
mock_cursor = mock_conn.cursor.return_value
# Mock fetchone to return (0,) indicating transaction doesn't exist yet
mock_cursor.fetchone.return_value = (0,)
with patch.object(database_service.transactions, "persist") as mock_persist:
mock_persist.return_value = sample_transactions_db_format
result = await database_service._persist_transactions_sqlite(
result = await database_service.persist_transactions(
"test-account-123", sample_transactions_db_format
)
# Should return the transactions (assuming no duplicates)
assert len(result) >= 0 # Could be empty if all are duplicates
# Verify database operations
mock_connect.assert_called()
mock_cursor.execute.assert_called()
mock_conn.commit.assert_called_once()
mock_conn.close.assert_called_once()
# Should return the new transactions
assert len(result) == 2
mock_persist.assert_called_once_with(
"test-account-123", sample_transactions_db_format
)
async def test_persist_transactions_sqlite_duplicate_detection(
self, database_service, sample_transactions_db_format
):
"""Test that existing transactions are not returned as new."""
with patch("sqlite3.connect") as mock_connect:
mock_conn = mock_connect.return_value
mock_cursor = mock_conn.cursor.return_value
# Mock fetchone to return (1,) indicating transaction already exists
mock_cursor.fetchone.return_value = (1,)
with patch.object(database_service.transactions, "persist") as mock_persist:
# Return empty list indicating all were duplicates
mock_persist.return_value = []
result = await database_service._persist_transactions_sqlite(
result = await database_service.persist_transactions(
"test-account-123", sample_transactions_db_format
)
# Should return empty list since all transactions already exist
assert len(result) == 0
# Verify database operations still happened (INSERT OR REPLACE executed)
mock_connect.assert_called()
mock_cursor.execute.assert_called()
mock_conn.commit.assert_called_once()
mock_conn.close.assert_called_once()
mock_persist.assert_called_once()
async def test_persist_transactions_sqlite_error(self, database_service):
"""Test handling error during transaction persistence."""
with patch("sqlite3.connect") as mock_connect:
mock_connect.side_effect = Exception("Database error")
with patch.object(database_service.transactions, "persist") as mock_persist:
mock_persist.side_effect = Exception("Database error")
with pytest.raises(Exception, match="Database error"):
await database_service._persist_transactions_sqlite(
"test-account-123", []
)
await database_service.persist_transactions("test-account-123", [])
async def test_process_transactions_booked_and_pending(self, database_service):
"""Test processing transactions with both booked and pending."""