mirror of
https://github.com/elisiariocouto/leggen.git
synced 2025-12-13 09:02:23 +00:00
Compare commits
10 Commits
88037f328d
...
7007043521
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7007043521 | ||
|
|
fbb3eb9e64 | ||
|
|
3d5994bf30 | ||
|
|
edbc1cb39e | ||
|
|
504f78aa85 | ||
|
|
cbbc316537 | ||
|
|
18ee52bdff | ||
|
|
07edfeaf25 | ||
|
|
c8b161e7f2 | ||
|
|
2c85722fd0 |
3
.github/workflows/ci.yml
vendored
3
.github/workflows/ci.yml
vendored
@@ -6,6 +6,9 @@ on:
|
||||
pull_request:
|
||||
branches: ["main", "dev"]
|
||||
|
||||
permissions:
|
||||
contents: read
|
||||
|
||||
jobs:
|
||||
test-python:
|
||||
name: Test Python
|
||||
|
||||
13
.github/workflows/release.yml
vendored
13
.github/workflows/release.yml
vendored
@@ -5,6 +5,11 @@ on:
|
||||
tags:
|
||||
- "**"
|
||||
|
||||
permissions:
|
||||
contents: write
|
||||
packages: write
|
||||
id-token: write
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
@@ -44,6 +49,9 @@ jobs:
|
||||
|
||||
push-docker-backend:
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
packages: write
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
@@ -90,6 +98,9 @@ jobs:
|
||||
|
||||
push-docker-frontend:
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: read
|
||||
packages: write
|
||||
steps:
|
||||
- name: Checkout
|
||||
uses: actions/checkout@v4
|
||||
@@ -137,6 +148,8 @@ jobs:
|
||||
create-github-release:
|
||||
name: Create GitHub Release
|
||||
runs-on: ubuntu-latest
|
||||
permissions:
|
||||
contents: write
|
||||
needs: [build, publish-to-pypi, push-docker-backend, push-docker-frontend]
|
||||
steps:
|
||||
- name: Checkout
|
||||
|
||||
@@ -123,9 +123,13 @@ export default function TransactionsTable() {
|
||||
search: debouncedSearchTerm || undefined,
|
||||
summaryOnly: false,
|
||||
}),
|
||||
placeholderData: (previousData) => previousData,
|
||||
});
|
||||
|
||||
const transactions = transactionsResponse?.data || [];
|
||||
const transactions = useMemo(
|
||||
() => transactionsResponse?.data || [],
|
||||
[transactionsResponse],
|
||||
);
|
||||
const pagination = useMemo(
|
||||
() =>
|
||||
transactionsResponse
|
||||
@@ -141,6 +145,31 @@ export default function TransactionsTable() {
|
||||
[transactionsResponse],
|
||||
);
|
||||
|
||||
// Calculate stats from current page transactions, memoized for performance
|
||||
const stats = useMemo(() => {
|
||||
const totalIncome = transactions
|
||||
.filter((t: Transaction) => t.transaction_value > 0)
|
||||
.reduce((sum: number, t: Transaction) => sum + t.transaction_value, 0);
|
||||
|
||||
const totalExpenses = Math.abs(
|
||||
transactions
|
||||
.filter((t: Transaction) => t.transaction_value < 0)
|
||||
.reduce((sum: number, t: Transaction) => sum + t.transaction_value, 0)
|
||||
);
|
||||
|
||||
// Get currency from first transaction, fallback to EUR
|
||||
const displayCurrency = transactions.length > 0 ? transactions[0].transaction_currency : "EUR";
|
||||
|
||||
return {
|
||||
totalCount: pagination?.total || 0,
|
||||
pageCount: transactions.length,
|
||||
totalIncome,
|
||||
totalExpenses,
|
||||
netChange: totalIncome - totalExpenses,
|
||||
displayCurrency,
|
||||
};
|
||||
}, [transactions, pagination]);
|
||||
|
||||
// Check if search is currently debouncing
|
||||
const isSearchLoading = filterState.searchTerm !== debouncedSearchTerm;
|
||||
|
||||
@@ -366,6 +395,78 @@ export default function TransactionsTable() {
|
||||
isSearchLoading={isSearchLoading}
|
||||
/>
|
||||
|
||||
{/* Transaction Statistics */}
|
||||
{transactions.length > 0 && (
|
||||
<div className="grid grid-cols-1 md:grid-cols-4 gap-4">
|
||||
<Card className="p-4">
|
||||
<div className="flex items-center justify-between">
|
||||
<div>
|
||||
<p className="text-xs text-muted-foreground uppercase tracking-wider">
|
||||
Showing
|
||||
</p>
|
||||
<p className="text-2xl font-bold text-foreground mt-1">
|
||||
{stats.pageCount}
|
||||
</p>
|
||||
<p className="text-xs text-muted-foreground mt-1">
|
||||
of {stats.totalCount} total
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
</Card>
|
||||
|
||||
<Card className="p-4">
|
||||
<div className="flex items-center justify-between">
|
||||
<div>
|
||||
<p className="text-xs text-muted-foreground uppercase tracking-wider">
|
||||
Income
|
||||
</p>
|
||||
<BlurredValue className="text-2xl font-bold text-green-600 mt-1 block">
|
||||
+{formatCurrency(stats.totalIncome, stats.displayCurrency)}
|
||||
</BlurredValue>
|
||||
</div>
|
||||
<TrendingUp className="h-8 w-8 text-green-600 opacity-50" />
|
||||
</div>
|
||||
</Card>
|
||||
|
||||
<Card className="p-4">
|
||||
<div className="flex items-center justify-between">
|
||||
<div>
|
||||
<p className="text-xs text-muted-foreground uppercase tracking-wider">
|
||||
Expenses
|
||||
</p>
|
||||
<BlurredValue className="text-2xl font-bold text-red-600 mt-1 block">
|
||||
-{formatCurrency(stats.totalExpenses, stats.displayCurrency)}
|
||||
</BlurredValue>
|
||||
</div>
|
||||
<TrendingDown className="h-8 w-8 text-red-600 opacity-50" />
|
||||
</div>
|
||||
</Card>
|
||||
|
||||
<Card className="p-4">
|
||||
<div className="flex items-center justify-between">
|
||||
<div>
|
||||
<p className="text-xs text-muted-foreground uppercase tracking-wider">
|
||||
Net Change
|
||||
</p>
|
||||
<BlurredValue
|
||||
className={`text-2xl font-bold mt-1 block ${
|
||||
stats.netChange >= 0 ? "text-green-600" : "text-red-600"
|
||||
}`}
|
||||
>
|
||||
{stats.netChange >= 0 ? "+" : ""}
|
||||
{formatCurrency(stats.netChange, stats.displayCurrency)}
|
||||
</BlurredValue>
|
||||
</div>
|
||||
{stats.netChange >= 0 ? (
|
||||
<TrendingUp className="h-8 w-8 text-green-600 opacity-50" />
|
||||
) : (
|
||||
<TrendingDown className="h-8 w-8 text-red-600 opacity-50" />
|
||||
)}
|
||||
</div>
|
||||
</Card>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{/* Responsive Table/Cards */}
|
||||
<Card>
|
||||
{/* Desktop Table View (hidden on mobile) */}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { useRef, useEffect } from "react";
|
||||
import { Search } from "lucide-react";
|
||||
import { Input } from "@/components/ui/input";
|
||||
import { cn } from "@/lib/utils";
|
||||
@@ -30,6 +31,21 @@ export function FilterBar({
|
||||
isSearchLoading = false,
|
||||
className,
|
||||
}: FilterBarProps) {
|
||||
const searchInputRef = useRef<HTMLInputElement>(null);
|
||||
const cursorPositionRef = useRef<number | null>(null);
|
||||
|
||||
// Maintain focus and cursor position on search input during re-renders
|
||||
useEffect(() => {
|
||||
const currentInput = searchInputRef.current;
|
||||
if (!currentInput) return;
|
||||
|
||||
// Restore focus and cursor position after data fetches complete
|
||||
if (cursorPositionRef.current !== null && document.activeElement !== currentInput) {
|
||||
currentInput.focus();
|
||||
currentInput.setSelectionRange(cursorPositionRef.current, cursorPositionRef.current);
|
||||
}
|
||||
}, [isSearchLoading]);
|
||||
|
||||
const hasActiveFilters =
|
||||
filterState.searchTerm ||
|
||||
filterState.selectedAccount ||
|
||||
@@ -61,9 +77,19 @@ export function FilterBar({
|
||||
<div className="relative w-[200px]">
|
||||
<Search className="absolute left-3 top-1/2 transform -translate-y-1/2 h-4 w-4 text-muted-foreground" />
|
||||
<Input
|
||||
ref={searchInputRef}
|
||||
placeholder="Search transactions..."
|
||||
value={filterState.searchTerm}
|
||||
onChange={(e) => onFilterChange("searchTerm", e.target.value)}
|
||||
onChange={(e) => {
|
||||
cursorPositionRef.current = e.target.selectionStart;
|
||||
onFilterChange("searchTerm", e.target.value);
|
||||
}}
|
||||
onFocus={() => {
|
||||
cursorPositionRef.current = searchInputRef.current?.selectionStart ?? null;
|
||||
}}
|
||||
onBlur={() => {
|
||||
cursorPositionRef.current = null;
|
||||
}}
|
||||
className="pl-9 pr-8 bg-background"
|
||||
/>
|
||||
{isSearchLoading && (
|
||||
@@ -99,9 +125,19 @@ export function FilterBar({
|
||||
<div className="relative">
|
||||
<Search className="absolute left-3 top-1/2 transform -translate-y-1/2 h-4 w-4 text-muted-foreground" />
|
||||
<Input
|
||||
ref={searchInputRef}
|
||||
placeholder="Search..."
|
||||
value={filterState.searchTerm}
|
||||
onChange={(e) => onFilterChange("searchTerm", e.target.value)}
|
||||
onChange={(e) => {
|
||||
cursorPositionRef.current = e.target.selectionStart;
|
||||
onFilterChange("searchTerm", e.target.value);
|
||||
}}
|
||||
onFocus={() => {
|
||||
cursorPositionRef.current = searchInputRef.current?.selectionStart ?? null;
|
||||
}}
|
||||
onBlur={() => {
|
||||
cursorPositionRef.current = null;
|
||||
}}
|
||||
className="pl-9 pr-8 bg-background w-full"
|
||||
/>
|
||||
{isSearchLoading && (
|
||||
|
||||
13
leggen/services/data_processors/__init__.py
Normal file
13
leggen/services/data_processors/__init__.py
Normal file
@@ -0,0 +1,13 @@
|
||||
"""Data processing layer for all transformation logic."""
|
||||
|
||||
from leggen.services.data_processors.account_enricher import AccountEnricher
|
||||
from leggen.services.data_processors.analytics_processor import AnalyticsProcessor
|
||||
from leggen.services.data_processors.balance_transformer import BalanceTransformer
|
||||
from leggen.services.data_processors.transaction_processor import TransactionProcessor
|
||||
|
||||
__all__ = [
|
||||
"AccountEnricher",
|
||||
"AnalyticsProcessor",
|
||||
"BalanceTransformer",
|
||||
"TransactionProcessor",
|
||||
]
|
||||
71
leggen/services/data_processors/account_enricher.py
Normal file
71
leggen/services/data_processors/account_enricher.py
Normal file
@@ -0,0 +1,71 @@
|
||||
"""Account enrichment processor for adding currency, logos, and metadata."""
|
||||
|
||||
from typing import Any, Dict
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from leggen.services.gocardless_service import GoCardlessService
|
||||
|
||||
|
||||
class AccountEnricher:
|
||||
"""Enriches account details with currency and institution information."""
|
||||
|
||||
def __init__(self):
|
||||
self.gocardless = GoCardlessService()
|
||||
|
||||
async def enrich_account_details(
|
||||
self,
|
||||
account_details: Dict[str, Any],
|
||||
balances: Dict[str, Any],
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Enrich account details with currency from balances and institution logo.
|
||||
|
||||
Args:
|
||||
account_details: Raw account details from GoCardless
|
||||
balances: Balance data containing currency information
|
||||
|
||||
Returns:
|
||||
Enriched account details with currency and logo added
|
||||
"""
|
||||
enriched_account = account_details.copy()
|
||||
|
||||
# Extract currency from first balance
|
||||
currency = self._extract_currency_from_balances(balances)
|
||||
if currency:
|
||||
enriched_account["currency"] = currency
|
||||
|
||||
# Fetch and add institution logo
|
||||
institution_id = enriched_account.get("institution_id")
|
||||
if institution_id:
|
||||
logo = await self._fetch_institution_logo(institution_id)
|
||||
if logo:
|
||||
enriched_account["logo"] = logo
|
||||
|
||||
return enriched_account
|
||||
|
||||
def _extract_currency_from_balances(self, balances: Dict[str, Any]) -> str | None:
|
||||
"""Extract currency from the first balance in the balances data."""
|
||||
balances_list = balances.get("balances", [])
|
||||
if not balances_list:
|
||||
return None
|
||||
|
||||
first_balance = balances_list[0]
|
||||
balance_amount = first_balance.get("balanceAmount", {})
|
||||
return balance_amount.get("currency")
|
||||
|
||||
async def _fetch_institution_logo(self, institution_id: str) -> str | None:
|
||||
"""Fetch institution logo from GoCardless API."""
|
||||
try:
|
||||
institution_details = await self.gocardless.get_institution_details(
|
||||
institution_id
|
||||
)
|
||||
logo = institution_details.get("logo", "")
|
||||
if logo:
|
||||
logger.info(f"Fetched logo for institution {institution_id}: {logo}")
|
||||
return logo
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to fetch institution details for {institution_id}: {e}"
|
||||
)
|
||||
return None
|
||||
201
leggen/services/data_processors/analytics_processor.py
Normal file
201
leggen/services/data_processors/analytics_processor.py
Normal file
@@ -0,0 +1,201 @@
|
||||
"""Analytics processor for calculating historical balances and statistics."""
|
||||
|
||||
import sqlite3
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
|
||||
class AnalyticsProcessor:
|
||||
"""Calculates historical balances and transaction statistics from database data."""
|
||||
|
||||
def calculate_historical_balances(
|
||||
self,
|
||||
db_path: Path,
|
||||
account_id: Optional[str] = None,
|
||||
days: int = 365,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Generate historical balance progression based on transaction history.
|
||||
|
||||
Uses current balances and subtracts future transactions to calculate
|
||||
balance at each historical point in time.
|
||||
|
||||
Args:
|
||||
db_path: Path to SQLite database
|
||||
account_id: Optional account ID to filter by
|
||||
days: Number of days to look back (default 365)
|
||||
|
||||
Returns:
|
||||
List of historical balance data points
|
||||
"""
|
||||
if not db_path.exists():
|
||||
return []
|
||||
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
cutoff_date = (datetime.now() - timedelta(days=days)).date().isoformat()
|
||||
today_date = datetime.now().date().isoformat()
|
||||
|
||||
# Single SQL query to generate historical balances using window functions
|
||||
query = """
|
||||
WITH RECURSIVE date_series AS (
|
||||
-- Generate weekly dates from cutoff_date to today
|
||||
SELECT date(?) as ref_date
|
||||
UNION ALL
|
||||
SELECT date(ref_date, '+7 days')
|
||||
FROM date_series
|
||||
WHERE ref_date < date(?)
|
||||
),
|
||||
current_balances AS (
|
||||
-- Get current balance for each account/type
|
||||
SELECT account_id, type, amount, currency
|
||||
FROM balances b1
|
||||
WHERE b1.timestamp = (
|
||||
SELECT MAX(b2.timestamp)
|
||||
FROM balances b2
|
||||
WHERE b2.account_id = b1.account_id AND b2.type = b1.type
|
||||
)
|
||||
{account_filter}
|
||||
AND b1.type = 'closingBooked' -- Focus on closingBooked for charts
|
||||
),
|
||||
historical_points AS (
|
||||
-- Calculate balance at each weekly point by subtracting future transactions
|
||||
SELECT
|
||||
cb.account_id,
|
||||
cb.type as balance_type,
|
||||
cb.currency,
|
||||
ds.ref_date,
|
||||
cb.amount - COALESCE(
|
||||
(SELECT SUM(t.transactionValue)
|
||||
FROM transactions t
|
||||
WHERE t.accountId = cb.account_id
|
||||
AND date(t.transactionDate) > ds.ref_date), 0
|
||||
) as balance_amount
|
||||
FROM current_balances cb
|
||||
CROSS JOIN date_series ds
|
||||
)
|
||||
SELECT
|
||||
account_id || '_' || balance_type || '_' || ref_date as id,
|
||||
account_id,
|
||||
balance_amount,
|
||||
balance_type,
|
||||
currency,
|
||||
ref_date as reference_date
|
||||
FROM historical_points
|
||||
ORDER BY account_id, ref_date
|
||||
"""
|
||||
|
||||
# Build parameters and account filter
|
||||
params = [cutoff_date, today_date]
|
||||
if account_id:
|
||||
account_filter = "AND b1.account_id = ?"
|
||||
params.append(account_id)
|
||||
else:
|
||||
account_filter = ""
|
||||
|
||||
# Format the query with conditional filter
|
||||
formatted_query = query.format(account_filter=account_filter)
|
||||
|
||||
cursor.execute(formatted_query, params)
|
||||
rows = cursor.fetchall()
|
||||
|
||||
conn.close()
|
||||
return [dict(row) for row in rows]
|
||||
|
||||
except Exception as e:
|
||||
conn.close()
|
||||
logger.error(f"Failed to calculate historical balances: {e}")
|
||||
raise
|
||||
|
||||
def calculate_monthly_stats(
|
||||
self,
|
||||
db_path: Path,
|
||||
account_id: Optional[str] = None,
|
||||
date_from: Optional[str] = None,
|
||||
date_to: Optional[str] = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Calculate monthly transaction statistics aggregated from database.
|
||||
|
||||
Sums transactions by month and calculates income, expenses, and net values.
|
||||
|
||||
Args:
|
||||
db_path: Path to SQLite database
|
||||
account_id: Optional account ID to filter by
|
||||
date_from: Optional start date (ISO format)
|
||||
date_to: Optional end date (ISO format)
|
||||
|
||||
Returns:
|
||||
List of monthly statistics with income, expenses, and net totals
|
||||
"""
|
||||
if not db_path.exists():
|
||||
return []
|
||||
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
# SQL query to aggregate transactions by month
|
||||
query = """
|
||||
SELECT
|
||||
strftime('%Y-%m', transactionDate) as month,
|
||||
COALESCE(SUM(CASE WHEN transactionValue > 0 THEN transactionValue ELSE 0 END), 0) as income,
|
||||
COALESCE(SUM(CASE WHEN transactionValue < 0 THEN ABS(transactionValue) ELSE 0 END), 0) as expenses,
|
||||
COALESCE(SUM(transactionValue), 0) as net
|
||||
FROM transactions
|
||||
WHERE 1=1
|
||||
"""
|
||||
|
||||
params = []
|
||||
|
||||
if account_id:
|
||||
query += " AND accountId = ?"
|
||||
params.append(account_id)
|
||||
|
||||
if date_from:
|
||||
query += " AND transactionDate >= ?"
|
||||
params.append(date_from)
|
||||
|
||||
if date_to:
|
||||
query += " AND transactionDate <= ?"
|
||||
params.append(date_to)
|
||||
|
||||
query += """
|
||||
GROUP BY strftime('%Y-%m', transactionDate)
|
||||
ORDER BY month ASC
|
||||
"""
|
||||
|
||||
cursor.execute(query, params)
|
||||
rows = cursor.fetchall()
|
||||
|
||||
# Convert to desired format with proper month display
|
||||
monthly_stats = []
|
||||
for row in rows:
|
||||
# Convert YYYY-MM to display format like "Mar 2024"
|
||||
year, month_num = row["month"].split("-")
|
||||
month_date = datetime.strptime(f"{year}-{month_num}-01", "%Y-%m-%d")
|
||||
display_month = month_date.strftime("%b %Y")
|
||||
|
||||
monthly_stats.append(
|
||||
{
|
||||
"month": display_month,
|
||||
"income": round(row["income"], 2),
|
||||
"expenses": round(row["expenses"], 2),
|
||||
"net": round(row["net"], 2),
|
||||
}
|
||||
)
|
||||
|
||||
conn.close()
|
||||
return monthly_stats
|
||||
|
||||
except Exception as e:
|
||||
conn.close()
|
||||
logger.error(f"Failed to calculate monthly stats: {e}")
|
||||
raise
|
||||
69
leggen/services/data_processors/balance_transformer.py
Normal file
69
leggen/services/data_processors/balance_transformer.py
Normal file
@@ -0,0 +1,69 @@
|
||||
"""Balance data transformation processor for format conversions."""
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Tuple
|
||||
|
||||
|
||||
class BalanceTransformer:
|
||||
"""Transforms balance data between GoCardless and internal database formats."""
|
||||
|
||||
def merge_account_metadata_into_balances(
|
||||
self,
|
||||
balances: Dict[str, Any],
|
||||
account_details: Dict[str, Any],
|
||||
) -> Dict[str, Any]:
|
||||
"""
|
||||
Merge account metadata into balance data for proper persistence.
|
||||
|
||||
This adds institution_id, iban, and account_status to the balances
|
||||
so they can be persisted alongside the balance data.
|
||||
|
||||
Args:
|
||||
balances: Raw balance data from GoCardless
|
||||
account_details: Enriched account details containing metadata
|
||||
|
||||
Returns:
|
||||
Balance data with account metadata merged in
|
||||
"""
|
||||
balances_with_metadata = balances.copy()
|
||||
balances_with_metadata["institution_id"] = account_details.get("institution_id")
|
||||
balances_with_metadata["iban"] = account_details.get("iban")
|
||||
balances_with_metadata["account_status"] = account_details.get("status")
|
||||
return balances_with_metadata
|
||||
|
||||
def transform_to_database_format(
|
||||
self,
|
||||
account_id: str,
|
||||
balance_data: Dict[str, Any],
|
||||
) -> List[Tuple[Any, ...]]:
|
||||
"""
|
||||
Transform GoCardless balance format to database row format.
|
||||
|
||||
Converts nested GoCardless balance structure into flat tuples
|
||||
ready for SQLite insertion.
|
||||
|
||||
Args:
|
||||
account_id: The account ID
|
||||
balance_data: Balance data with merged account metadata
|
||||
|
||||
Returns:
|
||||
List of tuples in database row format (account_id, bank, status, ...)
|
||||
"""
|
||||
rows = []
|
||||
|
||||
for balance in balance_data.get("balances", []):
|
||||
balance_amount = balance.get("balanceAmount", {})
|
||||
|
||||
row = (
|
||||
account_id,
|
||||
balance_data.get("institution_id", "unknown"),
|
||||
balance_data.get("account_status"),
|
||||
balance_data.get("iban", "N/A"),
|
||||
float(balance_amount.get("amount", 0)),
|
||||
balance_amount.get("currency"),
|
||||
balance.get("balanceType"),
|
||||
datetime.now().isoformat(),
|
||||
)
|
||||
rows.append(row)
|
||||
|
||||
return rows
|
||||
@@ -1,11 +1,15 @@
|
||||
import json
|
||||
import sqlite3
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from leggen.services.transaction_processor import TransactionProcessor
|
||||
from leggen.services.data_processors import (
|
||||
AnalyticsProcessor,
|
||||
BalanceTransformer,
|
||||
TransactionProcessor,
|
||||
)
|
||||
from leggen.utils.config import config
|
||||
from leggen.utils.paths import path_manager
|
||||
|
||||
@@ -14,7 +18,11 @@ class DatabaseService:
|
||||
def __init__(self):
|
||||
self.db_config = config.database_config
|
||||
self.sqlite_enabled = self.db_config.get("sqlite", True)
|
||||
|
||||
# Data processors
|
||||
self.transaction_processor = TransactionProcessor()
|
||||
self.balance_transformer = BalanceTransformer()
|
||||
self.analytics_processor = AnalyticsProcessor()
|
||||
|
||||
async def persist_balance(
|
||||
self, account_id: str, balance_data: Dict[str, Any]
|
||||
@@ -136,7 +144,10 @@ class DatabaseService:
|
||||
return []
|
||||
|
||||
try:
|
||||
balances = self._get_historical_balances(account_id=account_id, days=days)
|
||||
db_path = path_manager.get_database_path()
|
||||
balances = self.analytics_processor.calculate_historical_balances(
|
||||
db_path, account_id=account_id, days=days
|
||||
)
|
||||
logger.debug(
|
||||
f"Retrieved {len(balances)} historical balance points from database"
|
||||
)
|
||||
@@ -753,10 +764,12 @@ class DatabaseService:
|
||||
ON balances(account_id, type, timestamp)"""
|
||||
)
|
||||
|
||||
# Convert GoCardless balance format to our format and persist
|
||||
for balance in balance_data.get("balances", []):
|
||||
balance_amount = balance["balanceAmount"]
|
||||
# Transform and persist balances
|
||||
balance_rows = self.balance_transformer.transform_to_database_format(
|
||||
account_id, balance_data
|
||||
)
|
||||
|
||||
for row in balance_rows:
|
||||
try:
|
||||
cursor.execute(
|
||||
"""INSERT INTO balances (
|
||||
@@ -769,16 +782,7 @@ class DatabaseService:
|
||||
type,
|
||||
timestamp
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
(
|
||||
account_id,
|
||||
balance_data.get("institution_id", "unknown"),
|
||||
balance_data.get("account_status"),
|
||||
balance_data.get("iban", "N/A"),
|
||||
float(balance_amount["amount"]),
|
||||
balance_amount["currency"],
|
||||
balance["balanceType"],
|
||||
datetime.now().isoformat(),
|
||||
),
|
||||
row,
|
||||
)
|
||||
except sqlite3.IntegrityError:
|
||||
logger.warning(f"Skipped duplicate balance for {account_id}")
|
||||
@@ -1251,90 +1255,6 @@ class DatabaseService:
|
||||
conn.close()
|
||||
raise e
|
||||
|
||||
def _get_historical_balances(self, account_id=None, days=365):
|
||||
"""Get historical balance progression based on transaction history"""
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
return []
|
||||
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
cutoff_date = (datetime.now() - timedelta(days=days)).date().isoformat()
|
||||
today_date = datetime.now().date().isoformat()
|
||||
|
||||
# Single SQL query to generate historical balances using window functions
|
||||
query = """
|
||||
WITH RECURSIVE date_series AS (
|
||||
-- Generate weekly dates from cutoff_date to today
|
||||
SELECT date(?) as ref_date
|
||||
UNION ALL
|
||||
SELECT date(ref_date, '+7 days')
|
||||
FROM date_series
|
||||
WHERE ref_date < date(?)
|
||||
),
|
||||
current_balances AS (
|
||||
-- Get current balance for each account/type
|
||||
SELECT account_id, type, amount, currency
|
||||
FROM balances b1
|
||||
WHERE b1.timestamp = (
|
||||
SELECT MAX(b2.timestamp)
|
||||
FROM balances b2
|
||||
WHERE b2.account_id = b1.account_id AND b2.type = b1.type
|
||||
)
|
||||
{account_filter}
|
||||
AND b1.type = 'closingBooked' -- Focus on closingBooked for charts
|
||||
),
|
||||
historical_points AS (
|
||||
-- Calculate balance at each weekly point by subtracting future transactions
|
||||
SELECT
|
||||
cb.account_id,
|
||||
cb.type as balance_type,
|
||||
cb.currency,
|
||||
ds.ref_date,
|
||||
cb.amount - COALESCE(
|
||||
(SELECT SUM(t.transactionValue)
|
||||
FROM transactions t
|
||||
WHERE t.accountId = cb.account_id
|
||||
AND date(t.transactionDate) > ds.ref_date), 0
|
||||
) as balance_amount
|
||||
FROM current_balances cb
|
||||
CROSS JOIN date_series ds
|
||||
)
|
||||
SELECT
|
||||
account_id || '_' || balance_type || '_' || ref_date as id,
|
||||
account_id,
|
||||
balance_amount,
|
||||
balance_type,
|
||||
currency,
|
||||
ref_date as reference_date
|
||||
FROM historical_points
|
||||
ORDER BY account_id, ref_date
|
||||
"""
|
||||
|
||||
# Build parameters and account filter
|
||||
params = [cutoff_date, today_date]
|
||||
if account_id:
|
||||
account_filter = "AND b1.account_id = ?"
|
||||
params.append(account_id)
|
||||
else:
|
||||
account_filter = ""
|
||||
|
||||
# Format the query with conditional filter
|
||||
formatted_query = query.format(account_filter=account_filter)
|
||||
|
||||
cursor.execute(formatted_query, params)
|
||||
rows = cursor.fetchall()
|
||||
|
||||
conn.close()
|
||||
return [dict(row) for row in rows]
|
||||
|
||||
except Exception as e:
|
||||
conn.close()
|
||||
raise e
|
||||
|
||||
async def get_monthly_transaction_stats_from_db(
|
||||
self,
|
||||
account_id: Optional[str] = None,
|
||||
@@ -1347,10 +1267,9 @@ class DatabaseService:
|
||||
return []
|
||||
|
||||
try:
|
||||
monthly_stats = self._get_monthly_transaction_stats(
|
||||
account_id=account_id,
|
||||
date_from=date_from,
|
||||
date_to=date_to,
|
||||
db_path = path_manager.get_database_path()
|
||||
monthly_stats = self.analytics_processor.calculate_monthly_stats(
|
||||
db_path, account_id=account_id, date_from=date_from, date_to=date_to
|
||||
)
|
||||
logger.debug(
|
||||
f"Retrieved {len(monthly_stats)} monthly stat points from database"
|
||||
@@ -1360,79 +1279,6 @@ class DatabaseService:
|
||||
logger.error(f"Failed to get monthly transaction stats from database: {e}")
|
||||
return []
|
||||
|
||||
def _get_monthly_transaction_stats(
|
||||
self,
|
||||
account_id: Optional[str] = None,
|
||||
date_from: Optional[str] = None,
|
||||
date_to: Optional[str] = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Get monthly transaction statistics from SQLite database"""
|
||||
db_path = path_manager.get_database_path()
|
||||
if not db_path.exists():
|
||||
return []
|
||||
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.cursor()
|
||||
|
||||
try:
|
||||
# SQL query to aggregate transactions by month
|
||||
query = """
|
||||
SELECT
|
||||
strftime('%Y-%m', transactionDate) as month,
|
||||
COALESCE(SUM(CASE WHEN transactionValue > 0 THEN transactionValue ELSE 0 END), 0) as income,
|
||||
COALESCE(SUM(CASE WHEN transactionValue < 0 THEN ABS(transactionValue) ELSE 0 END), 0) as expenses,
|
||||
COALESCE(SUM(transactionValue), 0) as net
|
||||
FROM transactions
|
||||
WHERE 1=1
|
||||
"""
|
||||
|
||||
params = []
|
||||
|
||||
if account_id:
|
||||
query += " AND accountId = ?"
|
||||
params.append(account_id)
|
||||
|
||||
if date_from:
|
||||
query += " AND transactionDate >= ?"
|
||||
params.append(date_from)
|
||||
|
||||
if date_to:
|
||||
query += " AND transactionDate <= ?"
|
||||
params.append(date_to)
|
||||
|
||||
query += """
|
||||
GROUP BY strftime('%Y-%m', transactionDate)
|
||||
ORDER BY month ASC
|
||||
"""
|
||||
|
||||
cursor.execute(query, params)
|
||||
rows = cursor.fetchall()
|
||||
|
||||
# Convert to desired format with proper month display
|
||||
monthly_stats = []
|
||||
for row in rows:
|
||||
# Convert YYYY-MM to display format like "Mar 2024"
|
||||
year, month_num = row["month"].split("-")
|
||||
month_date = datetime.strptime(f"{year}-{month_num}-01", "%Y-%m-%d")
|
||||
display_month = month_date.strftime("%b %Y")
|
||||
|
||||
monthly_stats.append(
|
||||
{
|
||||
"month": display_month,
|
||||
"income": round(row["income"], 2),
|
||||
"expenses": round(row["expenses"], 2),
|
||||
"net": round(row["net"], 2),
|
||||
}
|
||||
)
|
||||
|
||||
conn.close()
|
||||
return monthly_stats
|
||||
|
||||
except Exception as e:
|
||||
conn.close()
|
||||
raise e
|
||||
|
||||
async def _check_sync_operations_migration_needed(self) -> bool:
|
||||
"""Check if sync_operations table needs to be created"""
|
||||
db_path = path_manager.get_database_path()
|
||||
|
||||
@@ -4,6 +4,11 @@ from typing import List
|
||||
from loguru import logger
|
||||
|
||||
from leggen.api.models.sync import SyncResult, SyncStatus
|
||||
from leggen.services.data_processors import (
|
||||
AccountEnricher,
|
||||
BalanceTransformer,
|
||||
TransactionProcessor,
|
||||
)
|
||||
from leggen.services.database_service import DatabaseService
|
||||
from leggen.services.gocardless_service import GoCardlessService
|
||||
from leggen.services.notification_service import NotificationService
|
||||
@@ -17,8 +22,13 @@ class SyncService:
|
||||
self.gocardless = GoCardlessService()
|
||||
self.database = DatabaseService()
|
||||
self.notifications = NotificationService()
|
||||
|
||||
# Data processors
|
||||
self.account_enricher = AccountEnricher()
|
||||
self.balance_transformer = BalanceTransformer()
|
||||
self.transaction_processor = TransactionProcessor()
|
||||
|
||||
self._sync_status = SyncStatus(is_running=False)
|
||||
self._institution_logos = {} # Cache for institution logos
|
||||
|
||||
async def get_sync_status(self) -> SyncStatus:
|
||||
"""Get current sync status"""
|
||||
@@ -84,54 +94,23 @@ class SyncService:
|
||||
# Get balances to extract currency information
|
||||
balances = await self.gocardless.get_account_balances(account_id)
|
||||
|
||||
# Enrich account details with currency and institution logo
|
||||
# Enrich and persist account details
|
||||
if account_details and balances:
|
||||
enriched_account_details = account_details.copy()
|
||||
|
||||
# Extract currency from first balance
|
||||
balances_list = balances.get("balances", [])
|
||||
if balances_list:
|
||||
first_balance = balances_list[0]
|
||||
balance_amount = first_balance.get("balanceAmount", {})
|
||||
currency = balance_amount.get("currency")
|
||||
if currency:
|
||||
enriched_account_details["currency"] = currency
|
||||
|
||||
# Get institution details to fetch logo
|
||||
institution_id = enriched_account_details.get("institution_id")
|
||||
if institution_id:
|
||||
try:
|
||||
institution_details = (
|
||||
await self.gocardless.get_institution_details(
|
||||
institution_id
|
||||
)
|
||||
)
|
||||
enriched_account_details["logo"] = (
|
||||
institution_details.get("logo", "")
|
||||
)
|
||||
logger.info(
|
||||
f"Fetched logo for institution {institution_id}: {enriched_account_details.get('logo', 'No logo')}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Failed to fetch institution details for {institution_id}: {e}"
|
||||
)
|
||||
# Enrich account with currency and institution logo
|
||||
enriched_account_details = (
|
||||
await self.account_enricher.enrich_account_details(
|
||||
account_details, balances
|
||||
)
|
||||
)
|
||||
|
||||
# Persist enriched account details to database
|
||||
await self.database.persist_account_details(
|
||||
enriched_account_details
|
||||
)
|
||||
|
||||
# Merge account details into balances data for proper persistence
|
||||
balances_with_account_info = balances.copy()
|
||||
balances_with_account_info["institution_id"] = (
|
||||
enriched_account_details.get("institution_id")
|
||||
)
|
||||
balances_with_account_info["iban"] = (
|
||||
enriched_account_details.get("iban")
|
||||
)
|
||||
balances_with_account_info["account_status"] = (
|
||||
enriched_account_details.get("status")
|
||||
# Merge account metadata into balances for persistence
|
||||
balances_with_account_info = self.balance_transformer.merge_account_metadata_into_balances(
|
||||
balances, enriched_account_details
|
||||
)
|
||||
await self.database.persist_balance(
|
||||
account_id, balances_with_account_info
|
||||
@@ -146,8 +125,10 @@ class SyncService:
|
||||
account_id
|
||||
)
|
||||
if transactions:
|
||||
processed_transactions = self.database.process_transactions(
|
||||
account_id, account_details, transactions
|
||||
processed_transactions = (
|
||||
self.transaction_processor.process_transactions(
|
||||
account_id, account_details, transactions
|
||||
)
|
||||
)
|
||||
new_transactions = await self.database.persist_transactions(
|
||||
account_id, processed_transactions
|
||||
|
||||
Reference in New Issue
Block a user