mirror of
https://github.com/elisiariocouto/leggen.git
synced 2025-12-14 02:42:21 +00:00
feat(api): Add sync operations tracking and database storage
Co-authored-by: elisiariocouto <818914+elisiariocouto@users.noreply.github.com>
This commit is contained in:
committed by
Elisiário Couto
parent
76a30d23af
commit
61f9592095
@@ -216,6 +216,7 @@ class DatabaseService:
|
||||
await self._migrate_null_transaction_ids_if_needed()
|
||||
await self._migrate_to_composite_key_if_needed()
|
||||
await self._migrate_add_display_name_if_needed()
|
||||
await self._migrate_add_sync_operations_if_needed()
|
||||
|
||||
async def _migrate_balance_timestamps_if_needed(self):
|
||||
"""Check and migrate balance timestamps if needed"""
|
||||
@@ -1427,3 +1428,187 @@ class DatabaseService:
|
||||
except Exception as e:
|
||||
conn.close()
|
||||
raise e
|
||||
|
||||
async def _check_sync_operations_migration_needed(self) -> bool:
|
||||
"""Check if sync_operations table needs to be created"""
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
return False
|
||||
|
||||
try:
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Check if sync_operations table exists
|
||||
cursor.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='table' AND name='sync_operations'"
|
||||
)
|
||||
table_exists = cursor.fetchone() is not None
|
||||
|
||||
conn.close()
|
||||
return not table_exists
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to check sync_operations migration status: {e}")
|
||||
return False
|
||||
|
||||
async def _migrate_add_sync_operations(self):
|
||||
"""Add sync_operations table"""
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
logger.warning("Database file not found, skipping migration")
|
||||
return
|
||||
|
||||
try:
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
logger.info("Creating sync_operations table...")
|
||||
|
||||
# Create the sync_operations table
|
||||
cursor.execute("""
|
||||
CREATE TABLE sync_operations (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
started_at DATETIME NOT NULL,
|
||||
completed_at DATETIME,
|
||||
success BOOLEAN,
|
||||
accounts_processed INTEGER DEFAULT 0,
|
||||
transactions_added INTEGER DEFAULT 0,
|
||||
transactions_updated INTEGER DEFAULT 0,
|
||||
balances_updated INTEGER DEFAULT 0,
|
||||
duration_seconds REAL,
|
||||
errors TEXT,
|
||||
logs TEXT,
|
||||
trigger_type TEXT DEFAULT 'manual'
|
||||
)
|
||||
""")
|
||||
|
||||
# Create indexes for better performance
|
||||
cursor.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_sync_operations_started_at ON sync_operations(started_at)"
|
||||
)
|
||||
cursor.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_sync_operations_success ON sync_operations(success)"
|
||||
)
|
||||
cursor.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_sync_operations_trigger_type ON sync_operations(trigger_type)"
|
||||
)
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
logger.info("Sync operations table migration completed successfully")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Sync operations table migration failed: {e}")
|
||||
raise
|
||||
|
||||
async def _migrate_add_sync_operations_if_needed(self):
|
||||
"""Check and add sync_operations table if needed"""
|
||||
try:
|
||||
if await self._check_sync_operations_migration_needed():
|
||||
logger.info("Sync operations table migration needed, starting...")
|
||||
await self._migrate_add_sync_operations()
|
||||
logger.info("Sync operations table migration completed")
|
||||
else:
|
||||
logger.info("Sync operations table already exists")
|
||||
except Exception as e:
|
||||
logger.error(f"Sync operations table migration failed: {e}")
|
||||
raise
|
||||
|
||||
async def persist_sync_operation(self, sync_operation: Dict[str, Any]) -> int:
|
||||
"""Persist sync operation to database and return the ID"""
|
||||
if not self.sqlite_enabled:
|
||||
logger.warning("SQLite database disabled, cannot persist sync operation")
|
||||
return 0
|
||||
|
||||
try:
|
||||
import json
|
||||
import sqlite3
|
||||
|
||||
db_path = path_manager.get_database_path()
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Insert sync operation
|
||||
cursor.execute(
|
||||
"""INSERT INTO sync_operations (
|
||||
started_at, completed_at, success, accounts_processed,
|
||||
transactions_added, transactions_updated, balances_updated,
|
||||
duration_seconds, errors, logs, trigger_type
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
(
|
||||
sync_operation.get("started_at"),
|
||||
sync_operation.get("completed_at"),
|
||||
sync_operation.get("success"),
|
||||
sync_operation.get("accounts_processed", 0),
|
||||
sync_operation.get("transactions_added", 0),
|
||||
sync_operation.get("transactions_updated", 0),
|
||||
sync_operation.get("balances_updated", 0),
|
||||
sync_operation.get("duration_seconds"),
|
||||
json.dumps(sync_operation.get("errors", [])),
|
||||
json.dumps(sync_operation.get("logs", [])),
|
||||
sync_operation.get("trigger_type", "manual"),
|
||||
),
|
||||
)
|
||||
|
||||
operation_id = cursor.lastrowid
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
logger.debug(f"Persisted sync operation with ID: {operation_id}")
|
||||
return operation_id
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to persist sync operation: {e}")
|
||||
raise
|
||||
|
||||
async def get_sync_operations(self, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]:
|
||||
"""Get sync operations from database"""
|
||||
if not self.sqlite_enabled:
|
||||
logger.warning("SQLite database disabled, cannot get sync operations")
|
||||
return []
|
||||
|
||||
try:
|
||||
import json
|
||||
import sqlite3
|
||||
|
||||
db_path = path_manager.get_database_path()
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Get sync operations ordered by started_at descending
|
||||
cursor.execute(
|
||||
"""SELECT id, started_at, completed_at, success, accounts_processed,
|
||||
transactions_added, transactions_updated, balances_updated,
|
||||
duration_seconds, errors, logs, trigger_type
|
||||
FROM sync_operations
|
||||
ORDER BY started_at DESC
|
||||
LIMIT ? OFFSET ?""",
|
||||
(limit, offset),
|
||||
)
|
||||
|
||||
operations = []
|
||||
for row in cursor.fetchall():
|
||||
operation = {
|
||||
"id": row[0],
|
||||
"started_at": row[1],
|
||||
"completed_at": row[2],
|
||||
"success": bool(row[3]) if row[3] is not None else None,
|
||||
"accounts_processed": row[4],
|
||||
"transactions_added": row[5],
|
||||
"transactions_updated": row[6],
|
||||
"balances_updated": row[7],
|
||||
"duration_seconds": row[8],
|
||||
"errors": json.loads(row[9]) if row[9] else [],
|
||||
"logs": json.loads(row[10]) if row[10] else [],
|
||||
"trigger_type": row[11],
|
||||
}
|
||||
operations.append(operation)
|
||||
|
||||
conn.close()
|
||||
return operations
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get sync operations: {e}")
|
||||
return []
|
||||
Reference in New Issue
Block a user