fix(api): resolve duplicate transactions with composite key migration

- Migrate transactions table to use (accountId, transactionId) composite primary key
- Replace unstable internalTransactionId with stable bank-provided transactionId
- Update persistence logic to use INSERT OR REPLACE for automatic conflict resolution
- Maintain API compatibility by preserving internalTransactionId field
- Update tests to match new transaction processing format

This resolves the issue where GoCardless returns different internalTransactionId
values for the same transaction across sync operations, causing duplicates.
This commit is contained in:
Elisiário Couto
2025-09-10 20:00:43 +01:00
parent 433ba3faf9
commit 13e92ccd34
2 changed files with 205 additions and 25 deletions

View File

@@ -93,13 +93,17 @@ class DatabaseService:
",".join(transaction.get("remittanceInformationUnstructuredArray", [])),
)
# Extract transaction ID, using transactionId as fallback when internalTransactionId is missing
transaction_id = transaction.get("internalTransactionId") or transaction.get(
"transactionId"
)
# Extract transaction IDs - transactionId is now primary, internalTransactionId is reference
transaction_id = transaction.get("transactionId")
internal_transaction_id = transaction.get("internalTransactionId")
if not transaction_id:
raise ValueError("Transaction missing required transactionId field")
return {
"internalTransactionId": transaction_id,
"accountId": account_id,
"transactionId": transaction_id,
"internalTransactionId": internal_transaction_id,
"institutionId": account_info["institution_id"],
"iban": account_info.get("iban", "N/A"),
"transactionDate": min_date,
@@ -107,7 +111,6 @@ class DatabaseService:
"transactionValue": amount,
"transactionCurrency": currency,
"transactionStatus": status,
"accountId": account_id,
"rawTransaction": transaction,
}
@@ -260,6 +263,7 @@ class DatabaseService:
await self._migrate_balance_timestamps_if_needed()
await self._migrate_null_transaction_ids_if_needed()
await self._migrate_to_composite_key_if_needed()
async def _migrate_balance_timestamps_if_needed(self):
"""Check and migrate balance timestamps if needed"""
@@ -519,6 +523,168 @@ class DatabaseService:
logger.error(f"Null transaction IDs migration failed: {e}")
raise
async def _migrate_to_composite_key_if_needed(self):
"""Check and migrate to composite primary key if needed"""
try:
if await self._check_composite_key_migration_needed():
logger.info("Composite key migration needed, starting...")
await self._migrate_to_composite_key()
logger.info("Composite key migration completed")
else:
logger.info("Composite key migration not needed")
except Exception as e:
logger.error(f"Composite key migration failed: {e}")
raise
async def _check_composite_key_migration_needed(self) -> bool:
"""Check if composite key migration is needed"""
from pathlib import Path
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
if not db_path.exists():
return False
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
# Check if transactions table has the old primary key structure
cursor.execute("PRAGMA table_info(transactions)")
columns = cursor.fetchall()
column_names = [col[1] for col in columns]
# If we have internalTransactionId as primary key, migration is needed
if "internalTransactionId" in column_names:
# Check if there are duplicate (accountId, transactionId) pairs
cursor.execute("""
SELECT COUNT(*) as duplicates
FROM (
SELECT accountId, json_extract(rawTransaction, '$.transactionId') as transactionId, COUNT(*) as cnt
FROM transactions
WHERE json_extract(rawTransaction, '$.transactionId') IS NOT NULL
GROUP BY accountId, json_extract(rawTransaction, '$.transactionId')
HAVING COUNT(*) > 1
)
""")
duplicates = cursor.fetchone()[0]
conn.close()
return duplicates > 0
else:
conn.close()
return False
except Exception as e:
logger.error(f"Failed to check composite key migration status: {e}")
return False
async def _migrate_to_composite_key(self):
"""Migrate transactions table to use composite primary key (accountId, transactionId)"""
from pathlib import Path
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
if not db_path.exists():
logger.warning("Database file not found, skipping migration")
return
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
logger.info("Starting composite key migration...")
# Step 1: Create temporary table with new schema
logger.info("Creating temporary table with composite primary key...")
cursor.execute("DROP TABLE IF EXISTS transactions_temp")
cursor.execute("""
CREATE TABLE transactions_temp (
accountId TEXT NOT NULL,
transactionId TEXT NOT NULL,
internalTransactionId TEXT,
institutionId TEXT,
iban TEXT,
transactionDate DATETIME,
description TEXT,
transactionValue REAL,
transactionCurrency TEXT,
transactionStatus TEXT,
rawTransaction JSON,
PRIMARY KEY (accountId, transactionId)
)
""")
# Step 2: Insert deduplicated data (keep most recent duplicate)
logger.info("Inserting deduplicated data...")
cursor.execute("""
INSERT INTO transactions_temp
SELECT
accountId,
json_extract(rawTransaction, '$.transactionId') as transactionId,
internalTransactionId,
institutionId,
iban,
transactionDate,
description,
transactionValue,
transactionCurrency,
transactionStatus,
rawTransaction
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY accountId, json_extract(rawTransaction, '$.transactionId')
ORDER BY transactionDate DESC, rowid DESC
) as rn
FROM transactions
WHERE json_extract(rawTransaction, '$.transactionId') IS NOT NULL
AND accountId IS NOT NULL
) WHERE rn = 1
""")
# Get counts for reporting
cursor.execute("SELECT COUNT(*) FROM transactions")
old_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM transactions_temp")
new_count = cursor.fetchone()[0]
duplicates_removed = old_count - new_count
logger.info(
f"Migration stats: {old_count}{new_count} records ({duplicates_removed} duplicates removed)"
)
# Step 3: Replace tables
logger.info("Replacing tables...")
cursor.execute("ALTER TABLE transactions RENAME TO transactions_old")
cursor.execute("ALTER TABLE transactions_temp RENAME TO transactions")
# Step 4: Recreate indexes
logger.info("Recreating indexes...")
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_transactions_internal_id ON transactions(internalTransactionId)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_transactions_date ON transactions(transactionDate)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_transactions_account_date ON transactions(accountId, transactionDate)"
)
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_transactions_amount ON transactions(transactionValue)"
)
# Step 5: Cleanup
logger.info("Cleaning up...")
cursor.execute("DROP TABLE transactions_old")
conn.commit()
conn.close()
logger.info("Composite key migration completed successfully")
except Exception as e:
logger.error(f"Composite key migration failed: {e}")
raise
def _unix_to_datetime_string(self, unix_timestamp: float) -> str:
"""Convert Unix timestamp to datetime string"""
dt = datetime.fromtimestamp(unix_timestamp)
@@ -620,10 +786,13 @@ class DatabaseService:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
# Create the transactions table if it doesn't exist
# The table should already exist with the new schema from migration
# If it doesn't exist, create it (for new installations)
cursor.execute(
"""CREATE TABLE IF NOT EXISTS transactions (
internalTransactionId TEXT PRIMARY KEY,
accountId TEXT NOT NULL,
transactionId TEXT NOT NULL,
internalTransactionId TEXT,
institutionId TEXT,
iban TEXT,
transactionDate DATETIME,
@@ -631,15 +800,15 @@ class DatabaseService:
transactionValue REAL,
transactionCurrency TEXT,
transactionStatus TEXT,
accountId TEXT,
rawTransaction JSON
rawTransaction JSON,
PRIMARY KEY (accountId, transactionId)
)"""
)
# Create indexes for better performance
# Create indexes for better performance (if they don't exist)
cursor.execute(
"""CREATE INDEX IF NOT EXISTS idx_transactions_account_id
ON transactions(accountId)"""
"""CREATE INDEX IF NOT EXISTS idx_transactions_internal_id
ON transactions(internalTransactionId)"""
)
cursor.execute(
"""CREATE INDEX IF NOT EXISTS idx_transactions_date
@@ -654,8 +823,10 @@ class DatabaseService:
ON transactions(transactionValue)"""
)
# Prepare an SQL statement for inserting data
insert_sql = """INSERT INTO transactions (
# Prepare an SQL statement for inserting/replacing data
insert_sql = """INSERT OR REPLACE INTO transactions (
accountId,
transactionId,
internalTransactionId,
institutionId,
iban,
@@ -664,9 +835,8 @@ class DatabaseService:
transactionValue,
transactionCurrency,
transactionStatus,
accountId,
rawTransaction
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""
new_transactions = []
@@ -675,7 +845,9 @@ class DatabaseService:
cursor.execute(
insert_sql,
(
transaction["internalTransactionId"],
transaction["accountId"],
transaction["transactionId"],
transaction.get("internalTransactionId"),
transaction["institutionId"],
transaction["iban"],
transaction["transactionDate"],
@@ -683,13 +855,14 @@ class DatabaseService:
transaction["transactionValue"],
transaction["transactionCurrency"],
transaction["transactionStatus"],
transaction["accountId"],
json.dumps(transaction["rawTransaction"]),
),
)
new_transactions.append(transaction)
except sqlite3.IntegrityError:
# Transaction already exists
except sqlite3.IntegrityError as e:
logger.warning(
f"Failed to insert transaction {transaction.get('transactionId')}: {e}"
)
continue
conn.commit()

View File

@@ -18,6 +18,8 @@ def sample_transactions_db_format():
"""Sample transactions in database format."""
return [
{
"accountId": "test-account-123",
"transactionId": "txn-001",
"internalTransactionId": "txn-001",
"institutionId": "REVOLUT_REVOLT21",
"iban": "LT313250081177977789",
@@ -26,10 +28,11 @@ def sample_transactions_db_format():
"transactionValue": -10.50,
"transactionCurrency": "EUR",
"transactionStatus": "booked",
"accountId": "test-account-123",
"rawTransaction": {"some": "data"},
"rawTransaction": {"transactionId": "txn-001", "some": "data"},
},
{
"accountId": "test-account-123",
"transactionId": "txn-002",
"internalTransactionId": "txn-002",
"institutionId": "REVOLUT_REVOLT21",
"iban": "LT313250081177977789",
@@ -38,8 +41,7 @@ def sample_transactions_db_format():
"transactionValue": -45.30,
"transactionCurrency": "EUR",
"transactionStatus": "booked",
"accountId": "test-account-123",
"rawTransaction": {"other": "data"},
"rawTransaction": {"transactionId": "txn-002", "other": "data"},
},
]
@@ -351,6 +353,7 @@ class TestDatabaseService:
"booked": [
{
"internalTransactionId": "txn-001",
"transactionId": "txn-001",
"bookingDate": "2025-09-01",
"transactionAmount": {"amount": "-10.50", "currency": "EUR"},
"remittanceInformationUnstructured": "Coffee Shop",
@@ -359,6 +362,7 @@ class TestDatabaseService:
"pending": [
{
"internalTransactionId": "txn-002",
"transactionId": "txn-002",
"bookingDate": "2025-09-02",
"transactionAmount": {"amount": "-25.00", "currency": "EUR"},
"remittanceInformationUnstructured": "Gas Station",
@@ -375,12 +379,14 @@ class TestDatabaseService:
# Check booked transaction
booked_txn = next(t for t in result if t["transactionStatus"] == "booked")
assert booked_txn["transactionId"] == "txn-001"
assert booked_txn["internalTransactionId"] == "txn-001"
assert booked_txn["transactionValue"] == -10.50
assert booked_txn["description"] == "Coffee Shop"
# Check pending transaction
pending_txn = next(t for t in result if t["transactionStatus"] == "pending")
assert pending_txn["transactionId"] == "txn-002"
assert pending_txn["internalTransactionId"] == "txn-002"
assert pending_txn["transactionValue"] == -25.00
assert pending_txn["description"] == "Gas Station"
@@ -416,6 +422,7 @@ class TestDatabaseService:
"booked": [
{
"internalTransactionId": "txn-001",
"transactionId": "txn-001",
"bookingDate": "2025-09-01",
"transactionAmount": {"amount": "-10.50", "currency": "EUR"},
"remittanceInformationUnstructuredArray": ["Line 1", "Line 2"],