diff --git a/leggend/services/database_service.py b/leggend/services/database_service.py index 0dcdcd7..902d6e5 100644 --- a/leggend/services/database_service.py +++ b/leggend/services/database_service.py @@ -93,13 +93,17 @@ class DatabaseService: ",".join(transaction.get("remittanceInformationUnstructuredArray", [])), ) - # Extract transaction ID, using transactionId as fallback when internalTransactionId is missing - transaction_id = transaction.get("internalTransactionId") or transaction.get( - "transactionId" - ) + # Extract transaction IDs - transactionId is now primary, internalTransactionId is reference + transaction_id = transaction.get("transactionId") + internal_transaction_id = transaction.get("internalTransactionId") + + if not transaction_id: + raise ValueError("Transaction missing required transactionId field") return { - "internalTransactionId": transaction_id, + "accountId": account_id, + "transactionId": transaction_id, + "internalTransactionId": internal_transaction_id, "institutionId": account_info["institution_id"], "iban": account_info.get("iban", "N/A"), "transactionDate": min_date, @@ -107,7 +111,6 @@ class DatabaseService: "transactionValue": amount, "transactionCurrency": currency, "transactionStatus": status, - "accountId": account_id, "rawTransaction": transaction, } @@ -260,6 +263,7 @@ class DatabaseService: await self._migrate_balance_timestamps_if_needed() await self._migrate_null_transaction_ids_if_needed() + await self._migrate_to_composite_key_if_needed() async def _migrate_balance_timestamps_if_needed(self): """Check and migrate balance timestamps if needed""" @@ -519,6 +523,168 @@ class DatabaseService: logger.error(f"Null transaction IDs migration failed: {e}") raise + async def _migrate_to_composite_key_if_needed(self): + """Check and migrate to composite primary key if needed""" + try: + if await self._check_composite_key_migration_needed(): + logger.info("Composite key migration needed, starting...") + await self._migrate_to_composite_key() + logger.info("Composite key migration completed") + else: + logger.info("Composite key migration not needed") + except Exception as e: + logger.error(f"Composite key migration failed: {e}") + raise + + async def _check_composite_key_migration_needed(self) -> bool: + """Check if composite key migration is needed""" + from pathlib import Path + + db_path = Path.home() / ".config" / "leggen" / "leggen.db" + if not db_path.exists(): + return False + + try: + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + # Check if transactions table has the old primary key structure + cursor.execute("PRAGMA table_info(transactions)") + columns = cursor.fetchall() + column_names = [col[1] for col in columns] + + # If we have internalTransactionId as primary key, migration is needed + if "internalTransactionId" in column_names: + # Check if there are duplicate (accountId, transactionId) pairs + cursor.execute(""" + SELECT COUNT(*) as duplicates + FROM ( + SELECT accountId, json_extract(rawTransaction, '$.transactionId') as transactionId, COUNT(*) as cnt + FROM transactions + WHERE json_extract(rawTransaction, '$.transactionId') IS NOT NULL + GROUP BY accountId, json_extract(rawTransaction, '$.transactionId') + HAVING COUNT(*) > 1 + ) + """) + duplicates = cursor.fetchone()[0] + conn.close() + return duplicates > 0 + else: + conn.close() + return False + + except Exception as e: + logger.error(f"Failed to check composite key migration status: {e}") + return False + + async def _migrate_to_composite_key(self): + """Migrate transactions table to use composite primary key (accountId, transactionId)""" + from pathlib import Path + + db_path = Path.home() / ".config" / "leggen" / "leggen.db" + if not db_path.exists(): + logger.warning("Database file not found, skipping migration") + return + + try: + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + logger.info("Starting composite key migration...") + + # Step 1: Create temporary table with new schema + logger.info("Creating temporary table with composite primary key...") + cursor.execute("DROP TABLE IF EXISTS transactions_temp") + cursor.execute(""" + CREATE TABLE transactions_temp ( + accountId TEXT NOT NULL, + transactionId TEXT NOT NULL, + internalTransactionId TEXT, + institutionId TEXT, + iban TEXT, + transactionDate DATETIME, + description TEXT, + transactionValue REAL, + transactionCurrency TEXT, + transactionStatus TEXT, + rawTransaction JSON, + PRIMARY KEY (accountId, transactionId) + ) + """) + + # Step 2: Insert deduplicated data (keep most recent duplicate) + logger.info("Inserting deduplicated data...") + cursor.execute(""" + INSERT INTO transactions_temp + SELECT + accountId, + json_extract(rawTransaction, '$.transactionId') as transactionId, + internalTransactionId, + institutionId, + iban, + transactionDate, + description, + transactionValue, + transactionCurrency, + transactionStatus, + rawTransaction + FROM ( + SELECT *, + ROW_NUMBER() OVER ( + PARTITION BY accountId, json_extract(rawTransaction, '$.transactionId') + ORDER BY transactionDate DESC, rowid DESC + ) as rn + FROM transactions + WHERE json_extract(rawTransaction, '$.transactionId') IS NOT NULL + AND accountId IS NOT NULL + ) WHERE rn = 1 + """) + + # Get counts for reporting + cursor.execute("SELECT COUNT(*) FROM transactions") + old_count = cursor.fetchone()[0] + + cursor.execute("SELECT COUNT(*) FROM transactions_temp") + new_count = cursor.fetchone()[0] + + duplicates_removed = old_count - new_count + logger.info( + f"Migration stats: {old_count} → {new_count} records ({duplicates_removed} duplicates removed)" + ) + + # Step 3: Replace tables + logger.info("Replacing tables...") + cursor.execute("ALTER TABLE transactions RENAME TO transactions_old") + cursor.execute("ALTER TABLE transactions_temp RENAME TO transactions") + + # Step 4: Recreate indexes + logger.info("Recreating indexes...") + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_transactions_internal_id ON transactions(internalTransactionId)" + ) + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_transactions_date ON transactions(transactionDate)" + ) + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_transactions_account_date ON transactions(accountId, transactionDate)" + ) + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_transactions_amount ON transactions(transactionValue)" + ) + + # Step 5: Cleanup + logger.info("Cleaning up...") + cursor.execute("DROP TABLE transactions_old") + + conn.commit() + conn.close() + + logger.info("Composite key migration completed successfully") + + except Exception as e: + logger.error(f"Composite key migration failed: {e}") + raise + def _unix_to_datetime_string(self, unix_timestamp: float) -> str: """Convert Unix timestamp to datetime string""" dt = datetime.fromtimestamp(unix_timestamp) @@ -620,10 +786,13 @@ class DatabaseService: conn = sqlite3.connect(str(db_path)) cursor = conn.cursor() - # Create the transactions table if it doesn't exist + # The table should already exist with the new schema from migration + # If it doesn't exist, create it (for new installations) cursor.execute( """CREATE TABLE IF NOT EXISTS transactions ( - internalTransactionId TEXT PRIMARY KEY, + accountId TEXT NOT NULL, + transactionId TEXT NOT NULL, + internalTransactionId TEXT, institutionId TEXT, iban TEXT, transactionDate DATETIME, @@ -631,15 +800,15 @@ class DatabaseService: transactionValue REAL, transactionCurrency TEXT, transactionStatus TEXT, - accountId TEXT, - rawTransaction JSON + rawTransaction JSON, + PRIMARY KEY (accountId, transactionId) )""" ) - # Create indexes for better performance + # Create indexes for better performance (if they don't exist) cursor.execute( - """CREATE INDEX IF NOT EXISTS idx_transactions_account_id - ON transactions(accountId)""" + """CREATE INDEX IF NOT EXISTS idx_transactions_internal_id + ON transactions(internalTransactionId)""" ) cursor.execute( """CREATE INDEX IF NOT EXISTS idx_transactions_date @@ -654,8 +823,10 @@ class DatabaseService: ON transactions(transactionValue)""" ) - # Prepare an SQL statement for inserting data - insert_sql = """INSERT INTO transactions ( + # Prepare an SQL statement for inserting/replacing data + insert_sql = """INSERT OR REPLACE INTO transactions ( + accountId, + transactionId, internalTransactionId, institutionId, iban, @@ -664,9 +835,8 @@ class DatabaseService: transactionValue, transactionCurrency, transactionStatus, - accountId, rawTransaction - ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""" + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""" new_transactions = [] @@ -675,7 +845,9 @@ class DatabaseService: cursor.execute( insert_sql, ( - transaction["internalTransactionId"], + transaction["accountId"], + transaction["transactionId"], + transaction.get("internalTransactionId"), transaction["institutionId"], transaction["iban"], transaction["transactionDate"], @@ -683,13 +855,14 @@ class DatabaseService: transaction["transactionValue"], transaction["transactionCurrency"], transaction["transactionStatus"], - transaction["accountId"], json.dumps(transaction["rawTransaction"]), ), ) new_transactions.append(transaction) - except sqlite3.IntegrityError: - # Transaction already exists + except sqlite3.IntegrityError as e: + logger.warning( + f"Failed to insert transaction {transaction.get('transactionId')}: {e}" + ) continue conn.commit() diff --git a/tests/unit/test_database_service.py b/tests/unit/test_database_service.py index 1494725..0594dff 100644 --- a/tests/unit/test_database_service.py +++ b/tests/unit/test_database_service.py @@ -18,6 +18,8 @@ def sample_transactions_db_format(): """Sample transactions in database format.""" return [ { + "accountId": "test-account-123", + "transactionId": "txn-001", "internalTransactionId": "txn-001", "institutionId": "REVOLUT_REVOLT21", "iban": "LT313250081177977789", @@ -26,10 +28,11 @@ def sample_transactions_db_format(): "transactionValue": -10.50, "transactionCurrency": "EUR", "transactionStatus": "booked", - "accountId": "test-account-123", - "rawTransaction": {"some": "data"}, + "rawTransaction": {"transactionId": "txn-001", "some": "data"}, }, { + "accountId": "test-account-123", + "transactionId": "txn-002", "internalTransactionId": "txn-002", "institutionId": "REVOLUT_REVOLT21", "iban": "LT313250081177977789", @@ -38,8 +41,7 @@ def sample_transactions_db_format(): "transactionValue": -45.30, "transactionCurrency": "EUR", "transactionStatus": "booked", - "accountId": "test-account-123", - "rawTransaction": {"other": "data"}, + "rawTransaction": {"transactionId": "txn-002", "other": "data"}, }, ] @@ -351,6 +353,7 @@ class TestDatabaseService: "booked": [ { "internalTransactionId": "txn-001", + "transactionId": "txn-001", "bookingDate": "2025-09-01", "transactionAmount": {"amount": "-10.50", "currency": "EUR"}, "remittanceInformationUnstructured": "Coffee Shop", @@ -359,6 +362,7 @@ class TestDatabaseService: "pending": [ { "internalTransactionId": "txn-002", + "transactionId": "txn-002", "bookingDate": "2025-09-02", "transactionAmount": {"amount": "-25.00", "currency": "EUR"}, "remittanceInformationUnstructured": "Gas Station", @@ -375,12 +379,14 @@ class TestDatabaseService: # Check booked transaction booked_txn = next(t for t in result if t["transactionStatus"] == "booked") + assert booked_txn["transactionId"] == "txn-001" assert booked_txn["internalTransactionId"] == "txn-001" assert booked_txn["transactionValue"] == -10.50 assert booked_txn["description"] == "Coffee Shop" # Check pending transaction pending_txn = next(t for t in result if t["transactionStatus"] == "pending") + assert pending_txn["transactionId"] == "txn-002" assert pending_txn["internalTransactionId"] == "txn-002" assert pending_txn["transactionValue"] == -25.00 assert pending_txn["description"] == "Gas Station" @@ -416,6 +422,7 @@ class TestDatabaseService: "booked": [ { "internalTransactionId": "txn-001", + "transactionId": "txn-001", "bookingDate": "2025-09-01", "transactionAmount": {"amount": "-10.50", "currency": "EUR"}, "remittanceInformationUnstructuredArray": ["Line 1", "Line 2"],