refactor: Migrate database service to SQLModel and Alembic.

- Add SQLModel for type-safe database models
- Implement Alembic for schema migration management
- Create 7 migrations covering all existing schema changes
- Add automatic migration system that runs on startup
- Maintain backward compatibility with existing raw SQL queries
- Remove old manual migration system
- All tests pass (109 tests)

Benefits:
- Full type safety with Pydantic validation
- Version-controlled schema changes
- Automatic migration detection and application
- Better developer experience with typed models
This commit is contained in:
Elisiário Couto
2025-09-30 23:34:48 +01:00
parent 5465941058
commit dc7aed316d
18 changed files with 1030 additions and 661 deletions

View File

@@ -0,0 +1,102 @@
"""migrate_to_composite_key
Migrate transactions table to use composite primary key (accountId, transactionId).
Revision ID: 1ba02efe481c
Revises: bf30246cb723
Create Date: 2025-09-30 23:16:34.637762
"""
from typing import Sequence, Union
from sqlalchemy import text
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "1ba02efe481c"
down_revision: Union[str, Sequence[str], None] = "bf30246cb723"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Migrate to composite primary key."""
conn = op.get_bind()
# Check if migration is needed
result = conn.execute(
text("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='transactions'
""")
)
if not result.fetchone():
return
# Create temporary table with new schema
op.execute("""
CREATE TABLE transactions_temp (
accountId TEXT NOT NULL,
transactionId TEXT NOT NULL,
internalTransactionId TEXT,
institutionId TEXT NOT NULL,
iban TEXT,
transactionDate DATETIME,
description TEXT,
transactionValue REAL,
transactionCurrency TEXT,
transactionStatus TEXT,
rawTransaction JSON NOT NULL,
PRIMARY KEY (accountId, transactionId)
)
""")
# Insert deduplicated data (keep most recent duplicate)
op.execute("""
INSERT INTO transactions_temp
SELECT
accountId,
json_extract(rawTransaction, '$.transactionId') as transactionId,
internalTransactionId,
institutionId,
iban,
transactionDate,
description,
transactionValue,
transactionCurrency,
transactionStatus,
rawTransaction
FROM (
SELECT *,
ROW_NUMBER() OVER (
PARTITION BY accountId, json_extract(rawTransaction, '$.transactionId')
ORDER BY transactionDate DESC, rowid DESC
) as rn
FROM transactions
WHERE json_extract(rawTransaction, '$.transactionId') IS NOT NULL
AND accountId IS NOT NULL
) WHERE rn = 1
""")
# Replace tables
op.execute("DROP TABLE transactions")
op.execute("ALTER TABLE transactions_temp RENAME TO transactions")
# Recreate indexes
op.create_index(
"idx_transactions_internal_id", "transactions", ["internalTransactionId"]
)
op.create_index("idx_transactions_date", "transactions", ["transactionDate"])
op.create_index(
"idx_transactions_account_date",
"transactions",
["accountId", "transactionDate"],
)
op.create_index("idx_transactions_amount", "transactions", ["transactionValue"])
def downgrade() -> None:
"""Not implemented - would require changing primary key back."""

View File

