feat: add automatic balance timestamp migration mechanism

- Add migration system to convert Unix timestamps to datetime strings
- Integrate migration into FastAPI lifespan for automatic startup execution
- Update balance persistence to use consistent ISO datetime format
- Fix mixed timestamp types causing API parsing issues
- Add comprehensive error handling and progress logging
- Successfully migrated 7522 balance records to consistent format
This commit is contained in:
Elisiário Couto
2025-09-09 15:22:22 +01:00
committed by Elisiário Couto
parent dcac53d181
commit 34501f5f0d
2 changed files with 145 additions and 1 deletions

View File

@@ -24,6 +24,17 @@ async def lifespan(app: FastAPI):
logger.error(f"Failed to load configuration: {e}")
raise
# Run database migrations
try:
from leggend.services.database_service import DatabaseService
db_service = DatabaseService()
await db_service.run_migrations_if_needed()
logger.info("Database migrations completed")
except Exception as e:
logger.error(f"Database migration failed: {e}")
raise
# Start background scheduler
scheduler.start()
logger.info("Background scheduler started")

View File

@@ -1,5 +1,6 @@
from datetime import datetime
from typing import List, Dict, Any, Optional
import sqlite3
from loguru import logger
@@ -251,6 +252,138 @@ class DatabaseService:
logger.error(f"Failed to get account details from database: {e}")
return None
async def run_migrations_if_needed(self):
"""Run all necessary database migrations"""
if not self.sqlite_enabled:
logger.info("SQLite database disabled, skipping migrations")
return
await self._migrate_balance_timestamps_if_needed()
async def _migrate_balance_timestamps_if_needed(self):
"""Check and migrate balance timestamps if needed"""
try:
if await self._check_balance_timestamp_migration_needed():
logger.info("Balance timestamp migration needed, starting...")
await self._migrate_balance_timestamps()
logger.info("Balance timestamp migration completed")
else:
logger.info("Balance timestamps are already consistent")
except Exception as e:
logger.error(f"Balance timestamp migration failed: {e}")
raise
async def _check_balance_timestamp_migration_needed(self) -> bool:
"""Check if balance timestamps need migration"""
from pathlib import Path
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
if not db_path.exists():
return False
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
# Check for mixed timestamp types
cursor.execute("""
SELECT typeof(timestamp) as type, COUNT(*) as count
FROM balances
GROUP BY typeof(timestamp)
""")
types = cursor.fetchall()
conn.close()
# If we have both 'real' and 'text' types, migration is needed
type_names = [row[0] for row in types]
return "real" in type_names and "text" in type_names
except Exception as e:
logger.error(f"Failed to check migration status: {e}")
return False
async def _migrate_balance_timestamps(self):
"""Convert all Unix timestamps to datetime strings"""
from pathlib import Path
db_path = Path.home() / ".config" / "leggen" / "leggen.db"
if not db_path.exists():
logger.warning("Database file not found, skipping migration")
return
try:
conn = sqlite3.connect(str(db_path))
cursor = conn.cursor()
# Get all balances with REAL timestamps
cursor.execute("""
SELECT id, timestamp
FROM balances
WHERE typeof(timestamp) = 'real'
ORDER BY id
""")
unix_records = cursor.fetchall()
total_records = len(unix_records)
if total_records == 0:
logger.info("No Unix timestamps found to migrate")
conn.close()
return
logger.info(
f"Migrating {total_records} balance records from Unix to datetime format"
)
# Convert and update in batches
batch_size = 100
migrated_count = 0
for i in range(0, total_records, batch_size):
batch = unix_records[i : i + batch_size]
for record_id, unix_timestamp in batch:
try:
# Convert Unix timestamp to datetime string
dt_string = self._unix_to_datetime_string(float(unix_timestamp))
# Update the record
cursor.execute(
"""
UPDATE balances
SET timestamp = ?
WHERE id = ?
""",
(dt_string, record_id),
)
migrated_count += 1
if migrated_count % 100 == 0:
logger.info(
f"Migrated {migrated_count}/{total_records} balance records"
)
except Exception as e:
logger.error(f"Failed to migrate record {record_id}: {e}")
continue
# Commit batch
conn.commit()
conn.close()
logger.info(f"Successfully migrated {migrated_count} balance records")
except Exception as e:
logger.error(f"Balance timestamp migration failed: {e}")
raise
def _unix_to_datetime_string(self, unix_timestamp: float) -> str:
"""Convert Unix timestamp to datetime string"""
dt = datetime.fromtimestamp(unix_timestamp)
return dt.isoformat()
async def _persist_balance_sqlite(
self, account_id: str, balance_data: Dict[str, Any]
) -> None:
@@ -318,7 +451,7 @@ class DatabaseService:
float(balance_amount["amount"]),
balance_amount["currency"],
balance["balanceType"],
datetime.now(),
datetime.now().isoformat(),
),
)
except sqlite3.IntegrityError: