diff --git a/leggend/main.py b/leggend/main.py index 807d9fe..79f0619 100644 --- a/leggend/main.py +++ b/leggend/main.py @@ -24,6 +24,17 @@ async def lifespan(app: FastAPI): logger.error(f"Failed to load configuration: {e}") raise + # Run database migrations + try: + from leggend.services.database_service import DatabaseService + + db_service = DatabaseService() + await db_service.run_migrations_if_needed() + logger.info("Database migrations completed") + except Exception as e: + logger.error(f"Database migration failed: {e}") + raise + # Start background scheduler scheduler.start() logger.info("Background scheduler started") diff --git a/leggend/services/database_service.py b/leggend/services/database_service.py index 5cbc3b2..42c03f4 100644 --- a/leggend/services/database_service.py +++ b/leggend/services/database_service.py @@ -1,5 +1,6 @@ from datetime import datetime from typing import List, Dict, Any, Optional +import sqlite3 from loguru import logger @@ -251,6 +252,138 @@ class DatabaseService: logger.error(f"Failed to get account details from database: {e}") return None + async def run_migrations_if_needed(self): + """Run all necessary database migrations""" + if not self.sqlite_enabled: + logger.info("SQLite database disabled, skipping migrations") + return + + await self._migrate_balance_timestamps_if_needed() + + async def _migrate_balance_timestamps_if_needed(self): + """Check and migrate balance timestamps if needed""" + try: + if await self._check_balance_timestamp_migration_needed(): + logger.info("Balance timestamp migration needed, starting...") + await self._migrate_balance_timestamps() + logger.info("Balance timestamp migration completed") + else: + logger.info("Balance timestamps are already consistent") + except Exception as e: + logger.error(f"Balance timestamp migration failed: {e}") + raise + + async def _check_balance_timestamp_migration_needed(self) -> bool: + """Check if balance timestamps need migration""" + from pathlib import Path + + db_path = Path.home() / ".config" / "leggen" / "leggen.db" + if not db_path.exists(): + return False + + try: + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + # Check for mixed timestamp types + cursor.execute(""" + SELECT typeof(timestamp) as type, COUNT(*) as count + FROM balances + GROUP BY typeof(timestamp) + """) + + types = cursor.fetchall() + conn.close() + + # If we have both 'real' and 'text' types, migration is needed + type_names = [row[0] for row in types] + return "real" in type_names and "text" in type_names + + except Exception as e: + logger.error(f"Failed to check migration status: {e}") + return False + + async def _migrate_balance_timestamps(self): + """Convert all Unix timestamps to datetime strings""" + from pathlib import Path + + db_path = Path.home() / ".config" / "leggen" / "leggen.db" + if not db_path.exists(): + logger.warning("Database file not found, skipping migration") + return + + try: + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + # Get all balances with REAL timestamps + cursor.execute(""" + SELECT id, timestamp + FROM balances + WHERE typeof(timestamp) = 'real' + ORDER BY id + """) + + unix_records = cursor.fetchall() + total_records = len(unix_records) + + if total_records == 0: + logger.info("No Unix timestamps found to migrate") + conn.close() + return + + logger.info( + f"Migrating {total_records} balance records from Unix to datetime format" + ) + + # Convert and update in batches + batch_size = 100 + migrated_count = 0 + + for i in range(0, total_records, batch_size): + batch = unix_records[i : i + batch_size] + + for record_id, unix_timestamp in batch: + try: + # Convert Unix timestamp to datetime string + dt_string = self._unix_to_datetime_string(float(unix_timestamp)) + + # Update the record + cursor.execute( + """ + UPDATE balances + SET timestamp = ? + WHERE id = ? + """, + (dt_string, record_id), + ) + + migrated_count += 1 + + if migrated_count % 100 == 0: + logger.info( + f"Migrated {migrated_count}/{total_records} balance records" + ) + + except Exception as e: + logger.error(f"Failed to migrate record {record_id}: {e}") + continue + + # Commit batch + conn.commit() + + conn.close() + logger.info(f"Successfully migrated {migrated_count} balance records") + + except Exception as e: + logger.error(f"Balance timestamp migration failed: {e}") + raise + + def _unix_to_datetime_string(self, unix_timestamp: float) -> str: + """Convert Unix timestamp to datetime string""" + dt = datetime.fromtimestamp(unix_timestamp) + return dt.isoformat() + async def _persist_balance_sqlite( self, account_id: str, balance_data: Dict[str, Any] ) -> None: @@ -318,7 +451,7 @@ class DatabaseService: float(balance_amount["amount"]), balance_amount["currency"], balance["balanceType"], - datetime.now(), + datetime.now().isoformat(), ), ) except sqlite3.IntegrityError: