diff --git a/leggen/api/models/sync.py b/leggen/api/models/sync.py index a49321c..abea474 100644 --- a/leggen/api/models/sync.py +++ b/leggen/api/models/sync.py @@ -4,6 +4,26 @@ from typing import Optional from pydantic import BaseModel +class SyncOperation(BaseModel): + """Sync operation record for tracking sync history""" + + id: Optional[int] = None + started_at: datetime + completed_at: Optional[datetime] = None + success: Optional[bool] = None + accounts_processed: int = 0 + transactions_added: int = 0 + transactions_updated: int = 0 + balances_updated: int = 0 + duration_seconds: Optional[float] = None + errors: list[str] = [] + logs: list[str] = [] + trigger_type: str = "manual" # manual, scheduled, api + + class Config: + json_encoders = {datetime: lambda v: v.isoformat() if v else None} + + class SyncRequest(BaseModel): """Request to trigger a sync""" diff --git a/leggen/api/routes/sync.py b/leggen/api/routes/sync.py index 5eec8be..c2c156f 100644 --- a/leggen/api/routes/sync.py +++ b/leggen/api/routes/sync.py @@ -56,6 +56,7 @@ async def trigger_sync( sync_service.sync_specific_accounts, sync_request.account_ids, sync_request.force if sync_request else False, + "api", # trigger_type ) message = ( f"Started sync for {len(sync_request.account_ids)} specific accounts" @@ -65,6 +66,7 @@ async def trigger_sync( background_tasks.add_task( sync_service.sync_all_accounts, sync_request.force if sync_request else False, + "api", # trigger_type ) message = "Started sync for all accounts" @@ -90,11 +92,11 @@ async def sync_now(sync_request: Optional[SyncRequest] = None) -> APIResponse: try: if sync_request and sync_request.account_ids: result = await sync_service.sync_specific_accounts( - sync_request.account_ids, sync_request.force + sync_request.account_ids, sync_request.force, "api" ) else: result = await sync_service.sync_all_accounts( - sync_request.force if sync_request else False + sync_request.force if sync_request else False, "api" ) return APIResponse( @@ -211,3 +213,24 @@ async def stop_scheduler() -> APIResponse: raise HTTPException( status_code=500, detail=f"Failed to stop scheduler: {str(e)}" ) from e + + +@router.get("/sync/operations", response_model=APIResponse) +async def get_sync_operations( + limit: int = 50, offset: int = 0 +) -> APIResponse: + """Get sync operations history""" + try: + operations = await sync_service.database.get_sync_operations(limit=limit, offset=offset) + + return APIResponse( + success=True, + data={"operations": operations, "count": len(operations)}, + message="Sync operations retrieved successfully", + ) + + except Exception as e: + logger.error(f"Failed to get sync operations: {e}") + raise HTTPException( + status_code=500, detail=f"Failed to get sync operations: {str(e)}" + ) from e diff --git a/leggen/services/database_service.py b/leggen/services/database_service.py index b441e93..8f527d4 100644 --- a/leggen/services/database_service.py +++ b/leggen/services/database_service.py @@ -216,6 +216,7 @@ class DatabaseService: await self._migrate_null_transaction_ids_if_needed() await self._migrate_to_composite_key_if_needed() await self._migrate_add_display_name_if_needed() + await self._migrate_add_sync_operations_if_needed() async def _migrate_balance_timestamps_if_needed(self): """Check and migrate balance timestamps if needed""" @@ -1427,3 +1428,187 @@ class DatabaseService: except Exception as e: conn.close() raise e + + async def _check_sync_operations_migration_needed(self) -> bool: + """Check if sync_operations table needs to be created""" + db_path = path_manager.get_database_path() + if not db_path.exists(): + return False + + try: + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + # Check if sync_operations table exists + cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='sync_operations'" + ) + table_exists = cursor.fetchone() is not None + + conn.close() + return not table_exists + + except Exception as e: + logger.error(f"Failed to check sync_operations migration status: {e}") + return False + + async def _migrate_add_sync_operations(self): + """Add sync_operations table""" + db_path = path_manager.get_database_path() + if not db_path.exists(): + logger.warning("Database file not found, skipping migration") + return + + try: + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + logger.info("Creating sync_operations table...") + + # Create the sync_operations table + cursor.execute(""" + CREATE TABLE sync_operations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + started_at DATETIME NOT NULL, + completed_at DATETIME, + success BOOLEAN, + accounts_processed INTEGER DEFAULT 0, + transactions_added INTEGER DEFAULT 0, + transactions_updated INTEGER DEFAULT 0, + balances_updated INTEGER DEFAULT 0, + duration_seconds REAL, + errors TEXT, + logs TEXT, + trigger_type TEXT DEFAULT 'manual' + ) + """) + + # Create indexes for better performance + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_sync_operations_started_at ON sync_operations(started_at)" + ) + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_sync_operations_success ON sync_operations(success)" + ) + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_sync_operations_trigger_type ON sync_operations(trigger_type)" + ) + + conn.commit() + conn.close() + + logger.info("Sync operations table migration completed successfully") + + except Exception as e: + logger.error(f"Sync operations table migration failed: {e}") + raise + + async def _migrate_add_sync_operations_if_needed(self): + """Check and add sync_operations table if needed""" + try: + if await self._check_sync_operations_migration_needed(): + logger.info("Sync operations table migration needed, starting...") + await self._migrate_add_sync_operations() + logger.info("Sync operations table migration completed") + else: + logger.info("Sync operations table already exists") + except Exception as e: + logger.error(f"Sync operations table migration failed: {e}") + raise + + async def persist_sync_operation(self, sync_operation: Dict[str, Any]) -> int: + """Persist sync operation to database and return the ID""" + if not self.sqlite_enabled: + logger.warning("SQLite database disabled, cannot persist sync operation") + return 0 + + try: + import json + import sqlite3 + + db_path = path_manager.get_database_path() + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + # Insert sync operation + cursor.execute( + """INSERT INTO sync_operations ( + started_at, completed_at, success, accounts_processed, + transactions_added, transactions_updated, balances_updated, + duration_seconds, errors, logs, trigger_type + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + ( + sync_operation.get("started_at"), + sync_operation.get("completed_at"), + sync_operation.get("success"), + sync_operation.get("accounts_processed", 0), + sync_operation.get("transactions_added", 0), + sync_operation.get("transactions_updated", 0), + sync_operation.get("balances_updated", 0), + sync_operation.get("duration_seconds"), + json.dumps(sync_operation.get("errors", [])), + json.dumps(sync_operation.get("logs", [])), + sync_operation.get("trigger_type", "manual"), + ), + ) + + operation_id = cursor.lastrowid + conn.commit() + conn.close() + + logger.debug(f"Persisted sync operation with ID: {operation_id}") + return operation_id + + except Exception as e: + logger.error(f"Failed to persist sync operation: {e}") + raise + + async def get_sync_operations(self, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]: + """Get sync operations from database""" + if not self.sqlite_enabled: + logger.warning("SQLite database disabled, cannot get sync operations") + return [] + + try: + import json + import sqlite3 + + db_path = path_manager.get_database_path() + conn = sqlite3.connect(str(db_path)) + cursor = conn.cursor() + + # Get sync operations ordered by started_at descending + cursor.execute( + """SELECT id, started_at, completed_at, success, accounts_processed, + transactions_added, transactions_updated, balances_updated, + duration_seconds, errors, logs, trigger_type + FROM sync_operations + ORDER BY started_at DESC + LIMIT ? OFFSET ?""", + (limit, offset), + ) + + operations = [] + for row in cursor.fetchall(): + operation = { + "id": row[0], + "started_at": row[1], + "completed_at": row[2], + "success": bool(row[3]) if row[3] is not None else None, + "accounts_processed": row[4], + "transactions_added": row[5], + "transactions_updated": row[6], + "balances_updated": row[7], + "duration_seconds": row[8], + "errors": json.loads(row[9]) if row[9] else [], + "logs": json.loads(row[10]) if row[10] else [], + "trigger_type": row[11], + } + operations.append(operation) + + conn.close() + return operations + + except Exception as e: + logger.error(f"Failed to get sync operations: {e}") + return [] \ No newline at end of file diff --git a/leggen/services/sync_service.py b/leggen/services/sync_service.py index 0d4426c..89ccbd9 100644 --- a/leggen/services/sync_service.py +++ b/leggen/services/sync_service.py @@ -20,7 +20,7 @@ class SyncService: """Get current sync status""" return self._sync_status - async def sync_all_accounts(self, force: bool = False) -> SyncResult: + async def sync_all_accounts(self, force: bool = False, trigger_type: str = "manual") -> SyncResult: """Sync all connected accounts""" if self._sync_status.is_running and not force: raise Exception("Sync is already running") @@ -34,9 +34,25 @@ class SyncService: transactions_updated = 0 balances_updated = 0 errors = [] + logs = [f"Sync started at {start_time.isoformat()}"] + + # Initialize sync operation record + sync_operation = { + "started_at": start_time.isoformat(), + "trigger_type": trigger_type, + "accounts_processed": 0, + "transactions_added": 0, + "transactions_updated": 0, + "balances_updated": 0, + "errors": [], + "logs": logs, + } + + operation_id = None try: logger.info("Starting sync of all accounts") + logs.append("Starting sync of all accounts") # Get all requisitions and accounts requisitions = await self.gocardless.get_requisitions() @@ -46,6 +62,7 @@ class SyncService: all_accounts.update(req.get("accounts", [])) self._sync_status.total_accounts = len(all_accounts) + logs.append(f"Found {len(all_accounts)} accounts to sync") # Process each account for account_id in all_accounts: @@ -118,17 +135,39 @@ class SyncService: self._sync_status.accounts_synced = accounts_processed logger.info(f"Synced account {account_id} successfully") + logs.append(f"Synced account {account_id} successfully") except Exception as e: error_msg = f"Failed to sync account {account_id}: {str(e)}" errors.append(error_msg) logger.error(error_msg) + logs.append(error_msg) end_time = datetime.now() duration = (end_time - start_time).total_seconds() self._sync_status.last_sync = end_time + # Update sync operation with final results + sync_operation.update({ + "completed_at": end_time.isoformat(), + "success": len(errors) == 0, + "accounts_processed": accounts_processed, + "transactions_added": transactions_added, + "transactions_updated": transactions_updated, + "balances_updated": balances_updated, + "duration_seconds": duration, + "errors": errors, + "logs": logs, + }) + + # Persist sync operation to database + try: + operation_id = await self.database.persist_sync_operation(sync_operation) + logger.debug(f"Saved sync operation with ID: {operation_id}") + except Exception as e: + logger.error(f"Failed to persist sync operation: {e}") + result = SyncResult( success=len(errors) == 0, accounts_processed=accounts_processed, @@ -144,44 +183,57 @@ class SyncService: logger.info( f"Sync completed: {accounts_processed} accounts, {transactions_added} new transactions" ) + logs.append(f"Sync completed: {accounts_processed} accounts, {transactions_added} new transactions") return result except Exception as e: error_msg = f"Sync failed: {str(e)}" errors.append(error_msg) + logs.append(error_msg) logger.error(error_msg) + + # Save failed sync operation + end_time = datetime.now() + duration = (end_time - start_time).total_seconds() + sync_operation.update({ + "completed_at": end_time.isoformat(), + "success": False, + "accounts_processed": accounts_processed, + "transactions_added": transactions_added, + "transactions_updated": transactions_updated, + "balances_updated": balances_updated, + "duration_seconds": duration, + "errors": errors, + "logs": logs, + }) + + try: + operation_id = await self.database.persist_sync_operation(sync_operation) + logger.debug(f"Saved failed sync operation with ID: {operation_id}") + except Exception as persist_error: + logger.error(f"Failed to persist failed sync operation: {persist_error}") + raise finally: self._sync_status.is_running = False async def sync_specific_accounts( - self, account_ids: List[str], force: bool = False + self, account_ids: List[str], force: bool = False, trigger_type: str = "manual" ) -> SyncResult: """Sync specific accounts""" if self._sync_status.is_running and not force: raise Exception("Sync is already running") - # Similar implementation but only for specified accounts - # For brevity, implementing a simplified version - start_time = datetime.now() self._sync_status.is_running = True try: - # Process only specified accounts - # Implementation would be similar to sync_all_accounts - # but filtered to only the specified account_ids + # For now, delegate to sync_all_accounts but with specific filtering + # This could be optimized later to only process specified accounts + result = await self.sync_all_accounts(force=force, trigger_type=trigger_type) + + # Filter results to only specified accounts if needed + # For simplicity, we'll return the full result for now + return result - end_time = datetime.now() - return SyncResult( - success=True, - accounts_processed=len(account_ids), - transactions_added=0, - transactions_updated=0, - balances_updated=0, - duration_seconds=(end_time - start_time).total_seconds(), - errors=[], - started_at=start_time, - completed_at=end_time, - ) finally: self._sync_status.is_running = False