@@ -0,0 +1,56 @@
"""add_transaction_enrichments_table
Add transaction_enrichments table for storing enriched transaction data.
Revision ID: 4819c868ebc1
Revises: dd9f6a55604c
Create Date: 2025-09-30 23:20:00.969614
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "4819c868ebc1"
down_revision: Union[str, Sequence[str], None] = "dd9f6a55604c"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Create transaction_enrichments table."""
op.create_table(
"transaction_enrichments",
sa.Column("accountId", sa.String(), nullable=False),
sa.Column("transactionId", sa.String(), nullable=False),
sa.Column("clean_name", sa.String(), nullable=True),
sa.Column("category", sa.String(), nullable=True),
sa.Column("logo_url", sa.String(), nullable=True),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(
["accountId", "transactionId"],
["transactions.accountId", "transactions.transactionId"],
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("accountId", "transactionId"),
)
# Create indexes
op.create_index(
"idx_transaction_enrichments_category", "transaction_enrichments", ["category"]
)
op.create_index(
"idx_transaction_enrichments_clean_name",
"transaction_enrichments",
["clean_name"],
)
def downgrade() -> None:
"""Drop transaction_enrichments table."""
op.drop_table("transaction_enrichments")

View File

@@ -0,0 +1,33 @@
"""add_display_name_column
Add display_name column to accounts table.
Revision ID: be8d5807feca
Revises: 1ba02efe481c
Create Date: 2025-09-30 23:16:34.929968
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "be8d5807feca"
down_revision: Union[str, Sequence[str], None] = "1ba02efe481c"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Add display_name column to accounts table."""
with op.batch_alter_table("accounts", schema=None) as batch_op:
batch_op.add_column(sa.Column("display_name", sa.String(), nullable=True))
def downgrade() -> None:
"""Remove display_name column."""
with op.batch_alter_table("accounts", schema=None) as batch_op:
batch_op.drop_column("display_name")

View File

@@ -0,0 +1,62 @@
"""migrate_balance_timestamps
Convert Unix timestamps to datetime strings in balances table.
Revision ID: bf30246cb723
Revises: de8bfb1169d4
Create Date: 2025-09-30 23:14:03.128959
"""
from datetime import datetime
from typing import Sequence, Union
from sqlalchemy import text
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "bf30246cb723"
down_revision: Union[str, Sequence[str], None] = "de8bfb1169d4"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Convert all Unix timestamps to datetime strings."""
conn = op.get_bind()
# Get all balances with REAL timestamps
result = conn.execute(
text("""
SELECT id, timestamp
FROM balances
WHERE typeof(timestamp) = 'real'
ORDER BY id
""")
)
unix_records = result.fetchall()
if not unix_records:
return
# Convert and update in batches
for record_id, unix_timestamp in unix_records:
try:
# Convert Unix timestamp to datetime string
dt_string = datetime.fromtimestamp(float(unix_timestamp)).isoformat()
# Update the record
conn.execute(
text("UPDATE balances SET timestamp = :dt WHERE id = :id"),
{"dt": dt_string, "id": record_id},
)
except Exception:
continue
conn.commit()
def downgrade() -> None:
"""Not implemented - converting back would lose precision."""

View File

@@ -0,0 +1,33 @@
"""add_logo_column
Add logo column to accounts table.
Revision ID: dd9f6a55604c
Revises: f854fd498a6e
Create Date: 2025-09-30 23:16:35.530858
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "dd9f6a55604c"
down_revision: Union[str, Sequence[str], None] = "f854fd498a6e"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Add logo column to accounts table."""
with op.batch_alter_table("accounts", schema=None) as batch_op:
batch_op.add_column(sa.Column("logo", sa.String(), nullable=True))
def downgrade() -> None:
"""Remove logo column."""
with op.batch_alter_table("accounts", schema=None) as batch_op:
batch_op.drop_column("logo")

View File

@@ -0,0 +1,95 @@
"""create_initial_tables
Revision ID: de8bfb1169d4
Revises:
Create Date: 2025-09-30 23:09:24.255875
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "de8bfb1169d4"
down_revision: Union[str, Sequence[str], None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Create initial database tables."""
# Create accounts table
op.create_table(
"accounts",
sa.Column("id", sa.String(), nullable=False),
sa.Column("institution_id", sa.String(), nullable=False),
sa.Column("status", sa.String(), nullable=False),
sa.Column("iban", sa.String(), nullable=True),
sa.Column("name", sa.String(), nullable=True),
sa.Column("currency", sa.String(), nullable=True),
sa.Column("created", sa.DateTime(), nullable=False),
sa.Column("last_accessed", sa.DateTime(), nullable=True),
sa.Column("last_updated", sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint("id"),
)
op.create_index("idx_accounts_institution_id", "accounts", ["institution_id"])
op.create_index("idx_accounts_status", "accounts", ["status"])
# Create balances table
op.create_table(
"balances",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("account_id", sa.String(), nullable=False),
sa.Column("bank", sa.String(), nullable=False),
sa.Column("status", sa.String(), nullable=False),
sa.Column("iban", sa.String(), nullable=False),
sa.Column("amount", sa.Float(), nullable=False),
sa.Column("currency", sa.String(), nullable=False),
sa.Column("type", sa.String(), nullable=False),
sa.Column("timestamp", sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint("id"),
)
op.create_index("idx_balances_account_id", "balances", ["account_id"])
op.create_index("idx_balances_timestamp", "balances", ["timestamp"])
op.create_index(
"idx_balances_account_type_timestamp",
"balances",
["account_id", "type", "timestamp"],
)
# Create transactions table (old schema with internalTransactionId as PK)
op.create_table(
"transactions",
sa.Column("accountId", sa.String(), nullable=False),
sa.Column("transactionId", sa.String(), nullable=False),
sa.Column("internalTransactionId", sa.String(), nullable=True),
sa.Column("institutionId", sa.String(), nullable=False),
sa.Column("iban", sa.String(), nullable=True),
sa.Column("transactionDate", sa.DateTime(), nullable=True),
sa.Column("description", sa.String(), nullable=True),
sa.Column("transactionValue", sa.Float(), nullable=True),
sa.Column("transactionCurrency", sa.String(), nullable=True),
sa.Column("transactionStatus", sa.String(), nullable=True),
sa.Column("rawTransaction", sa.JSON(), nullable=False),
sa.PrimaryKeyConstraint("internalTransactionId"),
)
op.create_index(
"idx_transactions_internal_id", "transactions", ["internalTransactionId"]
)
op.create_index("idx_transactions_date", "transactions", ["transactionDate"])
op.create_index(
"idx_transactions_account_date",
"transactions",
["accountId", "transactionDate"],
)
op.create_index("idx_transactions_amount", "transactions", ["transactionValue"])
def downgrade() -> None:
"""Drop initial tables."""
op.drop_table("transactions")
op.drop_table("balances")
op.drop_table("accounts")

View File

@@ -0,0 +1,59 @@
"""add_sync_operations_table
Add sync_operations table for tracking synchronization operations.
Revision ID: f854fd498a6e
Revises: be8d5807feca
Create Date: 2025-09-30 23:16:35.229062
"""
from typing import Sequence, Union
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "f854fd498a6e"
down_revision: Union[str, Sequence[str], None] = "be8d5807feca"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Create sync_operations table."""
op.create_table(
"sync_operations",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("started_at", sa.DateTime(), nullable=False),
sa.Column("completed_at", sa.DateTime(), nullable=True),
sa.Column("success", sa.Boolean(), nullable=True),
sa.Column(
"accounts_processed", sa.Integer(), nullable=False, server_default="0"
),
sa.Column(
"transactions_added", sa.Integer(), nullable=False, server_default="0"
),
sa.Column(
"transactions_updated", sa.Integer(), nullable=False, server_default="0"
),
sa.Column("balances_updated", sa.Integer(), nullable=False, server_default="0"),
sa.Column("duration_seconds", sa.Float(), nullable=True),
sa.Column("errors", sa.String(), nullable=True),
sa.Column("logs", sa.String(), nullable=True),
sa.Column("trigger_type", sa.String(), nullable=False, server_default="manual"),
sa.PrimaryKeyConstraint("id"),
)
# Create indexes
op.create_index("idx_sync_operations_started_at", "sync_operations", ["started_at"])
op.create_index("idx_sync_operations_success", "sync_operations", ["success"])
op.create_index(
"idx_sync_operations_trigger_type", "sync_operations", ["trigger_type"]
)
def downgrade() -> None:
"""Drop sync_operations table."""
op.drop_table("sync_operations")