mirror of
https://github.com/elisiariocouto/leggen.git
synced 2025-12-29 03:59:07 +00:00
Compare commits
2 Commits
ecdf9f8ba3
...
59e5bc13d6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
59e5bc13d6 | ||
|
|
b63d30c7a5 |
@@ -3,7 +3,7 @@ import { Link, useLocation } from "@tanstack/react-router";
|
||||
import {
|
||||
List,
|
||||
BarChart3,
|
||||
Bell,
|
||||
Activity,
|
||||
Settings,
|
||||
Building2,
|
||||
TrendingUp,
|
||||
@@ -33,7 +33,7 @@ import {
|
||||
const navigation = [
|
||||
{ name: "Overview", icon: List, to: "/" },
|
||||
{ name: "Analytics", icon: BarChart3, to: "/analytics" },
|
||||
{ name: "Notifications", icon: Bell, to: "/notifications" },
|
||||
{ name: "System Status", icon: Activity, to: "/notifications" },
|
||||
{ name: "Settings", icon: Settings, to: "/settings" },
|
||||
];
|
||||
|
||||
|
||||
@@ -10,6 +10,10 @@ import {
|
||||
CheckCircle,
|
||||
Settings,
|
||||
TestTube,
|
||||
Activity,
|
||||
Clock,
|
||||
TrendingUp,
|
||||
User,
|
||||
} from "lucide-react";
|
||||
import { apiClient } from "../lib/api";
|
||||
import NotificationsSkeleton from "./NotificationsSkeleton";
|
||||
@@ -32,7 +36,7 @@ import {
|
||||
SelectTrigger,
|
||||
SelectValue,
|
||||
} from "./ui/select";
|
||||
import type { NotificationSettings, NotificationService } from "../types/api";
|
||||
import type { NotificationSettings, NotificationService, SyncOperationsResponse } from "../types/api";
|
||||
|
||||
export default function Notifications() {
|
||||
const [testService, setTestService] = useState("");
|
||||
@@ -61,6 +65,16 @@ export default function Notifications() {
|
||||
queryFn: apiClient.getNotificationServices,
|
||||
});
|
||||
|
||||
const {
|
||||
data: syncOperations,
|
||||
isLoading: syncOperationsLoading,
|
||||
error: syncOperationsError,
|
||||
refetch: refetchSyncOperations,
|
||||
} = useQuery<SyncOperationsResponse>({
|
||||
queryKey: ["syncOperations"],
|
||||
queryFn: () => apiClient.getSyncOperations(10, 0), // Get latest 10 operations
|
||||
});
|
||||
|
||||
const testMutation = useMutation({
|
||||
mutationFn: apiClient.testNotification,
|
||||
onSuccess: () => {
|
||||
@@ -80,15 +94,15 @@ export default function Notifications() {
|
||||
},
|
||||
});
|
||||
|
||||
if (settingsLoading || servicesLoading) {
|
||||
if (settingsLoading || servicesLoading || syncOperationsLoading) {
|
||||
return <NotificationsSkeleton />;
|
||||
}
|
||||
|
||||
if (settingsError || servicesError) {
|
||||
if (settingsError || servicesError || syncOperationsError) {
|
||||
return (
|
||||
<Alert variant="destructive">
|
||||
<AlertCircle className="h-4 w-4" />
|
||||
<AlertTitle>Failed to load notifications</AlertTitle>
|
||||
<AlertTitle>Failed to load system data</AlertTitle>
|
||||
<AlertDescription className="space-y-3">
|
||||
<p>
|
||||
Unable to connect to the Leggen API. Please check your configuration
|
||||
@@ -98,6 +112,7 @@ export default function Notifications() {
|
||||
onClick={() => {
|
||||
refetchSettings();
|
||||
refetchServices();
|
||||
refetchSyncOperations();
|
||||
}}
|
||||
variant="outline"
|
||||
size="sm"
|
||||
@@ -131,6 +146,100 @@ export default function Notifications() {
|
||||
|
||||
return (
|
||||
<div className="space-y-6">
|
||||
{/* Sync Operations Section */}
|
||||
<Card>
|
||||
<CardHeader>
|
||||
<CardTitle className="flex items-center space-x-2">
|
||||
<Activity className="h-5 w-5 text-primary" />
|
||||
<span>Sync Operations</span>
|
||||
</CardTitle>
|
||||
<CardDescription>Recent synchronization activities</CardDescription>
|
||||
</CardHeader>
|
||||
<CardContent>
|
||||
{!syncOperations || syncOperations.operations.length === 0 ? (
|
||||
<div className="text-center py-6">
|
||||
<Activity className="h-12 w-12 text-muted-foreground mx-auto mb-4" />
|
||||
<h3 className="text-lg font-medium text-foreground mb-2">
|
||||
No sync operations yet
|
||||
</h3>
|
||||
<p className="text-muted-foreground">
|
||||
Sync operations will appear here once you start syncing your accounts.
|
||||
</p>
|
||||
</div>
|
||||
) : (
|
||||
<div className="space-y-4">
|
||||
{syncOperations.operations.slice(0, 5).map((operation) => {
|
||||
const startedAt = new Date(operation.started_at);
|
||||
const isRunning = !operation.completed_at;
|
||||
const duration = operation.duration_seconds
|
||||
? `${Math.round(operation.duration_seconds)}s`
|
||||
: '';
|
||||
|
||||
return (
|
||||
<div
|
||||
key={operation.id}
|
||||
className="flex items-center justify-between p-4 border rounded-lg hover:bg-accent transition-colors"
|
||||
>
|
||||
<div className="flex items-center space-x-4">
|
||||
<div className={`p-2 rounded-full ${
|
||||
isRunning
|
||||
? 'bg-blue-100 text-blue-600'
|
||||
: operation.success
|
||||
? 'bg-green-100 text-green-600'
|
||||
: 'bg-red-100 text-red-600'
|
||||
}`}>
|
||||
{isRunning ? (
|
||||
<RefreshCw className="h-4 w-4 animate-spin" />
|
||||
) : operation.success ? (
|
||||
<CheckCircle className="h-4 w-4" />
|
||||
) : (
|
||||
<AlertCircle className="h-4 w-4" />
|
||||
)}
|
||||
</div>
|
||||
<div>
|
||||
<div className="flex items-center space-x-2">
|
||||
<h4 className="text-sm font-medium text-foreground">
|
||||
{isRunning ? 'Sync Running' : operation.success ? 'Sync Completed' : 'Sync Failed'}
|
||||
</h4>
|
||||
<Badge variant="outline" className="text-xs">
|
||||
{operation.trigger_type}
|
||||
</Badge>
|
||||
</div>
|
||||
<div className="flex items-center space-x-4 mt-1 text-xs text-muted-foreground">
|
||||
<span className="flex items-center space-x-1">
|
||||
<Clock className="h-3 w-3" />
|
||||
<span>{startedAt.toLocaleDateString()} {startedAt.toLocaleTimeString()}</span>
|
||||
</span>
|
||||
{duration && (
|
||||
<span>Duration: {duration}</span>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div className="text-right text-sm text-muted-foreground">
|
||||
<div className="flex items-center space-x-2">
|
||||
<User className="h-3 w-3" />
|
||||
<span>{operation.accounts_processed} accounts</span>
|
||||
</div>
|
||||
<div className="flex items-center space-x-2 mt-1">
|
||||
<TrendingUp className="h-3 w-3" />
|
||||
<span>{operation.transactions_added} new transactions</span>
|
||||
</div>
|
||||
{operation.errors.length > 0 && (
|
||||
<div className="flex items-center space-x-2 mt-1 text-red-600">
|
||||
<AlertCircle className="h-3 w-3" />
|
||||
<span>{operation.errors.length} errors</span>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
</div>
|
||||
);
|
||||
})}
|
||||
</div>
|
||||
)}
|
||||
</CardContent>
|
||||
</Card>
|
||||
|
||||
{/* Test Notification Section */}
|
||||
<Card>
|
||||
<CardHeader>
|
||||
|
||||
@@ -12,6 +12,7 @@ import type {
|
||||
HealthData,
|
||||
AccountUpdate,
|
||||
TransactionStats,
|
||||
SyncOperationsResponse,
|
||||
} from "../types/api";
|
||||
|
||||
// Use VITE_API_URL for development, relative URLs for production
|
||||
@@ -217,6 +218,17 @@ export const apiClient = {
|
||||
>(`/transactions/monthly-stats?${queryParams.toString()}`);
|
||||
return response.data.data;
|
||||
},
|
||||
|
||||
// Get sync operations history
|
||||
getSyncOperations: async (
|
||||
limit: number = 50,
|
||||
offset: number = 0,
|
||||
): Promise<SyncOperationsResponse> => {
|
||||
const response = await api.get<ApiResponse<SyncOperationsResponse>>(
|
||||
`/sync/operations?limit=${limit}&offset=${offset}`,
|
||||
);
|
||||
return response.data.data;
|
||||
},
|
||||
};
|
||||
|
||||
export default apiClient;
|
||||
|
||||
@@ -213,3 +213,24 @@ export interface TransactionStats {
|
||||
average_transaction: number;
|
||||
accounts_included: number;
|
||||
}
|
||||
|
||||
// Sync operations types
|
||||
export interface SyncOperation {
|
||||
id: number;
|
||||
started_at: string;
|
||||
completed_at?: string;
|
||||
success?: boolean;
|
||||
accounts_processed: number;
|
||||
transactions_added: number;
|
||||
transactions_updated: number;
|
||||
balances_updated: number;
|
||||
duration_seconds?: number;
|
||||
errors: string[];
|
||||
logs: string[];
|
||||
trigger_type: 'manual' | 'scheduled' | 'api';
|
||||
}
|
||||
|
||||
export interface SyncOperationsResponse {
|
||||
operations: SyncOperation[];
|
||||
count: number;
|
||||
}
|
||||
|
||||
@@ -4,6 +4,26 @@ from typing import Optional
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class SyncOperation(BaseModel):
|
||||
"""Sync operation record for tracking sync history"""
|
||||
|
||||
id: Optional[int] = None
|
||||
started_at: datetime
|
||||
completed_at: Optional[datetime] = None
|
||||
success: Optional[bool] = None
|
||||
accounts_processed: int = 0
|
||||
transactions_added: int = 0
|
||||
transactions_updated: int = 0
|
||||
balances_updated: int = 0
|
||||
duration_seconds: Optional[float] = None
|
||||
errors: list[str] = []
|
||||
logs: list[str] = []
|
||||
trigger_type: str = "manual" # manual, scheduled, api
|
||||
|
||||
class Config:
|
||||
json_encoders = {datetime: lambda v: v.isoformat() if v else None}
|
||||
|
||||
|
||||
class SyncRequest(BaseModel):
|
||||
"""Request to trigger a sync"""
|
||||
|
||||
|
||||
@@ -56,6 +56,7 @@ async def trigger_sync(
|
||||
sync_service.sync_specific_accounts,
|
||||
sync_request.account_ids,
|
||||
sync_request.force if sync_request else False,
|
||||
"api", # trigger_type
|
||||
)
|
||||
message = (
|
||||
f"Started sync for {len(sync_request.account_ids)} specific accounts"
|
||||
@@ -65,6 +66,7 @@ async def trigger_sync(
|
||||
background_tasks.add_task(
|
||||
sync_service.sync_all_accounts,
|
||||
sync_request.force if sync_request else False,
|
||||
"api", # trigger_type
|
||||
)
|
||||
message = "Started sync for all accounts"
|
||||
|
||||
@@ -90,11 +92,11 @@ async def sync_now(sync_request: Optional[SyncRequest] = None) -> APIResponse:
|
||||
try:
|
||||
if sync_request and sync_request.account_ids:
|
||||
result = await sync_service.sync_specific_accounts(
|
||||
sync_request.account_ids, sync_request.force
|
||||
sync_request.account_ids, sync_request.force, "api"
|
||||
)
|
||||
else:
|
||||
result = await sync_service.sync_all_accounts(
|
||||
sync_request.force if sync_request else False
|
||||
sync_request.force if sync_request else False, "api"
|
||||
)
|
||||
|
||||
return APIResponse(
|
||||
@@ -211,3 +213,24 @@ async def stop_scheduler() -> APIResponse:
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to stop scheduler: {str(e)}"
|
||||
) from e
|
||||
|
||||
|
||||
@router.get("/sync/operations", response_model=APIResponse)
|
||||
async def get_sync_operations(
|
||||
limit: int = 50, offset: int = 0
|
||||
) -> APIResponse:
|
||||
"""Get sync operations history"""
|
||||
try:
|
||||
operations = await sync_service.database.get_sync_operations(limit=limit, offset=offset)
|
||||
|
||||
return APIResponse(
|
||||
success=True,
|
||||
data={"operations": operations, "count": len(operations)},
|
||||
message="Sync operations retrieved successfully",
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get sync operations: {e}")
|
||||
raise HTTPException(
|
||||
status_code=500, detail=f"Failed to get sync operations: {str(e)}"
|
||||
) from e
|
||||
|
||||
@@ -216,6 +216,7 @@ class DatabaseService:
|
||||
await self._migrate_null_transaction_ids_if_needed()
|
||||
await self._migrate_to_composite_key_if_needed()
|
||||
await self._migrate_add_display_name_if_needed()
|
||||
await self._migrate_add_sync_operations_if_needed()
|
||||
|
||||
async def _migrate_balance_timestamps_if_needed(self):
|
||||
"""Check and migrate balance timestamps if needed"""
|
||||
@@ -1427,3 +1428,187 @@ class DatabaseService:
|
||||
except Exception as e:
|
||||
conn.close()
|
||||
raise e
|
||||
|
||||
async def _check_sync_operations_migration_needed(self) -> bool:
|
||||
"""Check if sync_operations table needs to be created"""
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
return False
|
||||
|
||||
try:
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Check if sync_operations table exists
|
||||
cursor.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='table' AND name='sync_operations'"
|
||||
)
|
||||
table_exists = cursor.fetchone() is not None
|
||||
|
||||
conn.close()
|
||||
return not table_exists
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to check sync_operations migration status: {e}")
|
||||
return False
|
||||
|
||||
async def _migrate_add_sync_operations(self):
|
||||
"""Add sync_operations table"""
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
logger.warning("Database file not found, skipping migration")
|
||||
return
|
||||
|
||||
try:
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
logger.info("Creating sync_operations table...")
|
||||
|
||||
# Create the sync_operations table
|
||||
cursor.execute("""
|
||||
CREATE TABLE sync_operations (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
started_at DATETIME NOT NULL,
|
||||
completed_at DATETIME,
|
||||
success BOOLEAN,
|
||||
accounts_processed INTEGER DEFAULT 0,
|
||||
transactions_added INTEGER DEFAULT 0,
|
||||
transactions_updated INTEGER DEFAULT 0,
|
||||
balances_updated INTEGER DEFAULT 0,
|
||||
duration_seconds REAL,
|
||||
errors TEXT,
|
||||
logs TEXT,
|
||||
trigger_type TEXT DEFAULT 'manual'
|
||||
)
|
||||
""")
|
||||
|
||||
# Create indexes for better performance
|
||||
cursor.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_sync_operations_started_at ON sync_operations(started_at)"
|
||||
)
|
||||
cursor.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_sync_operations_success ON sync_operations(success)"
|
||||
)
|
||||
cursor.execute(
|
||||
"CREATE INDEX IF NOT EXISTS idx_sync_operations_trigger_type ON sync_operations(trigger_type)"
|
||||
)
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
logger.info("Sync operations table migration completed successfully")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Sync operations table migration failed: {e}")
|
||||
raise
|
||||
|
||||
async def _migrate_add_sync_operations_if_needed(self):
|
||||
"""Check and add sync_operations table if needed"""
|
||||
try:
|
||||
if await self._check_sync_operations_migration_needed():
|
||||
logger.info("Sync operations table migration needed, starting...")
|
||||
await self._migrate_add_sync_operations()
|
||||
logger.info("Sync operations table migration completed")
|
||||
else:
|
||||
logger.info("Sync operations table already exists")
|
||||
except Exception as e:
|
||||
logger.error(f"Sync operations table migration failed: {e}")
|
||||
raise
|
||||
|
||||
async def persist_sync_operation(self, sync_operation: Dict[str, Any]) -> int:
|
||||
"""Persist sync operation to database and return the ID"""
|
||||
if not self.sqlite_enabled:
|
||||
logger.warning("SQLite database disabled, cannot persist sync operation")
|
||||
return 0
|
||||
|
||||
try:
|
||||
import json
|
||||
import sqlite3
|
||||
|
||||
db_path = path_manager.get_database_path()
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Insert sync operation
|
||||
cursor.execute(
|
||||
"""INSERT INTO sync_operations (
|
||||
started_at, completed_at, success, accounts_processed,
|
||||
transactions_added, transactions_updated, balances_updated,
|
||||
duration_seconds, errors, logs, trigger_type
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
(
|
||||
sync_operation.get("started_at"),
|
||||
sync_operation.get("completed_at"),
|
||||
sync_operation.get("success"),
|
||||
sync_operation.get("accounts_processed", 0),
|
||||
sync_operation.get("transactions_added", 0),
|
||||
sync_operation.get("transactions_updated", 0),
|
||||
sync_operation.get("balances_updated", 0),
|
||||
sync_operation.get("duration_seconds"),
|
||||
json.dumps(sync_operation.get("errors", [])),
|
||||
json.dumps(sync_operation.get("logs", [])),
|
||||
sync_operation.get("trigger_type", "manual"),
|
||||
),
|
||||
)
|
||||
|
||||
operation_id = cursor.lastrowid
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
logger.debug(f"Persisted sync operation with ID: {operation_id}")
|
||||
return operation_id
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to persist sync operation: {e}")
|
||||
raise
|
||||
|
||||
async def get_sync_operations(self, limit: int = 50, offset: int = 0) -> List[Dict[str, Any]]:
|
||||
"""Get sync operations from database"""
|
||||
if not self.sqlite_enabled:
|
||||
logger.warning("SQLite database disabled, cannot get sync operations")
|
||||
return []
|
||||
|
||||
try:
|
||||
import json
|
||||
import sqlite3
|
||||
|
||||
db_path = path_manager.get_database_path()
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Get sync operations ordered by started_at descending
|
||||
cursor.execute(
|
||||
"""SELECT id, started_at, completed_at, success, accounts_processed,
|
||||
transactions_added, transactions_updated, balances_updated,
|
||||
duration_seconds, errors, logs, trigger_type
|
||||
FROM sync_operations
|
||||
ORDER BY started_at DESC
|
||||
LIMIT ? OFFSET ?""",
|
||||
(limit, offset),
|
||||
)
|
||||
|
||||
operations = []
|
||||
for row in cursor.fetchall():
|
||||
operation = {
|
||||
"id": row[0],
|
||||
"started_at": row[1],
|
||||
"completed_at": row[2],
|
||||
"success": bool(row[3]) if row[3] is not None else None,
|
||||
"accounts_processed": row[4],
|
||||
"transactions_added": row[5],
|
||||
"transactions_updated": row[6],
|
||||
"balances_updated": row[7],
|
||||
"duration_seconds": row[8],
|
||||
"errors": json.loads(row[9]) if row[9] else [],
|
||||
"logs": json.loads(row[10]) if row[10] else [],
|
||||
"trigger_type": row[11],
|
||||
}
|
||||
operations.append(operation)
|
||||
|
||||
conn.close()
|
||||
return operations
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get sync operations: {e}")
|
||||
return []
|
||||
@@ -20,7 +20,7 @@ class SyncService:
|
||||
"""Get current sync status"""
|
||||
return self._sync_status
|
||||
|
||||
async def sync_all_accounts(self, force: bool = False) -> SyncResult:
|
||||
async def sync_all_accounts(self, force: bool = False, trigger_type: str = "manual") -> SyncResult:
|
||||
"""Sync all connected accounts"""
|
||||
if self._sync_status.is_running and not force:
|
||||
raise Exception("Sync is already running")
|
||||
@@ -34,9 +34,25 @@ class SyncService:
|
||||
transactions_updated = 0
|
||||
balances_updated = 0
|
||||
errors = []
|
||||
logs = [f"Sync started at {start_time.isoformat()}"]
|
||||
|
||||
# Initialize sync operation record
|
||||
sync_operation = {
|
||||
"started_at": start_time.isoformat(),
|
||||
"trigger_type": trigger_type,
|
||||
"accounts_processed": 0,
|
||||
"transactions_added": 0,
|
||||
"transactions_updated": 0,
|
||||
"balances_updated": 0,
|
||||
"errors": [],
|
||||
"logs": logs,
|
||||
}
|
||||
|
||||
operation_id = None
|
||||
|
||||
try:
|
||||
logger.info("Starting sync of all accounts")
|
||||
logs.append("Starting sync of all accounts")
|
||||
|
||||
# Get all requisitions and accounts
|
||||
requisitions = await self.gocardless.get_requisitions()
|
||||
@@ -46,6 +62,7 @@ class SyncService:
|
||||
all_accounts.update(req.get("accounts", []))
|
||||
|
||||
self._sync_status.total_accounts = len(all_accounts)
|
||||
logs.append(f"Found {len(all_accounts)} accounts to sync")
|
||||
|
||||
# Process each account
|
||||
for account_id in all_accounts:
|
||||
@@ -118,17 +135,39 @@ class SyncService:
|
||||
self._sync_status.accounts_synced = accounts_processed
|
||||
|
||||
logger.info(f"Synced account {account_id} successfully")
|
||||
logs.append(f"Synced account {account_id} successfully")
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Failed to sync account {account_id}: {str(e)}"
|
||||
errors.append(error_msg)
|
||||
logger.error(error_msg)
|
||||
logs.append(error_msg)
|
||||
|
||||
end_time = datetime.now()
|
||||
duration = (end_time - start_time).total_seconds()
|
||||
|
||||
self._sync_status.last_sync = end_time
|
||||
|
||||
# Update sync operation with final results
|
||||
sync_operation.update({
|
||||
"completed_at": end_time.isoformat(),
|
||||
"success": len(errors) == 0,
|
||||
"accounts_processed": accounts_processed,
|
||||
"transactions_added": transactions_added,
|
||||
"transactions_updated": transactions_updated,
|
||||
"balances_updated": balances_updated,
|
||||
"duration_seconds": duration,
|
||||
"errors": errors,
|
||||
"logs": logs,
|
||||
})
|
||||
|
||||
# Persist sync operation to database
|
||||
try:
|
||||
operation_id = await self.database.persist_sync_operation(sync_operation)
|
||||
logger.debug(f"Saved sync operation with ID: {operation_id}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to persist sync operation: {e}")
|
||||
|
||||
result = SyncResult(
|
||||
success=len(errors) == 0,
|
||||
accounts_processed=accounts_processed,
|
||||
@@ -144,44 +183,57 @@ class SyncService:
|
||||
logger.info(
|
||||
f"Sync completed: {accounts_processed} accounts, {transactions_added} new transactions"
|
||||
)
|
||||
logs.append(f"Sync completed: {accounts_processed} accounts, {transactions_added} new transactions")
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Sync failed: {str(e)}"
|
||||
errors.append(error_msg)
|
||||
logs.append(error_msg)
|
||||
logger.error(error_msg)
|
||||
|
||||
# Save failed sync operation
|
||||
end_time = datetime.now()
|
||||
duration = (end_time - start_time).total_seconds()
|
||||
sync_operation.update({
|
||||
"completed_at": end_time.isoformat(),
|
||||
"success": False,
|
||||
"accounts_processed": accounts_processed,
|
||||
"transactions_added": transactions_added,
|
||||
"transactions_updated": transactions_updated,
|
||||
"balances_updated": balances_updated,
|
||||
"duration_seconds": duration,
|
||||
"errors": errors,
|
||||
"logs": logs,
|
||||
})
|
||||
|
||||
try:
|
||||
operation_id = await self.database.persist_sync_operation(sync_operation)
|
||||
logger.debug(f"Saved failed sync operation with ID: {operation_id}")
|
||||
except Exception as persist_error:
|
||||
logger.error(f"Failed to persist failed sync operation: {persist_error}")
|
||||
|
||||
raise
|
||||
finally:
|
||||
self._sync_status.is_running = False
|
||||
|
||||
async def sync_specific_accounts(
|
||||
self, account_ids: List[str], force: bool = False
|
||||
self, account_ids: List[str], force: bool = False, trigger_type: str = "manual"
|
||||
) -> SyncResult:
|
||||
"""Sync specific accounts"""
|
||||
if self._sync_status.is_running and not force:
|
||||
raise Exception("Sync is already running")
|
||||
|
||||
# Similar implementation but only for specified accounts
|
||||
# For brevity, implementing a simplified version
|
||||
start_time = datetime.now()
|
||||
self._sync_status.is_running = True
|
||||
|
||||
try:
|
||||
# Process only specified accounts
|
||||
# Implementation would be similar to sync_all_accounts
|
||||
# but filtered to only the specified account_ids
|
||||
# For now, delegate to sync_all_accounts but with specific filtering
|
||||
# This could be optimized later to only process specified accounts
|
||||
result = await self.sync_all_accounts(force=force, trigger_type=trigger_type)
|
||||
|
||||
# Filter results to only specified accounts if needed
|
||||
# For simplicity, we'll return the full result for now
|
||||
return result
|
||||
|
||||
end_time = datetime.now()
|
||||
return SyncResult(
|
||||
success=True,
|
||||
accounts_processed=len(account_ids),
|
||||
transactions_added=0,
|
||||
transactions_updated=0,
|
||||
balances_updated=0,
|
||||
duration_seconds=(end_time - start_time).total_seconds(),
|
||||
errors=[],
|
||||
started_at=start_time,
|
||||
completed_at=end_time,
|
||||
)
|
||||
finally:
|
||||
self._sync_status.is_running = False
|
||||
|
||||
Reference in New Issue
Block a user