diff --git a/leggen/commands/bank/add.py b/leggen/commands/bank/add.py
index 19d0961..14fd9b3 100644
--- a/leggen/commands/bank/add.py
+++ b/leggen/commands/bank/add.py
@@ -2,7 +2,6 @@
 import click
 
 from leggen.api_client import LeggenAPIClient
 from leggen.main import cli
-from leggen.utils.disk import save_file
 from leggen.utils.text import info, print_table, success, warning
 
@@ -63,9 +62,6 @@ def add(ctx):
     # Connect to bank via API
     result = api_client.connect_to_bank(bank_id, "http://localhost:8000/")
 
-    # Save requisition details
-    save_file(f"req_{result['id']}.json", result)
-
     success("Bank connection request created successfully!")
     warning(
         "Please open the following URL in your browser to complete the authorization:"
diff --git a/leggen/services/database_helpers.py b/leggen/services/database_helpers.py
new file mode 100644
index 0000000..e15754f
--- /dev/null
+++ b/leggen/services/database_helpers.py
@@ -0,0 +1,64 @@
+"""Database helper utilities for Leggen."""
+
+import sqlite3
+from contextlib import contextmanager
+from pathlib import Path
+from typing import Any, Generator
+
+from loguru import logger
+
+
+@contextmanager
+def get_db_connection(db_path: Path) -> Generator[sqlite3.Connection, None, None]:
+    """Context manager for database connections.
+
+    Usage:
+        with get_db_connection(db_path) as conn:
+            cursor = conn.cursor()
+            cursor.execute(...)
+            conn.commit()
+    """
+    conn = None
+    try:
+        conn = sqlite3.connect(str(db_path))
+        conn.row_factory = sqlite3.Row  # Enable dict-like access
+        yield conn
+    except Exception as e:
+        if conn:
+            conn.rollback()
+        logger.error(f"Database error: {e}")
+        raise
+    finally:
+        if conn:
+            conn.close()
+
+
+def execute_query(
+    db_path: Path, query: str, params: tuple = ()
+) -> list[dict[str, Any]]:
+    """Execute a SELECT query and return results as list of dicts."""
+    with get_db_connection(db_path) as conn:
+        cursor = conn.cursor()
+        cursor.execute(query, params)
+        rows = cursor.fetchall()
+        return [dict(row) for row in rows]
+
+
+def execute_single(
+    db_path: Path, query: str, params: tuple = ()
+) -> dict[str, Any] | None:
+    """Execute a SELECT query and return a single result as dict or None."""
+    with get_db_connection(db_path) as conn:
+        cursor = conn.cursor()
+        cursor.execute(query, params)
+        row = cursor.fetchone()
+        return dict(row) if row else None
+
+
+def execute_count(db_path: Path, query: str, params: tuple = ()) -> int:
+    """Execute a COUNT query and return the integer result."""
+    with get_db_connection(db_path) as conn:
+        cursor = conn.cursor()
+        cursor.execute(query, params)
+        result = cursor.fetchone()
+        return result[0] if result else 0
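
Reviewer note: a minimal sketch of how the new helpers compose, assuming an existing Leggen SQLite file. The path literal and the `acct-123` id are illustrative, not part of this diff; in the service the path comes from `path_manager.get_database_path()`:

```python
from pathlib import Path

from leggen.services.database_helpers import execute_count, execute_query

# Illustrative path; the service resolves this via path_manager.
db_path = Path("leggen.db")

# Rows come back as plain dicts because get_db_connection sets
# conn.row_factory = sqlite3.Row before any query runs.
balances = execute_query(
    db_path,
    "SELECT account_id, type, amount FROM balances WHERE account_id = ?",
    ("acct-123",),
)

total = execute_count(db_path, "SELECT COUNT(*) FROM transactions")
print(f"{len(balances)} balance rows, {total} transactions")
```
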
diff --git a/leggen/services/database_migrations.py b/leggen/services/database_migrations.py
new file mode 100644
index 0000000..689230c
--- /dev/null
+++ b/leggen/services/database_migrations.py
@@ -0,0 +1,657 @@
+"""Database migration functions for Leggen."""
+
+import sqlite3
+import uuid
+from datetime import datetime
+from pathlib import Path
+
+from loguru import logger
+
+
+def run_all_migrations(db_path: Path) -> None:
+    """Run all necessary database migrations."""
+    if not db_path.exists():
+        logger.info("Database file not found, skipping migrations")
+        return
+
+    migrate_balance_timestamps_if_needed(db_path)
+    migrate_null_transaction_ids_if_needed(db_path)
+    migrate_to_composite_key_if_needed(db_path)
+    migrate_add_display_name_if_needed(db_path)
+    migrate_add_sync_operations_if_needed(db_path)
+    migrate_add_logo_if_needed(db_path)
+
+
+def migrate_balance_timestamps_if_needed(db_path: Path) -> None:
+    """Check and migrate balance timestamps if needed."""
+    try:
+        if _check_balance_timestamp_migration_needed(db_path):
+            logger.info("Balance timestamp migration needed, starting...")
+            _migrate_balance_timestamps(db_path)
+            logger.info("Balance timestamp migration completed")
+        else:
+            logger.info("Balance timestamps are already consistent")
+    except Exception as e:
+        logger.error(f"Balance timestamp migration failed: {e}")
+        raise
+
+
+def _check_balance_timestamp_migration_needed(db_path: Path) -> bool:
+    """Check if balance timestamps need migration."""
+    if not db_path.exists():
+        return False
+
+    try:
+        conn = sqlite3.connect(str(db_path))
+        cursor = conn.cursor()
+
+        # Check for mixed timestamp types
+        cursor.execute("""
+            SELECT typeof(timestamp) as type, COUNT(*) as count
+            FROM balances
+            GROUP BY typeof(timestamp)
+        """)
+
+        types = cursor.fetchall()
+        conn.close()
+
+        # If we have both 'real' and 'text' types, migration is needed
+        type_names = [row[0] for row in types]
+        return "real" in type_names and "text" in type_names
+
+    except Exception as e:
+        logger.error(f"Failed to check migration status: {e}")
+        return False
+
+
+def _migrate_balance_timestamps(db_path: Path) -> None:
+    """Convert all Unix timestamps to datetime strings."""
+    try:
+        conn = sqlite3.connect(str(db_path))
+        cursor = conn.cursor()
+
+        # Get all balances with REAL timestamps
+        cursor.execute("""
+            SELECT id, timestamp
+            FROM balances
+            WHERE typeof(timestamp) = 'real'
+            ORDER BY id
+        """)
+
+        unix_records = cursor.fetchall()
+        total_records = len(unix_records)
+
+        if total_records == 0:
+            logger.info("No Unix timestamps found to migrate")
+            conn.close()
+            return
+
+        logger.info(
+            f"Migrating {total_records} balance records from Unix to datetime format"
+        )
+
+        # Convert and update in batches
+        batch_size = 100
+        migrated_count = 0
+
+        for i in range(0, total_records, batch_size):
+            batch = unix_records[i : i + batch_size]
+
+            for record_id, unix_timestamp in batch:
+                try:
+                    # Convert Unix timestamp to datetime string
+                    dt_string = _unix_to_datetime_string(float(unix_timestamp))
+
+                    # Update the record
+                    cursor.execute(
+                        """
+                        UPDATE balances
+                        SET timestamp = ?
+                        WHERE id = ?
+                        """,
+                        (dt_string, record_id),
+                    )
+
+                    migrated_count += 1
+
+                    if migrated_count % 100 == 0:
+                        logger.info(
+                            f"Migrated {migrated_count}/{total_records} balance records"
+                        )
+
+                except Exception as e:
+                    logger.error(f"Failed to migrate record {record_id}: {e}")
+                    continue
+
+            # Commit batch
+            conn.commit()
+
+        conn.close()
+        logger.info(f"Successfully migrated {migrated_count} balance records")
+
+    except Exception as e:
+        logger.error(f"Balance timestamp migration failed: {e}")
+        raise
+
+
+def migrate_null_transaction_ids_if_needed(db_path: Path) -> None:
+    """Check and migrate null transaction IDs if needed."""
+    try:
+        if _check_null_transaction_ids_migration_needed(db_path):
+            logger.info("Null transaction IDs migration needed, starting...")
+            _migrate_null_transaction_ids(db_path)
+            logger.info("Null transaction IDs migration completed")
+        else:
+            logger.info("No null transaction IDs found to migrate")
+    except Exception as e:
+        logger.error(f"Null transaction IDs migration failed: {e}")
+        raise
+
+
+def _check_null_transaction_ids_migration_needed(db_path: Path) -> bool:
+    """Check if null transaction IDs need migration."""
+    if not db_path.exists():
+        return False
+
+    try:
+        conn = sqlite3.connect(str(db_path))
+        cursor = conn.cursor()
+
+        # Check for transactions with null or empty internalTransactionId
+        cursor.execute("""
+            SELECT COUNT(*)
+            FROM transactions
+            WHERE (internalTransactionId IS NULL OR internalTransactionId = '')
+            AND json_extract(rawTransaction, '$.transactionId') IS NOT NULL
+        """)
+
+        count = cursor.fetchone()[0]
+        conn.close()
+
+        return count > 0
+
+    except Exception as e:
+        logger.error(f"Failed to check null transaction IDs migration status: {e}")
+        return False
+
+
+def _migrate_null_transaction_ids(db_path: Path) -> None:
+    """Populate null internalTransactionId fields using transactionId from raw data."""
+    try:
+        conn = sqlite3.connect(str(db_path))
+        cursor = conn.cursor()
+
+        # Get all transactions with null/empty internalTransactionId but valid transactionId in raw data
+        cursor.execute("""
+            SELECT rowid, json_extract(rawTransaction, '$.transactionId') as transactionId
+            FROM transactions
+            WHERE (internalTransactionId IS NULL OR internalTransactionId = '')
+            AND json_extract(rawTransaction, '$.transactionId') IS NOT NULL
+            ORDER BY rowid
+        """)
+
+        null_records = cursor.fetchall()
+        total_records = len(null_records)
+
+        if total_records == 0:
+            logger.info("No null transaction IDs found to migrate")
+            conn.close()
+            return
+
+        logger.info(
+            f"Migrating {total_records} transaction records with null internalTransactionId"
+        )
+
+        # Update in batches
+        batch_size = 100
+        migrated_count = 0
+        skipped_duplicates = 0
+
+        for i in range(0, total_records, batch_size):
+            batch = null_records[i : i + batch_size]
+
+            for rowid, transaction_id in batch:
+                try:
+                    # Check if this transactionId is already used by another record
+                    cursor.execute(
+                        "SELECT COUNT(*) FROM transactions WHERE internalTransactionId = ?",
+                        (str(transaction_id),),
+                    )
+                    existing_count = cursor.fetchone()[0]
+
+                    if existing_count > 0:
+                        # Generate a unique ID to avoid constraint violation
+                        unique_id = f"{str(transaction_id)}_{uuid.uuid4().hex[:8]}"
+                        skipped_duplicates += 1  # Count it so the summary log below can fire
+                        logger.debug(
+                            f"Generated unique ID for duplicate transactionId: {unique_id}"
+                        )
+                    else:
+                        # Use the original transactionId
+                        unique_id = str(transaction_id)
+
+                    # Update the record
+                    cursor.execute(
+                        """
+                        UPDATE transactions
+                        SET internalTransactionId = ?
+                        WHERE rowid = ?
+                        """,
+                        (unique_id, rowid),
+                    )
+
+                    migrated_count += 1
+
+                    if migrated_count % 100 == 0:
+                        logger.info(
+                            f"Migrated {migrated_count}/{total_records} transaction records"
+                        )
+
+                except Exception as e:
+                    logger.error(f"Failed to migrate record {rowid}: {e}")
+                    continue
+
+            # Commit batch
+            conn.commit()
+
+        conn.close()
+        logger.info(f"Successfully migrated {migrated_count} transaction records")
+        if skipped_duplicates > 0:
+            logger.info(
+                f"Generated unique IDs for {skipped_duplicates} duplicate transactionIds"
+            )
+
+    except Exception as e:
+        logger.error(f"Null transaction IDs migration failed: {e}")
+        raise
+
+
+def migrate_to_composite_key_if_needed(db_path: Path) -> None:
+    """Check and migrate to composite primary key if needed."""
+    try:
+        if _check_composite_key_migration_needed(db_path):
+            logger.info("Composite key migration needed, starting...")
+            _migrate_to_composite_key(db_path)
+            logger.info("Composite key migration completed")
+        else:
+            logger.info("Composite key migration not needed")
+    except Exception as e:
+        logger.error(f"Composite key migration failed: {e}")
+        raise
+
+
+def _check_composite_key_migration_needed(db_path: Path) -> bool:
+    """Check if composite key migration is needed."""
+    if not db_path.exists():
+        return False
+
+    try:
+        conn = sqlite3.connect(str(db_path))
+        cursor = conn.cursor()
+
+        # Check if transactions table exists
+        cursor.execute(
+            "SELECT name FROM sqlite_master WHERE type='table' AND name='transactions'"
+        )
+        if not cursor.fetchone():
+            conn.close()
+            return False
+
+        # Check if transactions table has the old primary key structure
+        cursor.execute("PRAGMA table_info(transactions)")
+        columns = cursor.fetchall()
+
+        # Check if internalTransactionId is the primary key (old structure)
+        internal_transaction_id_is_pk = any(
+            col[1] == "internalTransactionId" and col[5] == 1  # col[5] is pk flag
+            for col in columns
+        )
+
+        # Check if we have the new composite primary key structure
+        has_composite_key = any(
+            col[1] in ["accountId", "transactionId"]
+            and col[5] == 1  # col[5] is pk flag
+            for col in columns
+        )
+
+        conn.close()
+
+        # Migration is needed if:
+        # 1. internalTransactionId is still the primary key (old structure), OR
+        # 2. We don't have the new composite key structure yet
+        return internal_transaction_id_is_pk or not has_composite_key
+
+    except Exception as e:
+        logger.error(f"Failed to check composite key migration status: {e}")
+        return False
+
+
+def _migrate_to_composite_key(db_path: Path) -> None:
+    """Migrate transactions table to use composite primary key (accountId, transactionId)."""
+    try:
+        conn = sqlite3.connect(str(db_path))
+        cursor = conn.cursor()
+
+        logger.info("Starting composite key migration...")
+
+        # Step 1: Create temporary table with new schema
+        logger.info("Creating temporary table with composite primary key...")
+        cursor.execute("DROP TABLE IF EXISTS transactions_temp")
+        cursor.execute("""
+            CREATE TABLE transactions_temp (
+                accountId TEXT NOT NULL,
+                transactionId TEXT NOT NULL,
+                internalTransactionId TEXT,
+                institutionId TEXT,
+                iban TEXT,
+                transactionDate DATETIME,
+                description TEXT,
+                transactionValue REAL,
+                transactionCurrency TEXT,
+                transactionStatus TEXT,
+                rawTransaction JSON,
+                PRIMARY KEY (accountId, transactionId)
+            )
+        """)
+
+        # Step 2: Insert deduplicated data (keep most recent duplicate)
+        logger.info("Inserting deduplicated data...")
+        cursor.execute("""
+            INSERT INTO transactions_temp
+            SELECT
+                accountId,
+                json_extract(rawTransaction, '$.transactionId') as transactionId,
+                internalTransactionId,
+                institutionId,
+                iban,
+                transactionDate,
+                description,
+                transactionValue,
+                transactionCurrency,
+                transactionStatus,
+                rawTransaction
+            FROM (
+                SELECT *,
+                    ROW_NUMBER() OVER (
+                        PARTITION BY accountId, json_extract(rawTransaction, '$.transactionId')
+                        ORDER BY transactionDate DESC, rowid DESC
+                    ) as rn
+                FROM transactions
+                WHERE json_extract(rawTransaction, '$.transactionId') IS NOT NULL
+                AND accountId IS NOT NULL
+            ) WHERE rn = 1
+        """)
+
+        # Get counts for reporting
+        cursor.execute("SELECT COUNT(*) FROM transactions")
+        old_count = cursor.fetchone()[0]
+
+        cursor.execute("SELECT COUNT(*) FROM transactions_temp")
+        new_count = cursor.fetchone()[0]
+
+        duplicates_removed = old_count - new_count
+        logger.info(
+            f"Migration stats: {old_count} → {new_count} records ({duplicates_removed} duplicates removed)"
+        )
+
+        # Step 3: Replace tables
+        logger.info("Replacing tables...")
+        cursor.execute("ALTER TABLE transactions RENAME TO transactions_old")
+        cursor.execute("ALTER TABLE transactions_temp RENAME TO transactions")
+
+        # Step 4: Recreate indexes
+        logger.info("Recreating indexes...")
+        cursor.execute(
+            "CREATE INDEX IF NOT EXISTS idx_transactions_internal_id ON transactions(internalTransactionId)"
+        )
+        cursor.execute(
+            "CREATE INDEX IF NOT EXISTS idx_transactions_date ON transactions(transactionDate)"
+        )
+        cursor.execute(
+            "CREATE INDEX IF NOT EXISTS idx_transactions_account_date ON transactions(accountId, transactionDate)"
+        )
+        cursor.execute(
+            "CREATE INDEX IF NOT EXISTS idx_transactions_amount ON transactions(transactionValue)"
+        )
+
+        # Step 5: Cleanup
+        logger.info("Cleaning up...")
+        cursor.execute("DROP TABLE transactions_old")
+
+        conn.commit()
+        conn.close()
+
+        logger.info("Composite key migration completed successfully")
+
+    except Exception as e:
+        logger.error(f"Composite key migration failed: {e}")
+        raise
+
+
+def migrate_add_display_name_if_needed(db_path: Path) -> None:
+    """Check and add display_name column to accounts table if needed."""
+    try:
+        if _check_display_name_migration_needed(db_path):
+            logger.info("Display name column migration needed, starting...")
+            _migrate_add_display_name(db_path)
+            logger.info("Display name column migration completed")
+        else:
+            logger.info("Display name column already exists")
+    except Exception as e:
+        logger.error(f"Display name column migration failed: {e}")
+        raise
+
+
+def _check_display_name_migration_needed(db_path: Path) -> bool:
+    """Check if display_name column needs to be added to accounts table."""
+    if not db_path.exists():
+        return False
+
+    try:
+        conn = sqlite3.connect(str(db_path))
+        cursor = conn.cursor()
+
+        # Check if accounts table exists
+        cursor.execute(
+            "SELECT name FROM sqlite_master WHERE type='table' AND name='accounts'"
+        )
+        if not cursor.fetchone():
+            conn.close()
+            return False
+
+        # Check if display_name column exists
+        cursor.execute("PRAGMA table_info(accounts)")
+        columns = cursor.fetchall()
+
+        has_display_name = any(col[1] == "display_name" for col in columns)
+
+        conn.close()
+        return not has_display_name
+
+    except Exception as e:
+        logger.error(f"Failed to check display_name migration status: {e}")
+        return False
+
+
+def _migrate_add_display_name(db_path: Path) -> None:
+    """Add display_name column to accounts table."""
+    try:
+        conn = sqlite3.connect(str(db_path))
+        cursor = conn.cursor()
+
+        logger.info("Adding display_name column to accounts table...")
+
+        # Add the display_name column
+        cursor.execute("""
+            ALTER TABLE accounts
+            ADD COLUMN display_name TEXT
+        """)
+
+        conn.commit()
+        conn.close()
+
+        logger.info("Display name column migration completed successfully")
+
+    except Exception as e:
+        logger.error(f"Display name column migration failed: {e}")
+        raise
+
+
+def migrate_add_sync_operations_if_needed(db_path: Path) -> None:
+    """Check and add sync_operations table if needed."""
+    try:
+        if _check_sync_operations_migration_needed(db_path):
+            logger.info("Sync operations table migration needed, starting...")
+            _migrate_add_sync_operations(db_path)
+            logger.info("Sync operations table migration completed")
+        else:
+            logger.info("Sync operations table already exists")
+    except Exception as e:
+        logger.error(f"Sync operations table migration failed: {e}")
+        raise
+
+
+def _check_sync_operations_migration_needed(db_path: Path) -> bool:
+    """Check if sync_operations table needs to be created."""
+    if not db_path.exists():
+        return False
+
+    try:
+        conn = sqlite3.connect(str(db_path))
+        cursor = conn.cursor()
+
+        # Check if sync_operations table exists
+        cursor.execute(
+            "SELECT name FROM sqlite_master WHERE type='table' AND name='sync_operations'"
+        )
+        table_exists = cursor.fetchone() is not None
+
+        conn.close()
+        return not table_exists
+
+    except Exception as e:
+        logger.error(f"Failed to check sync_operations migration status: {e}")
+        return False
+
+
+def _migrate_add_sync_operations(db_path: Path) -> None:
+    """Add sync_operations table."""
+    try:
+        conn = sqlite3.connect(str(db_path))
+        cursor = conn.cursor()
+
+        logger.info("Creating sync_operations table...")
+
+        # Create the sync_operations table
+        cursor.execute("""
+            CREATE TABLE sync_operations (
+                id INTEGER PRIMARY KEY AUTOINCREMENT,
+                started_at DATETIME NOT NULL,
+                completed_at DATETIME,
+                success BOOLEAN,
+                accounts_processed INTEGER DEFAULT 0,
+                transactions_added INTEGER DEFAULT 0,
+                transactions_updated INTEGER DEFAULT 0,
+                balances_updated INTEGER DEFAULT 0,
+                duration_seconds REAL,
+                errors TEXT,
+                logs TEXT,
+                trigger_type TEXT DEFAULT 'manual'
+            )
+        """)
+
+        # Create indexes for better performance
+        cursor.execute(
+            "CREATE INDEX IF NOT EXISTS idx_sync_operations_started_at ON sync_operations(started_at)"
+        )
+        cursor.execute(
+            "CREATE INDEX IF NOT EXISTS idx_sync_operations_success ON sync_operations(success)"
+        )
+        cursor.execute(
+            "CREATE INDEX IF NOT EXISTS idx_sync_operations_trigger_type ON sync_operations(trigger_type)"
+        )
+
+        conn.commit()
+        conn.close()
+
+        logger.info("Sync operations table migration completed successfully")
+
+    except Exception as e:
+        logger.error(f"Sync operations table migration failed: {e}")
+        raise
+
+
+def migrate_add_logo_if_needed(db_path: Path) -> None:
+    """Check and add logo column to accounts table if needed."""
+    try:
+        if _check_logo_migration_needed(db_path):
+            logger.info("Logo column migration needed, starting...")
+            _migrate_add_logo(db_path)
+            logger.info("Logo column migration completed")
+        else:
+            logger.info("Logo column already exists")
+    except Exception as e:
+        logger.error(f"Logo column migration failed: {e}")
+        raise
+
+
+def _check_logo_migration_needed(db_path: Path) -> bool:
+    """Check if logo column needs to be added to accounts table."""
+    if not db_path.exists():
+        return False
+
+    try:
+        conn = sqlite3.connect(str(db_path))
+        cursor = conn.cursor()
+
+        # Check if accounts table exists
+        cursor.execute(
+            "SELECT name FROM sqlite_master WHERE type='table' AND name='accounts'"
+        )
+        if not cursor.fetchone():
+            conn.close()
+            return False
+
+        # Check if logo column exists
+        cursor.execute("PRAGMA table_info(accounts)")
+        columns = cursor.fetchall()
+
+        has_logo = any(col[1] == "logo" for col in columns)
+
+        conn.close()
+        return not has_logo
+
+    except Exception as e:
+        logger.error(f"Failed to check logo migration status: {e}")
+        return False
+
+
+def _migrate_add_logo(db_path: Path) -> None:
+    """Add logo column to accounts table."""
+    try:
+        conn = sqlite3.connect(str(db_path))
+        cursor = conn.cursor()
+
+        logger.info("Adding logo column to accounts table...")
+
+        # Add the logo column
+        cursor.execute("""
+            ALTER TABLE accounts
+            ADD COLUMN logo TEXT
+        """)
+
+        conn.commit()
+        conn.close()
+
+        logger.info("Logo column migration completed successfully")
+
+    except Exception as e:
+        logger.error(f"Logo column migration failed: {e}")
+        raise
+
+
+def _unix_to_datetime_string(unix_timestamp: float) -> str:
+    """Convert Unix timestamp to datetime string."""
+    dt = datetime.fromtimestamp(unix_timestamp)
+    return dt.isoformat()
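
A sketch of driving the extracted migrations by hand, say from a one-off maintenance script. The scratch-copy step is a suggested precaution, not something `run_all_migrations` does itself; each `*_if_needed` wrapper re-checks the schema (or the data) first, so a second run is a no-op:

```python
import shutil
from pathlib import Path

from leggen.services.database_migrations import run_all_migrations

# Rehearse against a scratch copy so a failed migration can't touch the real file.
real_db = Path("leggen.db")  # illustrative path
scratch = Path("leggen.rehearsal.db")
shutil.copyfile(real_db, scratch)

run_all_migrations(scratch)
run_all_migrations(scratch)  # Idempotent: every check reports "already ..." and skips.
```
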
diff --git a/leggen/services/database_service.py b/leggen/services/database_service.py
index c8f977b..b866897 100644
--- a/leggen/services/database_service.py
+++ b/leggen/services/database_service.py
@@ -1,10 +1,11 @@
 import json
-import sqlite3
 from datetime import datetime, timedelta
 from typing import Any, Dict, List, Optional
 
 from loguru import logger
 
+from leggen.services.database_helpers import get_db_connection
+from leggen.services.database_migrations import run_all_migrations
 from leggen.services.transaction_processor import TransactionProcessor
 from leggen.utils.config import config
 from leggen.utils.paths import path_manager
@@ -212,506 +213,8 @@ class DatabaseService:
             logger.info("SQLite database disabled, skipping migrations")
             return
 
-        await self._migrate_balance_timestamps_if_needed()
-        await self._migrate_null_transaction_ids_if_needed()
-        await self._migrate_to_composite_key_if_needed()
-        await self._migrate_add_display_name_if_needed()
-        await self._migrate_add_sync_operations_if_needed()
-        await self._migrate_add_logo_if_needed()
-
-    async def _migrate_balance_timestamps_if_needed(self):
-        """Check and migrate balance timestamps if needed"""
-        try:
-            if await self._check_balance_timestamp_migration_needed():
-                logger.info("Balance timestamp migration needed, starting...")
-                await self._migrate_balance_timestamps()
-                logger.info("Balance timestamp migration completed")
-            else:
-                logger.info("Balance timestamps are already consistent")
-        except Exception as e:
-            logger.error(f"Balance timestamp migration failed: {e}")
-            raise
-
-    async def _check_balance_timestamp_migration_needed(self) -> bool:
-        """Check if balance timestamps need migration"""
         db_path = path_manager.get_database_path()
-        if not db_path.exists():
-            return False
-
-        try:
-            conn = sqlite3.connect(str(db_path))
-            cursor = conn.cursor()
-
-            # Check for mixed timestamp types
-            cursor.execute("""
-                SELECT typeof(timestamp) as type, COUNT(*) as count
-                FROM balances
-                GROUP BY typeof(timestamp)
-            """)
-
-            types = cursor.fetchall()
-            conn.close()
-
-            # If we have both 'real' and 'text' types, migration is needed
-            type_names = [row[0] for row in types]
-            return "real" in type_names and "text" in type_names
-
-        except Exception as e:
-            logger.error(f"Failed to check migration status: {e}")
-            return False
-
-    async def _migrate_balance_timestamps(self):
-        """Convert all Unix timestamps to datetime strings"""
-        db_path = path_manager.get_database_path()
-        if not db_path.exists():
-            logger.warning("Database file not found, skipping migration")
-            return
-
-        try:
-            conn = sqlite3.connect(str(db_path))
-            cursor = conn.cursor()
-
-            # Get all balances with REAL timestamps
-            cursor.execute("""
-                SELECT id, timestamp
-                FROM balances
-                WHERE typeof(timestamp) = 'real'
-                ORDER BY id
-            """)
-
-            unix_records = cursor.fetchall()
-            total_records = len(unix_records)
-
-            if total_records == 0:
-                logger.info("No Unix timestamps found to migrate")
-                conn.close()
-                return
-
-            logger.info(
-                f"Migrating {total_records} balance records from Unix to datetime format"
-            )
-
-            # Convert and update in batches
-            batch_size = 100
-            migrated_count = 0
-
-            for i in range(0, total_records, batch_size):
-                batch = unix_records[i : i + batch_size]
-
-                for record_id, unix_timestamp in batch:
-                    try:
-                        # Convert Unix timestamp to datetime string
-                        dt_string = self._unix_to_datetime_string(float(unix_timestamp))
-
-                        # Update the record
-                        cursor.execute(
-                            """
-                            UPDATE balances
-                            SET timestamp = ?
-                            WHERE id = ?
-                            """,
-                            (dt_string, record_id),
-                        )
-
-                        migrated_count += 1
-
-                        if migrated_count % 100 == 0:
-                            logger.info(
-                                f"Migrated {migrated_count}/{total_records} balance records"
-                            )
-
-                    except Exception as e:
-                        logger.error(f"Failed to migrate record {record_id}: {e}")
-                        continue
-
-                # Commit batch
-                conn.commit()
-
-            conn.close()
-            logger.info(f"Successfully migrated {migrated_count} balance records")
-
-        except Exception as e:
-            logger.error(f"Balance timestamp migration failed: {e}")
-            raise
-
-    async def _migrate_null_transaction_ids_if_needed(self):
-        """Check and migrate null transaction IDs if needed"""
-        try:
-            if await self._check_null_transaction_ids_migration_needed():
-                logger.info("Null transaction IDs migration needed, starting...")
-                await self._migrate_null_transaction_ids()
-                logger.info("Null transaction IDs migration completed")
-            else:
-                logger.info("No null transaction IDs found to migrate")
-        except Exception as e:
-            logger.error(f"Null transaction IDs migration failed: {e}")
-            raise
-
-    async def _check_null_transaction_ids_migration_needed(self) -> bool:
-        """Check if null transaction IDs need migration"""
-        db_path = path_manager.get_database_path()
-        if not db_path.exists():
-            return False
-
-        try:
-            conn = sqlite3.connect(str(db_path))
-            cursor = conn.cursor()
-
-            # Check for transactions with null or empty internalTransactionId
-            cursor.execute("""
-                SELECT COUNT(*)
-                FROM transactions
-                WHERE (internalTransactionId IS NULL OR internalTransactionId = '')
-                AND json_extract(rawTransaction, '$.transactionId') IS NOT NULL
-            """)
-
-            count = cursor.fetchone()[0]
-            conn.close()
-
-            return count > 0
-
-        except Exception as e:
-            logger.error(f"Failed to check null transaction IDs migration status: {e}")
-            return False
-
-    async def _migrate_null_transaction_ids(self):
-        """Populate null internalTransactionId fields using transactionId from raw data"""
-        import uuid
-
-        db_path = path_manager.get_database_path()
-        if not db_path.exists():
-            logger.warning("Database file not found, skipping migration")
-            return
-
-        try:
-            conn = sqlite3.connect(str(db_path))
-            cursor = conn.cursor()
-
-            # Get all transactions with null/empty internalTransactionId but valid transactionId in raw data
-            cursor.execute("""
-                SELECT rowid, json_extract(rawTransaction, '$.transactionId') as transactionId
-                FROM transactions
-                WHERE (internalTransactionId IS NULL OR internalTransactionId = '')
-                AND json_extract(rawTransaction, '$.transactionId') IS NOT NULL
-                ORDER BY rowid
-            """)
-
-            null_records = cursor.fetchall()
-            total_records = len(null_records)
-
-            if total_records == 0:
-                logger.info("No null transaction IDs found to migrate")
-                conn.close()
-                return
-
-            logger.info(
-                f"Migrating {total_records} transaction records with null internalTransactionId"
-            )
-
-            # Update in batches
-            batch_size = 100
-            migrated_count = 0
-            skipped_duplicates = 0
-
-            for i in range(0, total_records, batch_size):
-                batch = null_records[i : i + batch_size]
-
-                for rowid, transaction_id in batch:
-                    try:
-                        # Check if this transactionId is already used by another record
-                        cursor.execute(
-                            "SELECT COUNT(*) FROM transactions WHERE internalTransactionId = ?",
-                            (str(transaction_id),),
-                        )
-                        existing_count = cursor.fetchone()[0]
-
-                        if existing_count > 0:
-                            # Generate a unique ID to avoid constraint violation
-                            unique_id = f"{str(transaction_id)}_{uuid.uuid4().hex[:8]}"
-                            logger.debug(
-                                f"Generated unique ID for duplicate transactionId: {unique_id}"
-                            )
-                        else:
-                            # Use the original transactionId
-                            unique_id = str(transaction_id)
-
-                        # Update the record
-                        cursor.execute(
-                            """
-                            UPDATE transactions
-                            SET internalTransactionId = ?
-                            WHERE rowid = ?
-                            """,
-                            (unique_id, rowid),
-                        )
-
-                        migrated_count += 1
-
-                        if migrated_count % 100 == 0:
-                            logger.info(
-                                f"Migrated {migrated_count}/{total_records} transaction records"
-                            )
-
-                    except Exception as e:
-                        logger.error(f"Failed to migrate record {rowid}: {e}")
-                        continue
-
-                # Commit batch
-                conn.commit()
-
-            conn.close()
-            logger.info(f"Successfully migrated {migrated_count} transaction records")
-            if skipped_duplicates > 0:
-                logger.info(
-                    f"Generated unique IDs for {skipped_duplicates} duplicate transactionIds"
-                )
-
-        except Exception as e:
-            logger.error(f"Null transaction IDs migration failed: {e}")
-            raise
-
-    async def _migrate_to_composite_key_if_needed(self):
-        """Check and migrate to composite primary key if needed"""
-        try:
-            if await self._check_composite_key_migration_needed():
-                logger.info("Composite key migration needed, starting...")
-                await self._migrate_to_composite_key()
-                logger.info("Composite key migration completed")
-            else:
-                logger.info("Composite key migration not needed")
-        except Exception as e:
-            logger.error(f"Composite key migration failed: {e}")
-            raise
-
-    async def _check_composite_key_migration_needed(self) -> bool:
-        """Check if composite key migration is needed"""
-        db_path = path_manager.get_database_path()
-        if not db_path.exists():
-            return False
-
-        try:
-            conn = sqlite3.connect(str(db_path))
-            cursor = conn.cursor()
-
-            # Check if transactions table exists
-            cursor.execute(
-                "SELECT name FROM sqlite_master WHERE type='table' AND name='transactions'"
-            )
-            if not cursor.fetchone():
-                conn.close()
-                return False
-
-            # Check if transactions table has the old primary key structure
-            cursor.execute("PRAGMA table_info(transactions)")
-            columns = cursor.fetchall()
-
-            # Check if internalTransactionId is the primary key (old structure)
-            internal_transaction_id_is_pk = any(
-                col[1] == "internalTransactionId" and col[5] == 1  # col[5] is pk flag
-                for col in columns
-            )
-
-            # Check if we have the new composite primary key structure
-            has_composite_key = any(
-                col[1] in ["accountId", "transactionId"]
-                and col[5] == 1  # col[5] is pk flag
-                for col in columns
-            )
-
-            conn.close()
-
-            # Migration is needed if:
-            # 1. internalTransactionId is still the primary key (old structure), OR
-            # 2. We don't have the new composite key structure yet
-            return internal_transaction_id_is_pk or not has_composite_key
-
-        except Exception as e:
-            logger.error(f"Failed to check composite key migration status: {e}")
-            return False
-
-    async def _migrate_to_composite_key(self):
-        """Migrate transactions table to use composite primary key (accountId, transactionId)"""
-        db_path = path_manager.get_database_path()
-        if not db_path.exists():
-            logger.warning("Database file not found, skipping migration")
-            return
-
-        try:
-            conn = sqlite3.connect(str(db_path))
-            cursor = conn.cursor()
-
-            logger.info("Starting composite key migration...")
-
-            # Step 1: Create temporary table with new schema
-            logger.info("Creating temporary table with composite primary key...")
-            cursor.execute("DROP TABLE IF EXISTS transactions_temp")
-            cursor.execute("""
-                CREATE TABLE transactions_temp (
-                    accountId TEXT NOT NULL,
-                    transactionId TEXT NOT NULL,
-                    internalTransactionId TEXT,
-                    institutionId TEXT,
-                    iban TEXT,
-                    transactionDate DATETIME,
-                    description TEXT,
-                    transactionValue REAL,
-                    transactionCurrency TEXT,
-                    transactionStatus TEXT,
-                    rawTransaction JSON,
-                    PRIMARY KEY (accountId, transactionId)
-                )
-            """)
-
-            # Step 2: Insert deduplicated data (keep most recent duplicate)
-            logger.info("Inserting deduplicated data...")
-            cursor.execute("""
-                INSERT INTO transactions_temp
-                SELECT
-                    accountId,
-                    json_extract(rawTransaction, '$.transactionId') as transactionId,
-                    internalTransactionId,
-                    institutionId,
-                    iban,
-                    transactionDate,
-                    description,
-                    transactionValue,
-                    transactionCurrency,
-                    transactionStatus,
-                    rawTransaction
-                FROM (
-                    SELECT *,
-                        ROW_NUMBER() OVER (
-                            PARTITION BY accountId, json_extract(rawTransaction, '$.transactionId')
-                            ORDER BY transactionDate DESC, rowid DESC
-                        ) as rn
-                    FROM transactions
-                    WHERE json_extract(rawTransaction, '$.transactionId') IS NOT NULL
-                    AND accountId IS NOT NULL
-                ) WHERE rn = 1
-            """)
-
-            # Get counts for reporting
-            cursor.execute("SELECT COUNT(*) FROM transactions")
-            old_count = cursor.fetchone()[0]
-
-            cursor.execute("SELECT COUNT(*) FROM transactions_temp")
-            new_count = cursor.fetchone()[0]
-
-            duplicates_removed = old_count - new_count
-            logger.info(
-                f"Migration stats: {old_count} → {new_count} records ({duplicates_removed} duplicates removed)"
-            )
-
-            # Step 3: Replace tables
-            logger.info("Replacing tables...")
-            cursor.execute("ALTER TABLE transactions RENAME TO transactions_old")
-            cursor.execute("ALTER TABLE transactions_temp RENAME TO transactions")
-
-            # Step 4: Recreate indexes
-            logger.info("Recreating indexes...")
-            cursor.execute(
-                "CREATE INDEX IF NOT EXISTS idx_transactions_internal_id ON transactions(internalTransactionId)"
-            )
-            cursor.execute(
-                "CREATE INDEX IF NOT EXISTS idx_transactions_date ON transactions(transactionDate)"
-            )
-            cursor.execute(
-                "CREATE INDEX IF NOT EXISTS idx_transactions_account_date ON transactions(accountId, transactionDate)"
-            )
-            cursor.execute(
-                "CREATE INDEX IF NOT EXISTS idx_transactions_amount ON transactions(transactionValue)"
-            )
-
-            # Step 5: Cleanup
-            logger.info("Cleaning up...")
-            cursor.execute("DROP TABLE transactions_old")
-
-            conn.commit()
-            conn.close()
-
-            logger.info("Composite key migration completed successfully")
-
-        except Exception as e:
-            logger.error(f"Composite key migration failed: {e}")
-            raise
-
-    async def _migrate_add_display_name_if_needed(self):
-        """Check and add display_name column to accounts table if needed"""
-        try:
-            if await self._check_display_name_migration_needed():
-                logger.info("Display name column migration needed, starting...")
-                await self._migrate_add_display_name()
-                logger.info("Display name column migration completed")
-            else:
-                logger.info("Display name column already exists")
-        except Exception as e:
-            logger.error(f"Display name column migration failed: {e}")
-            raise
-
-    async def _check_display_name_migration_needed(self) -> bool:
-        """Check if display_name column needs to be added to accounts table"""
-        db_path = path_manager.get_database_path()
-        if not db_path.exists():
-            return False
-
-        try:
-            conn = sqlite3.connect(str(db_path))
-            cursor = conn.cursor()
-
-            # Check if accounts table exists
-            cursor.execute(
-                "SELECT name FROM sqlite_master WHERE type='table' AND name='accounts'"
-            )
-            if not cursor.fetchone():
-                conn.close()
-                return False
-
-            # Check if display_name column exists
-            cursor.execute("PRAGMA table_info(accounts)")
-            columns = cursor.fetchall()
-
-            has_display_name = any(col[1] == "display_name" for col in columns)
-
-            conn.close()
-            return not has_display_name
-
-        except Exception as e:
-            logger.error(f"Failed to check display_name migration status: {e}")
-            return False
-
-    async def _migrate_add_display_name(self):
-        """Add display_name column to accounts table"""
-        db_path = path_manager.get_database_path()
-        if not db_path.exists():
-            logger.warning("Database file not found, skipping migration")
-            return
-
-        try:
-            conn = sqlite3.connect(str(db_path))
-            cursor = conn.cursor()
-
-            logger.info("Adding display_name column to accounts table...")
-
-            # Add the display_name column
-            cursor.execute("""
-                ALTER TABLE accounts
-                ADD COLUMN display_name TEXT
-            """)
-
-            conn.commit()
-            conn.close()
-
-            logger.info("Display name column migration completed successfully")
-
-        except Exception as e:
-            logger.error(f"Display name column migration failed: {e}")
-            raise
-
-    def _unix_to_datetime_string(self, unix_timestamp: float) -> str:
-        """Convert Unix timestamp to datetime string"""
-        dt = datetime.fromtimestamp(unix_timestamp)
-        return dt.isoformat()
+        run_all_migrations(db_path)
 
     async def _persist_balance_sqlite(
         self, account_id: str, balance_data: Dict[str, Any]
@@ -935,9 +438,6 @@ class DatabaseService:
         db_path = path_manager.get_database_path()
         if not db_path.exists():
            return []
-        conn = sqlite3.connect(str(db_path))
-        conn.row_factory = sqlite3.Row  # Enable dict-like access
-        cursor = conn.cursor()
 
         # Build query with filters
         query = "SELECT * FROM transactions WHERE 1=1"
@@ -978,8 +478,9 @@ class DatabaseService:
             query += " OFFSET ?"
             params.append(offset)
 
-        try:
-            cursor.execute(query, params)
+        with get_db_connection(db_path) as conn:
+            cursor = conn.cursor()
+            cursor.execute(query, tuple(params))
             rows = cursor.fetchall()
 
             # Convert to list of dicts and parse JSON fields
@@ -992,21 +493,13 @@ class DatabaseService:
                 )
                 transactions.append(transaction)
 
-            conn.close()
             return transactions
 
-        except Exception as e:
-            conn.close()
-            raise e
-
     def _get_balances(self, account_id=None):
         """Get latest balances from SQLite database"""
         db_path = path_manager.get_database_path()
         if not db_path.exists():
             return []
-        conn = sqlite3.connect(str(db_path))
-        conn.row_factory = sqlite3.Row
-        cursor = conn.cursor()
 
         # Get latest balance for each account_id and type combination
         query = """
@@ -1025,29 +518,20 @@ class DatabaseService:
 
         query += " ORDER BY b1.account_id, b1.type"
 
-        try:
-            cursor.execute(query, params)
+        with get_db_connection(db_path) as conn:
+            cursor = conn.cursor()
+            cursor.execute(query, tuple(params))
             rows = cursor.fetchall()
-
-            balances = [dict(row) for row in rows]
-            conn.close()
-            return balances
-
-        except Exception as e:
-            conn.close()
-            raise e
+            return [dict(row) for row in rows]
 
     def _get_account_summary(self, account_id):
         """Get basic account info from transactions table (avoids GoCardless API call)"""
         db_path = path_manager.get_database_path()
         if not db_path.exists():
             return None
-        conn = sqlite3.connect(str(db_path))
-        conn.row_factory = sqlite3.Row
-        cursor = conn.cursor()
 
-        try:
-            # Get account info from most recent transaction
+        with get_db_connection(db_path) as conn:
+            cursor = conn.cursor()
+            # Get account info from most recent transaction
             cursor.execute(
                 """
                 SELECT DISTINCT accountId, institutionId, iban
@@ -1058,25 +542,14 @@ class DatabaseService:
                 """,
                 (account_id,),
             )
             row = cursor.fetchone()
-            conn.close()
-
-            if row:
-                return dict(row)
-            return None
-
-        except Exception as e:
-            conn.close()
-            raise e
+            return dict(row) if row else None
 
     def _get_transaction_count(self, account_id=None, **filters):
         """Get total count of transactions matching filters"""
         db_path = path_manager.get_database_path()
         if not db_path.exists():
             return 0
-        conn = sqlite3.connect(str(db_path))
-        cursor = conn.cursor()
 
         query = "SELECT COUNT(*) FROM transactions WHERE 1=1"
         params = []
@@ -1106,50 +579,46 @@ class DatabaseService:
             query += " AND description LIKE ?"
             params.append(f"%{filters['search']}%")
 
-        try:
-            cursor.execute(query, params)
+        with get_db_connection(db_path) as conn:
+            cursor = conn.cursor()
+            cursor.execute(query, tuple(params))
             count = cursor.fetchone()[0]
-            conn.close()
             return count
 
-        except Exception as e:
-            conn.close()
-            raise e
-
     def _persist_account(self, account_data: dict):
         """Persist account details to SQLite database"""
         db_path = path_manager.get_database_path()
-        conn = sqlite3.connect(str(db_path))
-        cursor = conn.cursor()
 
-        # Create the accounts table if it doesn't exist
-        cursor.execute(
-            """CREATE TABLE IF NOT EXISTS accounts (
-                id TEXT PRIMARY KEY,
-                institution_id TEXT,
-                status TEXT,
-                iban TEXT,
-                name TEXT,
-                currency TEXT,
-                created DATETIME,
-                last_accessed DATETIME,
-                last_updated DATETIME,
-                display_name TEXT,
-                logo TEXT
-            )"""
-        )
+        with get_db_connection(db_path) as conn:
+            cursor = conn.cursor()
 
-        # Create indexes for accounts table
-        cursor.execute(
-            """CREATE INDEX IF NOT EXISTS idx_accounts_institution_id
-            ON accounts(institution_id)"""
-        )
-        cursor.execute(
-            """CREATE INDEX IF NOT EXISTS idx_accounts_status
-            ON accounts(status)"""
-        )
+            # Create the accounts table if it doesn't exist
+            cursor.execute(
+                """CREATE TABLE IF NOT EXISTS accounts (
+                    id TEXT PRIMARY KEY,
+                    institution_id TEXT,
+                    status TEXT,
+                    iban TEXT,
+                    name TEXT,
+                    currency TEXT,
+                    created DATETIME,
+                    last_accessed DATETIME,
+                    last_updated DATETIME,
+                    display_name TEXT,
+                    logo TEXT
+                )"""
+            )
+
+            # Create indexes for accounts table
+            cursor.execute(
+                """CREATE INDEX IF NOT EXISTS idx_accounts_institution_id
+                ON accounts(institution_id)"""
+            )
+            cursor.execute(
+                """CREATE INDEX IF NOT EXISTS idx_accounts_status
+                ON accounts(status)"""
+            )
 
-        try:
             # First, check if account exists and preserve display_name
             cursor.execute(
                 "SELECT display_name FROM accounts WHERE id = ?", (account_data["id"],)
@@ -1190,22 +659,14 @@ class DatabaseService:
                 ),
             )
             conn.commit()
-            conn.close()
-            return account_data
-
-        except Exception as e:
-            conn.close()
-            raise e
+            return account_data
 
     def _get_accounts(self, account_ids=None):
         """Get account details from SQLite database"""
         db_path = path_manager.get_database_path()
         if not db_path.exists():
             return []
-        conn = sqlite3.connect(str(db_path))
-        conn.row_factory = sqlite3.Row
-        cursor = conn.cursor()
 
         query = "SELECT * FROM accounts"
         params = []
@@ -1217,39 +678,23 @@ class DatabaseService:
 
         query += " ORDER BY created DESC"
 
-        try:
-            cursor.execute(query, params)
+        with get_db_connection(db_path) as conn:
+            cursor = conn.cursor()
+            cursor.execute(query, tuple(params))
             rows = cursor.fetchall()
-
-            accounts = [dict(row) for row in rows]
-            conn.close()
-            return accounts
-
-        except Exception as e:
-            conn.close()
-            raise e
+            return [dict(row) for row in rows]
 
     def _get_account(self, account_id: str):
         """Get specific account details from SQLite database"""
         db_path = path_manager.get_database_path()
         if not db_path.exists():
             return None
-        conn = sqlite3.connect(str(db_path))
-        conn.row_factory = sqlite3.Row
-        cursor = conn.cursor()
 
-        try:
+        with get_db_connection(db_path) as conn:
+            cursor = conn.cursor()
             cursor.execute("SELECT * FROM accounts WHERE id = ?", (account_id,))
             row = cursor.fetchone()
-            conn.close()
-
-            if row:
-                return dict(row)
-            return None
-
-        except Exception as e:
-            conn.close()
-            raise e
+            return dict(row) if row else None
 
     def _get_historical_balances(self, account_id=None, days=365):
         """Get historical balance progression based on transaction history"""
@@ -1257,84 +702,75 @@ class DatabaseService:
         if not db_path.exists():
             return []
 
-        conn = sqlite3.connect(str(db_path))
-        conn.row_factory = sqlite3.Row
-        cursor = conn.cursor()
+        cutoff_date = (datetime.now() - timedelta(days=days)).date().isoformat()
+        today_date = datetime.now().date().isoformat()
 
-        try:
-            cutoff_date = (datetime.now() - timedelta(days=days)).date().isoformat()
-            today_date = datetime.now().date().isoformat()
-
-            # Single SQL query to generate historical balances using window functions
-            query = """
-            WITH RECURSIVE date_series AS (
-                -- Generate weekly dates from cutoff_date to today
-                SELECT date(?) as ref_date
-                UNION ALL
-                SELECT date(ref_date, '+7 days')
-                FROM date_series
-                WHERE ref_date < date(?)
-            ),
-            current_balances AS (
-                -- Get current balance for each account/type
-                SELECT account_id, type, amount, currency
-                FROM balances b1
-                WHERE b1.timestamp = (
-                    SELECT MAX(b2.timestamp)
-                    FROM balances b2
-                    WHERE b2.account_id = b1.account_id AND b2.type = b1.type
-                )
-                {account_filter}
-                AND b1.type = 'closingBooked' -- Focus on closingBooked for charts
-            ),
-            historical_points AS (
-                -- Calculate balance at each weekly point by subtracting future transactions
-                SELECT
-                    cb.account_id,
-                    cb.type as balance_type,
-                    cb.currency,
-                    ds.ref_date,
-                    cb.amount - COALESCE(
-                        (SELECT SUM(t.transactionValue)
-                        FROM transactions t
-                        WHERE t.accountId = cb.account_id
-                        AND date(t.transactionDate) > ds.ref_date), 0
-                    ) as balance_amount
-                FROM current_balances cb
-                CROSS JOIN date_series ds
-            )
-            SELECT
-                account_id || '_' || balance_type || '_' || ref_date as id,
-                account_id,
-                balance_amount,
-                balance_type,
-                currency,
-                ref_date as reference_date
-            FROM historical_points
-            ORDER BY account_id, ref_date
-            """
+        # Single SQL query to generate historical balances using window functions
+        query = """
+        WITH RECURSIVE date_series AS (
+            -- Generate weekly dates from cutoff_date to today
+            SELECT date(?) as ref_date
+            UNION ALL
+            SELECT date(ref_date, '+7 days')
+            FROM date_series
+            WHERE ref_date < date(?)
+        ),
+        current_balances AS (
+            -- Get current balance for each account/type
+            SELECT account_id, type, amount, currency
+            FROM balances b1
+            WHERE b1.timestamp = (
+                SELECT MAX(b2.timestamp)
+                FROM balances b2
+                WHERE b2.account_id = b1.account_id AND b2.type = b1.type
+            )
+            {account_filter}
+            AND b1.type = 'closingBooked' -- Focus on closingBooked for charts
+        ),
+        historical_points AS (
+            -- Calculate balance at each weekly point by subtracting future transactions
+            SELECT
+                cb.account_id,
+                cb.type as balance_type,
+                cb.currency,
+                ds.ref_date,
+                cb.amount - COALESCE(
+                    (SELECT SUM(t.transactionValue)
+                    FROM transactions t
+                    WHERE t.accountId = cb.account_id
+                    AND date(t.transactionDate) > ds.ref_date), 0
+                ) as balance_amount
+            FROM current_balances cb
+            CROSS JOIN date_series ds
+        )
+        SELECT
+            account_id || '_' || balance_type || '_' || ref_date as id,
+            account_id,
+            balance_amount,
+            balance_type,
+            currency,
+            ref_date as reference_date
+        FROM historical_points
+        ORDER BY account_id, ref_date
+        """
 
-            # Build parameters and account filter
-            params = [cutoff_date, today_date]
-            if account_id:
-                account_filter = "AND b1.account_id = ?"
-                params.append(account_id)
-            else:
-                account_filter = ""
+        # Build parameters and account filter
+        params = [cutoff_date, today_date]
+        if account_id:
+            account_filter = "AND b1.account_id = ?"
+            params.append(account_id)
+        else:
+            account_filter = ""
 
-            # Format the query with conditional filter
-            formatted_query = query.format(account_filter=account_filter)
+        # Format the query with conditional filter
+        formatted_query = query.format(account_filter=account_filter)
 
-            cursor.execute(formatted_query, params)
+        with get_db_connection(db_path) as conn:
+            cursor = conn.cursor()
+            cursor.execute(formatted_query, tuple(params))
             rows = cursor.fetchall()
-
-            conn.close()
             return [dict(row) for row in rows]
 
-        except Exception as e:
-            conn.close()
-            raise e
-
     async def get_monthly_transaction_stats_from_db(
         self,
         account_id: Optional[str] = None,
@@ -1371,42 +807,39 @@ class DatabaseService:
         if not db_path.exists():
             return []
 
-        conn = sqlite3.connect(str(db_path))
-        conn.row_factory = sqlite3.Row
-        cursor = conn.cursor()
+        # SQL query to aggregate transactions by month
+        query = """
+            SELECT
+                strftime('%Y-%m', transactionDate) as month,
+                COALESCE(SUM(CASE WHEN transactionValue > 0 THEN transactionValue ELSE 0 END), 0) as income,
+                COALESCE(SUM(CASE WHEN transactionValue < 0 THEN ABS(transactionValue) ELSE 0 END), 0) as expenses,
+                COALESCE(SUM(transactionValue), 0) as net
+            FROM transactions
+            WHERE 1=1
+        """
 
-        try:
-            # SQL query to aggregate transactions by month
-            query = """
-                SELECT
-                    strftime('%Y-%m', transactionDate) as month,
-                    COALESCE(SUM(CASE WHEN transactionValue > 0 THEN transactionValue ELSE 0 END), 0) as income,
-                    COALESCE(SUM(CASE WHEN transactionValue < 0 THEN ABS(transactionValue) ELSE 0 END), 0) as expenses,
-                    COALESCE(SUM(transactionValue), 0) as net
-                FROM transactions
-                WHERE 1=1
-            """
+        params = []
 
-            params = []
+        if account_id:
+            query += " AND accountId = ?"
+            params.append(account_id)
 
-            if account_id:
-                query += " AND accountId = ?"
-                params.append(account_id)
+        if date_from:
+            query += " AND transactionDate >= ?"
+            params.append(date_from)
 
-            if date_from:
-                query += " AND transactionDate >= ?"
-                params.append(date_from)
+        if date_to:
+            query += " AND transactionDate <= ?"
+            params.append(date_to)
 
-            if date_to:
-                query += " AND transactionDate <= ?"
-                params.append(date_to)
+        query += """
+            GROUP BY strftime('%Y-%m', transactionDate)
+            ORDER BY month ASC
+        """
 
-            query += """
-                GROUP BY strftime('%Y-%m', transactionDate)
-                ORDER BY month ASC
-            """
-
-            cursor.execute(query, params)
+        with get_db_connection(db_path) as conn:
+            cursor = conn.cursor()
+            cursor.execute(query, tuple(params))
             rows = cursor.fetchall()
 
             # Convert to desired format with proper month display
@@ -1426,173 +859,8 @@ class DatabaseService:
                 }
             )
 
-            conn.close()
             return monthly_stats
 
-        except Exception as e:
-            conn.close()
-            raise e
-
-    async def _check_sync_operations_migration_needed(self) -> bool:
-        """Check if sync_operations table needs to be created"""
-        db_path = path_manager.get_database_path()
-        if not db_path.exists():
-            return False
-
-        try:
-            conn = sqlite3.connect(str(db_path))
-            cursor = conn.cursor()
-
-            # Check if sync_operations table exists
-            cursor.execute(
-                "SELECT name FROM sqlite_master WHERE type='table' AND name='sync_operations'"
-            )
-            table_exists = cursor.fetchone() is not None
-
-            conn.close()
-            return not table_exists
-
-        except Exception as e:
-            logger.error(f"Failed to check sync_operations migration status: {e}")
-            return False
-
-    async def _migrate_add_sync_operations(self):
-        """Add sync_operations table"""
-        db_path = path_manager.get_database_path()
-        if not db_path.exists():
-            logger.warning("Database file not found, skipping migration")
-            return
-
-        try:
-            conn = sqlite3.connect(str(db_path))
-            cursor = conn.cursor()
-
-            logger.info("Creating sync_operations table...")
-
-            # Create the sync_operations table
-            cursor.execute("""
-                CREATE TABLE sync_operations (
-                    id INTEGER PRIMARY KEY AUTOINCREMENT,
-                    started_at DATETIME NOT NULL,
-                    completed_at DATETIME,
-                    success BOOLEAN,
-                    accounts_processed INTEGER DEFAULT 0,
-                    transactions_added INTEGER DEFAULT 0,
-                    transactions_updated INTEGER DEFAULT 0,
-                    balances_updated INTEGER DEFAULT 0,
-                    duration_seconds REAL,
-                    errors TEXT,
-                    logs TEXT,
-                    trigger_type TEXT DEFAULT 'manual'
-                )
-            """)
-
-            # Create indexes for better performance
-            cursor.execute(
-                "CREATE INDEX IF NOT EXISTS idx_sync_operations_started_at ON sync_operations(started_at)"
-            )
-            cursor.execute(
-                "CREATE INDEX IF NOT EXISTS idx_sync_operations_success ON sync_operations(success)"
-            )
-            cursor.execute(
-                "CREATE INDEX IF NOT EXISTS idx_sync_operations_trigger_type ON sync_operations(trigger_type)"
-            )
-
-            conn.commit()
-            conn.close()
-
-            logger.info("Sync operations table migration completed successfully")
-
-        except Exception as e:
-            logger.error(f"Sync operations table migration failed: {e}")
-            raise
-
-    async def _migrate_add_sync_operations_if_needed(self):
-        """Check and add sync_operations table if needed"""
-        try:
-            if await self._check_sync_operations_migration_needed():
-                logger.info("Sync operations table migration needed, starting...")
-                await self._migrate_add_sync_operations()
-                logger.info("Sync operations table migration completed")
-            else:
-                logger.info("Sync operations table already exists")
-        except Exception as e:
-            logger.error(f"Sync operations table migration failed: {e}")
-            raise
-
-    async def _migrate_add_logo_if_needed(self):
-        """Check and add logo column to accounts table if needed"""
-        try:
-            if await self._check_logo_migration_needed():
-                logger.info("Logo column migration needed, starting...")
-                await self._migrate_add_logo()
-                logger.info("Logo column migration completed")
-            else:
-                logger.info("Logo column already exists")
-        except Exception as e:
-            logger.error(f"Logo column migration failed: {e}")
-            raise
-
-    async def _check_logo_migration_needed(self) -> bool:
-        """Check if logo column needs to be added to accounts table"""
-        db_path = path_manager.get_database_path()
-        if not db_path.exists():
-            return False
-
-        try:
-            conn = sqlite3.connect(str(db_path))
-            cursor = conn.cursor()
-
-            # Check if accounts table exists
-            cursor.execute(
-                "SELECT name FROM sqlite_master WHERE type='table' AND name='accounts'"
-            )
-            if not cursor.fetchone():
-                conn.close()
-                return False
-
-            # Check if logo column exists
-            cursor.execute("PRAGMA table_info(accounts)")
-            columns = cursor.fetchall()
-
-            has_logo = any(col[1] == "logo" for col in columns)
-
-            conn.close()
-            return not has_logo
-
-        except Exception as e:
-            logger.error(f"Failed to check logo migration status: {e}")
-            return False
-
-    async def _migrate_add_logo(self):
-        """Add logo column to accounts table"""
-        db_path = path_manager.get_database_path()
-        if not db_path.exists():
-            logger.warning("Database file not found, skipping migration")
-            return
-
-        try:
-            conn = sqlite3.connect(str(db_path))
-            cursor = conn.cursor()
-
-            logger.info("Adding logo column to accounts table...")
-
-            # Add the logo column
-            cursor.execute("""
-                ALTER TABLE accounts
-                ADD COLUMN logo TEXT
-            """)
-
-            conn.commit()
-            conn.close()
-
-            logger.info("Logo column migration completed successfully")
-
-        except Exception as e:
-            logger.error(f"Logo column migration failed: {e}")
-            raise
-
     async def persist_sync_operation(self, sync_operation: Dict[str, Any]) -> int:
         """Persist sync operation to database and return the ID"""
         if not self.sqlite_enabled:
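
The refactor above leans on one property of `get_db_connection`: the connection it yields is always closed, and rolled back on error. A pytest-style sketch of that failure path — the table and values are made up for the test:

```python
import sqlite3

import pytest

from leggen.services.database_helpers import get_db_connection


def test_rollback_and_close_on_error(tmp_path):
    db_path = tmp_path / "test.db"
    with get_db_connection(db_path) as conn:
        conn.execute("CREATE TABLE accounts (id TEXT PRIMARY KEY)")
        conn.commit()

    # An uncommitted insert followed by a failing query should be rolled back.
    with pytest.raises(sqlite3.OperationalError):
        with get_db_connection(db_path) as conn:
            conn.execute("INSERT INTO accounts VALUES ('a1')")
            conn.execute("SELECT * FROM no_such_table")  # Raises.

    with get_db_connection(db_path) as conn:
        rows = conn.execute("SELECT * FROM accounts").fetchall()
    assert rows == []  # The insert was rolled back.
```
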
{e}") - raise - - async def _check_logo_migration_needed(self) -> bool: - """Check if logo column needs to be added to accounts table""" - db_path = path_manager.get_database_path() - if not db_path.exists(): - return False - - try: - conn = sqlite3.connect(str(db_path)) - cursor = conn.cursor() - - # Check if accounts table exists - cursor.execute( - "SELECT name FROM sqlite_master WHERE type='table' AND name='accounts'" - ) - if not cursor.fetchone(): - conn.close() - return False - - # Check if logo column exists - cursor.execute("PRAGMA table_info(accounts)") - columns = cursor.fetchall() - - # Check if logo column exists - has_logo = any(col[1] == "logo" for col in columns) - - conn.close() - return not has_logo - - except Exception as e: - logger.error(f"Failed to check logo migration status: {e}") - return False - - async def _migrate_add_logo(self): - """Add logo column to accounts table""" - db_path = path_manager.get_database_path() - if not db_path.exists(): - logger.warning("Database file not found, skipping migration") - return - - try: - conn = sqlite3.connect(str(db_path)) - cursor = conn.cursor() - - logger.info("Adding logo column to accounts table...") - - # Add the logo column - cursor.execute(""" - ALTER TABLE accounts - ADD COLUMN logo TEXT - """) - - conn.commit() - conn.close() - - logger.info("Logo column migration completed successfully") - - except Exception as e: - logger.error(f"Logo column migration failed: {e}") - raise - async def persist_sync_operation(self, sync_operation: Dict[str, Any]) -> int: """Persist sync operation to database and return the ID""" if not self.sqlite_enabled: diff --git a/leggen/utils/disk.py b/leggen/utils/disk.py deleted file mode 100644 index 71ab57a..0000000 --- a/leggen/utils/disk.py +++ /dev/null @@ -1,35 +0,0 @@ -import json -import sys -from pathlib import Path - -import click - -from leggen.utils.text import error, info - - -def save_file(name: str, d: dict): - Path.mkdir(Path(click.get_app_dir("leggen")), exist_ok=True) - config_file = click.get_app_dir("leggen") / Path(name) - - with click.open_file(str(config_file), "w") as f: - json.dump(d, f) - info(f"Wrote configuration file at '{config_file}'") - - -def load_file(name: str) -> dict: - config_file = click.get_app_dir("leggen") / Path(name) - try: - with click.open_file(str(config_file), "r") as f: - config = json.load(f) - return config - except FileNotFoundError: - error(f"Configuration file '{config_file}' not found") - sys.exit(1) - - -def get_prefixed_files(prefix: str) -> list: - return [ - f.name - for f in Path(click.get_app_dir("leggen")).iterdir() - if f.name.startswith(prefix) - ]