mirror of
https://github.com/elisiariocouto/leggen.git
synced 2026-01-31 05:28:24 +00:00
305 lines
12 KiB
Python
305 lines
12 KiB
Python
from datetime import datetime
|
|
from typing import List
|
|
|
|
from loguru import logger
|
|
|
|
from leggen.api.models.sync import SyncResult, SyncStatus
|
|
from leggen.repositories import (
|
|
AccountRepository,
|
|
BalanceRepository,
|
|
SyncRepository,
|
|
TransactionRepository,
|
|
)
|
|
from leggen.services.data_processors import (
|
|
AccountEnricher,
|
|
BalanceTransformer,
|
|
TransactionProcessor,
|
|
)
|
|
from leggen.services.gocardless_service import GoCardlessService
|
|
from leggen.services.notification_service import NotificationService
|
|
|
|
# Constants for notification
|
|
EXPIRED_DAYS_LEFT = 0
|
|
|
|
|
|
class SyncService:
|
|
def __init__(self):
|
|
self.gocardless = GoCardlessService()
|
|
self.notifications = NotificationService()
|
|
|
|
# Repositories
|
|
self.accounts = AccountRepository()
|
|
self.balances = BalanceRepository()
|
|
self.transactions = TransactionRepository()
|
|
self.sync = SyncRepository()
|
|
|
|
# Data processors
|
|
self.account_enricher = AccountEnricher()
|
|
self.balance_transformer = BalanceTransformer()
|
|
self.transaction_processor = TransactionProcessor()
|
|
|
|
self._sync_status = SyncStatus(is_running=False)
|
|
|
|
async def get_sync_status(self) -> SyncStatus:
|
|
"""Get current sync status"""
|
|
return self._sync_status
|
|
|
|
async def sync_all_accounts(
|
|
self, force: bool = False, trigger_type: str = "manual"
|
|
) -> SyncResult:
|
|
"""Sync all connected accounts"""
|
|
if self._sync_status.is_running and not force:
|
|
raise Exception("Sync is already running")
|
|
|
|
start_time = datetime.now()
|
|
self._sync_status.is_running = True
|
|
self._sync_status.errors = []
|
|
|
|
accounts_processed = 0
|
|
transactions_added = 0
|
|
transactions_updated = 0
|
|
balances_updated = 0
|
|
errors = []
|
|
logs = [f"Sync started at {start_time.isoformat()}"]
|
|
|
|
# Initialize sync operation record
|
|
sync_operation = {
|
|
"started_at": start_time.isoformat(),
|
|
"trigger_type": trigger_type,
|
|
"accounts_processed": 0,
|
|
"transactions_added": 0,
|
|
"transactions_updated": 0,
|
|
"balances_updated": 0,
|
|
"errors": [],
|
|
"logs": logs,
|
|
}
|
|
|
|
operation_id = None
|
|
|
|
try:
|
|
logger.info("Starting sync of all accounts")
|
|
logs.append("Starting sync of all accounts")
|
|
|
|
# Get all requisitions and accounts
|
|
requisitions = await self.gocardless.get_requisitions()
|
|
all_accounts = set()
|
|
|
|
for req in requisitions.get("results", []):
|
|
all_accounts.update(req.get("accounts", []))
|
|
|
|
self._sync_status.total_accounts = len(all_accounts)
|
|
logs.append(f"Found {len(all_accounts)} accounts to sync")
|
|
|
|
# Check for expired or expiring requisitions
|
|
await self._check_requisition_expiry(requisitions.get("results", []))
|
|
|
|
# Process each account
|
|
for account_id in all_accounts:
|
|
try:
|
|
# Get account details
|
|
account_details = await self.gocardless.get_account_details(
|
|
account_id
|
|
)
|
|
|
|
# Get balances to extract currency information
|
|
balances = await self.gocardless.get_account_balances(account_id)
|
|
|
|
# Enrich and persist account details
|
|
if account_details and balances:
|
|
# Enrich account with currency and institution logo
|
|
enriched_account_details = (
|
|
await self.account_enricher.enrich_account_details(
|
|
account_details, balances
|
|
)
|
|
)
|
|
|
|
# Persist enriched account details to database
|
|
self.accounts.persist(enriched_account_details)
|
|
|
|
# Merge account metadata into balances for persistence
|
|
balances_with_account_info = self.balance_transformer.merge_account_metadata_into_balances(
|
|
balances, enriched_account_details
|
|
)
|
|
balance_rows = (
|
|
self.balance_transformer.transform_to_database_format(
|
|
account_id, balances_with_account_info
|
|
)
|
|
)
|
|
self.balances.persist(account_id, balance_rows)
|
|
balances_updated += len(balances.get("balances", []))
|
|
elif account_details:
|
|
# Fallback: persist account details without currency if balances failed
|
|
self.accounts.persist(account_details)
|
|
|
|
# Get and save transactions
|
|
transactions = await self.gocardless.get_account_transactions(
|
|
account_id
|
|
)
|
|
if transactions:
|
|
processed_transactions = (
|
|
self.transaction_processor.process_transactions(
|
|
account_id, account_details, transactions
|
|
)
|
|
)
|
|
new_transactions = self.transactions.persist(
|
|
account_id, processed_transactions
|
|
)
|
|
transactions_added += len(new_transactions)
|
|
|
|
# Send notifications for new transactions
|
|
if new_transactions:
|
|
await self.notifications.send_transaction_notifications(
|
|
new_transactions
|
|
)
|
|
|
|
accounts_processed += 1
|
|
self._sync_status.accounts_synced = accounts_processed
|
|
|
|
logger.info(f"Synced account {account_id} successfully")
|
|
logs.append(f"Synced account {account_id} successfully")
|
|
|
|
except Exception as e:
|
|
error_msg = f"Failed to sync account {account_id}: {str(e)}"
|
|
errors.append(error_msg)
|
|
logger.error(error_msg)
|
|
logs.append(error_msg)
|
|
|
|
# Send notification for account sync failure
|
|
await self.notifications.send_sync_failure_notification(
|
|
{
|
|
"account_id": account_id,
|
|
"error": error_msg,
|
|
"type": "account_sync_failure",
|
|
}
|
|
)
|
|
|
|
end_time = datetime.now()
|
|
duration = (end_time - start_time).total_seconds()
|
|
|
|
self._sync_status.last_sync = end_time
|
|
|
|
# Update sync operation with final results
|
|
sync_operation.update(
|
|
{
|
|
"completed_at": end_time.isoformat(),
|
|
"success": len(errors) == 0,
|
|
"accounts_processed": accounts_processed,
|
|
"transactions_added": transactions_added,
|
|
"transactions_updated": transactions_updated,
|
|
"balances_updated": balances_updated,
|
|
"duration_seconds": duration,
|
|
"errors": errors,
|
|
"logs": logs,
|
|
}
|
|
)
|
|
|
|
# Persist sync operation to database
|
|
try:
|
|
operation_id = self.sync.persist(sync_operation)
|
|
logger.debug(f"Saved sync operation with ID: {operation_id}")
|
|
except Exception as e:
|
|
logger.error(f"Failed to persist sync operation: {e}")
|
|
|
|
result = SyncResult(
|
|
success=len(errors) == 0,
|
|
accounts_processed=accounts_processed,
|
|
transactions_added=transactions_added,
|
|
transactions_updated=transactions_updated,
|
|
balances_updated=balances_updated,
|
|
duration_seconds=duration,
|
|
errors=errors,
|
|
started_at=start_time,
|
|
completed_at=end_time,
|
|
)
|
|
|
|
logger.info(
|
|
f"Sync completed: {accounts_processed} accounts, {transactions_added} new transactions"
|
|
)
|
|
logs.append(
|
|
f"Sync completed: {accounts_processed} accounts, {transactions_added} new transactions"
|
|
)
|
|
return result
|
|
|
|
except Exception as e:
|
|
error_msg = f"Sync failed: {str(e)}"
|
|
errors.append(error_msg)
|
|
logs.append(error_msg)
|
|
logger.error(error_msg)
|
|
|
|
# Save failed sync operation
|
|
end_time = datetime.now()
|
|
duration = (end_time - start_time).total_seconds()
|
|
sync_operation.update(
|
|
{
|
|
"completed_at": end_time.isoformat(),
|
|
"success": False,
|
|
"accounts_processed": accounts_processed,
|
|
"transactions_added": transactions_added,
|
|
"transactions_updated": transactions_updated,
|
|
"balances_updated": balances_updated,
|
|
"duration_seconds": duration,
|
|
"errors": errors,
|
|
"logs": logs,
|
|
}
|
|
)
|
|
|
|
try:
|
|
operation_id = self.sync.persist(sync_operation)
|
|
logger.debug(f"Saved failed sync operation with ID: {operation_id}")
|
|
except Exception as persist_error:
|
|
logger.error(
|
|
f"Failed to persist failed sync operation: {persist_error}"
|
|
)
|
|
|
|
raise
|
|
finally:
|
|
self._sync_status.is_running = False
|
|
|
|
async def _check_requisition_expiry(self, requisitions: List[dict]) -> None:
|
|
"""Check requisitions for expiry and send notifications.
|
|
|
|
Args:
|
|
requisitions: List of requisition dictionaries to check
|
|
"""
|
|
for req in requisitions:
|
|
requisition_id = req.get("id", "unknown")
|
|
institution_id = req.get("institution_id", "unknown")
|
|
status = req.get("status", "")
|
|
|
|
# Check if requisition is expired
|
|
if status == "EX":
|
|
logger.warning(
|
|
f"Requisition {requisition_id} for {institution_id} has expired"
|
|
)
|
|
await self.notifications.send_expiry_notification(
|
|
{
|
|
"bank": institution_id,
|
|
"requisition_id": requisition_id,
|
|
"status": "expired",
|
|
"days_left": EXPIRED_DAYS_LEFT,
|
|
}
|
|
)
|
|
|
|
async def sync_specific_accounts(
|
|
self, account_ids: List[str], force: bool = False, trigger_type: str = "manual"
|
|
) -> SyncResult:
|
|
"""Sync specific accounts"""
|
|
if self._sync_status.is_running and not force:
|
|
raise Exception("Sync is already running")
|
|
|
|
self._sync_status.is_running = True
|
|
|
|
try:
|
|
# For now, delegate to sync_all_accounts but with specific filtering
|
|
# This could be optimized later to only process specified accounts
|
|
result = await self.sync_all_accounts(
|
|
force=force, trigger_type=trigger_type
|
|
)
|
|
|
|
# Filter results to only specified accounts if needed
|
|
# For simplicity, we'll return the full result for now
|
|
return result
|
|
|
|
finally:
|
|
self._sync_status.is_running = False
|