mirror of
https://github.com/elisiariocouto/leggen.git
synced 2025-12-28 23:09:13 +00:00
651 lines
19 KiB
Python
651 lines
19 KiB
Python
import json
|
|
import sqlite3
|
|
from sqlite3 import IntegrityError
|
|
|
|
import click
|
|
|
|
from leggen.utils.text import success, warning
|
|
from leggen.utils.paths import path_manager
|
|
|
|
|
|
def persist_balances(ctx: click.Context, balance: dict):
|
|
# Connect to SQLite database
|
|
db_path = path_manager.get_database_path()
|
|
conn = sqlite3.connect(str(db_path))
|
|
cursor = conn.cursor()
|
|
|
|
# Create the accounts table if it doesn't exist
|
|
cursor.execute(
|
|
"""CREATE TABLE IF NOT EXISTS accounts (
|
|
id TEXT PRIMARY KEY,
|
|
institution_id TEXT,
|
|
status TEXT,
|
|
iban TEXT,
|
|
name TEXT,
|
|
currency TEXT,
|
|
created DATETIME,
|
|
last_accessed DATETIME,
|
|
last_updated DATETIME
|
|
)"""
|
|
)
|
|
|
|
# Create indexes for accounts table
|
|
cursor.execute(
|
|
"""CREATE INDEX IF NOT EXISTS idx_accounts_institution_id
|
|
ON accounts(institution_id)"""
|
|
)
|
|
cursor.execute(
|
|
"""CREATE INDEX IF NOT EXISTS idx_accounts_status
|
|
ON accounts(status)"""
|
|
)
|
|
|
|
# Create the balances table if it doesn't exist
|
|
cursor.execute(
|
|
"""CREATE TABLE IF NOT EXISTS balances (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
account_id TEXT,
|
|
bank TEXT,
|
|
status TEXT,
|
|
iban TEXT,
|
|
amount REAL,
|
|
currency TEXT,
|
|
type TEXT,
|
|
timestamp DATETIME
|
|
)"""
|
|
)
|
|
|
|
# Create indexes for better performance
|
|
cursor.execute(
|
|
"""CREATE INDEX IF NOT EXISTS idx_balances_account_id
|
|
ON balances(account_id)"""
|
|
)
|
|
cursor.execute(
|
|
"""CREATE INDEX IF NOT EXISTS idx_balances_timestamp
|
|
ON balances(timestamp)"""
|
|
)
|
|
cursor.execute(
|
|
"""CREATE INDEX IF NOT EXISTS idx_balances_account_type_timestamp
|
|
ON balances(account_id, type, timestamp)"""
|
|
)
|
|
|
|
# Insert balance into SQLite database
|
|
try:
|
|
cursor.execute(
|
|
"""INSERT INTO balances (
|
|
account_id,
|
|
bank,
|
|
status,
|
|
iban,
|
|
amount,
|
|
currency,
|
|
type,
|
|
timestamp
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
(
|
|
balance["account_id"],
|
|
balance["bank"],
|
|
balance["status"],
|
|
balance["iban"],
|
|
balance["amount"],
|
|
balance["currency"],
|
|
balance["type"],
|
|
balance["timestamp"],
|
|
),
|
|
)
|
|
except IntegrityError:
|
|
warning(f"[{balance['account_id']}] Skipped duplicate balance")
|
|
|
|
# Commit changes and close the connection
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
success(f"[{balance['account_id']}] Inserted balance of type {balance['type']}")
|
|
|
|
return balance
|
|
|
|
|
|
def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list:
|
|
# Connect to SQLite database
|
|
db_path = path_manager.get_database_path()
|
|
conn = sqlite3.connect(str(db_path))
|
|
cursor = conn.cursor()
|
|
|
|
# Create the transactions table if it doesn't exist
|
|
cursor.execute(
|
|
"""CREATE TABLE IF NOT EXISTS transactions (
|
|
accountId TEXT NOT NULL,
|
|
transactionId TEXT NOT NULL,
|
|
internalTransactionId TEXT,
|
|
institutionId TEXT,
|
|
iban TEXT,
|
|
transactionDate DATETIME,
|
|
description TEXT,
|
|
transactionValue REAL,
|
|
transactionCurrency TEXT,
|
|
transactionStatus TEXT,
|
|
rawTransaction JSON,
|
|
PRIMARY KEY (accountId, transactionId)
|
|
)"""
|
|
)
|
|
|
|
# Create indexes for better performance
|
|
cursor.execute(
|
|
"""CREATE INDEX IF NOT EXISTS idx_transactions_internal_id
|
|
ON transactions(internalTransactionId)"""
|
|
)
|
|
cursor.execute(
|
|
"""CREATE INDEX IF NOT EXISTS idx_transactions_date
|
|
ON transactions(transactionDate)"""
|
|
)
|
|
cursor.execute(
|
|
"""CREATE INDEX IF NOT EXISTS idx_transactions_account_date
|
|
ON transactions(accountId, transactionDate)"""
|
|
)
|
|
cursor.execute(
|
|
"""CREATE INDEX IF NOT EXISTS idx_transactions_amount
|
|
ON transactions(transactionValue)"""
|
|
)
|
|
|
|
# Insert transactions into SQLite database
|
|
duplicates_count = 0
|
|
|
|
# Prepare an SQL statement for inserting data
|
|
insert_sql = """INSERT OR REPLACE INTO transactions (
|
|
accountId,
|
|
transactionId,
|
|
internalTransactionId,
|
|
institutionId,
|
|
iban,
|
|
transactionDate,
|
|
description,
|
|
transactionValue,
|
|
transactionCurrency,
|
|
transactionStatus,
|
|
rawTransaction
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""
|
|
|
|
new_transactions = []
|
|
|
|
for transaction in transactions:
|
|
try:
|
|
cursor.execute(
|
|
insert_sql,
|
|
(
|
|
transaction["accountId"],
|
|
transaction["transactionId"],
|
|
transaction.get("internalTransactionId"),
|
|
transaction["institutionId"],
|
|
transaction["iban"],
|
|
transaction["transactionDate"],
|
|
transaction["description"],
|
|
transaction["transactionValue"],
|
|
transaction["transactionCurrency"],
|
|
transaction["transactionStatus"],
|
|
json.dumps(transaction["rawTransaction"]),
|
|
),
|
|
)
|
|
new_transactions.append(transaction)
|
|
except IntegrityError:
|
|
# A transaction with the same ID already exists, indicating a duplicate
|
|
duplicates_count += 1
|
|
|
|
# Commit changes and close the connection
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
success(f"[{account}] Inserted {len(new_transactions)} new transactions")
|
|
if duplicates_count:
|
|
warning(f"[{account}] Skipped {duplicates_count} duplicate transactions")
|
|
|
|
return new_transactions
|
|
|
|
|
|
def get_transactions(
|
|
account_id=None,
|
|
limit=100,
|
|
offset=0,
|
|
date_from=None,
|
|
date_to=None,
|
|
min_amount=None,
|
|
max_amount=None,
|
|
search=None,
|
|
):
|
|
"""Get transactions from SQLite database with optional filtering"""
|
|
db_path = path_manager.get_database_path()
|
|
if not db_path.exists():
|
|
return []
|
|
conn = sqlite3.connect(str(db_path))
|
|
conn.row_factory = sqlite3.Row # Enable dict-like access
|
|
cursor = conn.cursor()
|
|
|
|
# Build query with filters
|
|
query = "SELECT * FROM transactions WHERE 1=1"
|
|
params = []
|
|
|
|
if account_id:
|
|
query += " AND accountId = ?"
|
|
params.append(account_id)
|
|
|
|
if date_from:
|
|
query += " AND transactionDate >= ?"
|
|
params.append(date_from)
|
|
|
|
if date_to:
|
|
query += " AND transactionDate <= ?"
|
|
params.append(date_to)
|
|
|
|
if min_amount is not None:
|
|
query += " AND transactionValue >= ?"
|
|
params.append(min_amount)
|
|
|
|
if max_amount is not None:
|
|
query += " AND transactionValue <= ?"
|
|
params.append(max_amount)
|
|
|
|
if search:
|
|
query += " AND description LIKE ?"
|
|
params.append(f"%{search}%")
|
|
|
|
# Add ordering and pagination
|
|
query += " ORDER BY transactionDate DESC"
|
|
|
|
if limit:
|
|
query += " LIMIT ?"
|
|
params.append(limit)
|
|
|
|
if offset:
|
|
query += " OFFSET ?"
|
|
params.append(offset)
|
|
|
|
try:
|
|
cursor.execute(query, params)
|
|
rows = cursor.fetchall()
|
|
|
|
# Convert to list of dicts and parse JSON fields
|
|
transactions = []
|
|
for row in rows:
|
|
transaction = dict(row)
|
|
if transaction["rawTransaction"]:
|
|
transaction["rawTransaction"] = json.loads(
|
|
transaction["rawTransaction"]
|
|
)
|
|
transactions.append(transaction)
|
|
|
|
conn.close()
|
|
return transactions
|
|
|
|
except Exception as e:
|
|
conn.close()
|
|
raise e
|
|
|
|
|
|
def get_balances(account_id=None):
|
|
"""Get latest balances from SQLite database"""
|
|
db_path = path_manager.get_database_path()
|
|
if not db_path.exists():
|
|
return []
|
|
conn = sqlite3.connect(str(db_path))
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.cursor()
|
|
|
|
# Get latest balance for each account_id and type combination
|
|
query = """
|
|
SELECT * FROM balances b1
|
|
WHERE b1.timestamp = (
|
|
SELECT MAX(b2.timestamp)
|
|
FROM balances b2
|
|
WHERE b2.account_id = b1.account_id AND b2.type = b1.type
|
|
)
|
|
"""
|
|
params = []
|
|
|
|
if account_id:
|
|
query += " AND b1.account_id = ?"
|
|
params.append(account_id)
|
|
|
|
query += " ORDER BY b1.account_id, b1.type"
|
|
|
|
try:
|
|
cursor.execute(query, params)
|
|
rows = cursor.fetchall()
|
|
|
|
balances = [dict(row) for row in rows]
|
|
conn.close()
|
|
return balances
|
|
|
|
except Exception as e:
|
|
conn.close()
|
|
raise e
|
|
|
|
|
|
def get_account_summary(account_id):
|
|
"""Get basic account info from transactions table (avoids GoCardless API call)"""
|
|
db_path = path_manager.get_database_path()
|
|
if not db_path.exists():
|
|
return None
|
|
conn = sqlite3.connect(str(db_path))
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.cursor()
|
|
|
|
try:
|
|
# Get account info from most recent transaction
|
|
cursor.execute(
|
|
"""
|
|
SELECT DISTINCT accountId, institutionId, iban
|
|
FROM transactions
|
|
WHERE accountId = ?
|
|
ORDER BY transactionDate DESC
|
|
LIMIT 1
|
|
""",
|
|
(account_id,),
|
|
)
|
|
|
|
row = cursor.fetchone()
|
|
conn.close()
|
|
|
|
if row:
|
|
return dict(row)
|
|
return None
|
|
|
|
except Exception as e:
|
|
conn.close()
|
|
raise e
|
|
|
|
|
|
def get_transaction_count(account_id=None, **filters):
|
|
"""Get total count of transactions matching filters"""
|
|
db_path = path_manager.get_database_path()
|
|
if not db_path.exists():
|
|
return 0
|
|
conn = sqlite3.connect(str(db_path))
|
|
cursor = conn.cursor()
|
|
|
|
query = "SELECT COUNT(*) FROM transactions WHERE 1=1"
|
|
params = []
|
|
|
|
if account_id:
|
|
query += " AND accountId = ?"
|
|
params.append(account_id)
|
|
|
|
# Add same filters as get_transactions
|
|
if filters.get("date_from"):
|
|
query += " AND transactionDate >= ?"
|
|
params.append(filters["date_from"])
|
|
|
|
if filters.get("date_to"):
|
|
query += " AND transactionDate <= ?"
|
|
params.append(filters["date_to"])
|
|
|
|
if filters.get("min_amount") is not None:
|
|
query += " AND transactionValue >= ?"
|
|
params.append(filters["min_amount"])
|
|
|
|
if filters.get("max_amount") is not None:
|
|
query += " AND transactionValue <= ?"
|
|
params.append(filters["max_amount"])
|
|
|
|
if filters.get("search"):
|
|
query += " AND description LIKE ?"
|
|
params.append(f"%{filters['search']}%")
|
|
|
|
try:
|
|
cursor.execute(query, params)
|
|
count = cursor.fetchone()[0]
|
|
conn.close()
|
|
return count
|
|
|
|
except Exception as e:
|
|
conn.close()
|
|
raise e
|
|
|
|
|
|
def persist_account(account_data: dict):
|
|
"""Persist account details to SQLite database"""
|
|
db_path = path_manager.get_database_path()
|
|
conn = sqlite3.connect(str(db_path))
|
|
cursor = conn.cursor()
|
|
|
|
# Create the accounts table if it doesn't exist
|
|
cursor.execute(
|
|
"""CREATE TABLE IF NOT EXISTS accounts (
|
|
id TEXT PRIMARY KEY,
|
|
institution_id TEXT,
|
|
status TEXT,
|
|
iban TEXT,
|
|
name TEXT,
|
|
currency TEXT,
|
|
created DATETIME,
|
|
last_accessed DATETIME,
|
|
last_updated DATETIME
|
|
)"""
|
|
)
|
|
|
|
# Create indexes for accounts table
|
|
cursor.execute(
|
|
"""CREATE INDEX IF NOT EXISTS idx_accounts_institution_id
|
|
ON accounts(institution_id)"""
|
|
)
|
|
cursor.execute(
|
|
"""CREATE INDEX IF NOT EXISTS idx_accounts_status
|
|
ON accounts(status)"""
|
|
)
|
|
|
|
try:
|
|
# Insert or replace account data
|
|
cursor.execute(
|
|
"""INSERT OR REPLACE INTO accounts (
|
|
id,
|
|
institution_id,
|
|
status,
|
|
iban,
|
|
name,
|
|
currency,
|
|
created,
|
|
last_accessed,
|
|
last_updated
|
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
|
(
|
|
account_data["id"],
|
|
account_data["institution_id"],
|
|
account_data["status"],
|
|
account_data.get("iban"),
|
|
account_data.get("name"),
|
|
account_data.get("currency"),
|
|
account_data["created"],
|
|
account_data.get("last_accessed"),
|
|
account_data.get("last_updated", account_data["created"]),
|
|
),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
success(f"[{account_data['id']}] Account details persisted to database")
|
|
return account_data
|
|
|
|
except Exception as e:
|
|
conn.close()
|
|
raise e
|
|
|
|
|
|
def get_accounts(account_ids=None):
|
|
"""Get account details from SQLite database"""
|
|
db_path = path_manager.get_database_path()
|
|
if not db_path.exists():
|
|
return []
|
|
conn = sqlite3.connect(str(db_path))
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.cursor()
|
|
|
|
query = "SELECT * FROM accounts"
|
|
params = []
|
|
|
|
if account_ids:
|
|
placeholders = ",".join("?" * len(account_ids))
|
|
query += f" WHERE id IN ({placeholders})"
|
|
params.extend(account_ids)
|
|
|
|
query += " ORDER BY created DESC"
|
|
|
|
try:
|
|
cursor.execute(query, params)
|
|
rows = cursor.fetchall()
|
|
|
|
accounts = [dict(row) for row in rows]
|
|
conn.close()
|
|
return accounts
|
|
|
|
except Exception as e:
|
|
conn.close()
|
|
raise e
|
|
|
|
|
|
def get_account(account_id: str):
|
|
"""Get specific account details from SQLite database"""
|
|
db_path = path_manager.get_database_path()
|
|
if not db_path.exists():
|
|
return None
|
|
conn = sqlite3.connect(str(db_path))
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.cursor()
|
|
|
|
try:
|
|
cursor.execute("SELECT * FROM accounts WHERE id = ?", (account_id,))
|
|
row = cursor.fetchone()
|
|
conn.close()
|
|
|
|
if row:
|
|
return dict(row)
|
|
return None
|
|
|
|
except Exception as e:
|
|
conn.close()
|
|
raise e
|
|
|
|
|
|
def get_historical_balances(account_id=None, days=365):
|
|
"""Get historical balance progression based on transaction history"""
|
|
db_path = path_manager.get_database_path()
|
|
if not db_path.exists():
|
|
return []
|
|
|
|
conn = sqlite3.connect(str(db_path))
|
|
conn.row_factory = sqlite3.Row
|
|
cursor = conn.cursor()
|
|
|
|
try:
|
|
# Get current balance for each account/type to use as the final balance
|
|
current_balances_query = """
|
|
SELECT account_id, type, amount, currency
|
|
FROM balances b1
|
|
WHERE b1.timestamp = (
|
|
SELECT MAX(b2.timestamp)
|
|
FROM balances b2
|
|
WHERE b2.account_id = b1.account_id AND b2.type = b1.type
|
|
)
|
|
"""
|
|
params = []
|
|
|
|
if account_id:
|
|
current_balances_query += " AND b1.account_id = ?"
|
|
params.append(account_id)
|
|
|
|
cursor.execute(current_balances_query, params)
|
|
current_balances = {
|
|
(row['account_id'], row['type']): {
|
|
'amount': row['amount'],
|
|
'currency': row['currency']
|
|
}
|
|
for row in cursor.fetchall()
|
|
}
|
|
|
|
# Get transactions for the specified period, ordered by date descending
|
|
from datetime import datetime, timedelta
|
|
cutoff_date = (datetime.now() - timedelta(days=days)).isoformat()
|
|
|
|
transactions_query = """
|
|
SELECT accountId, transactionDate, transactionValue
|
|
FROM transactions
|
|
WHERE transactionDate >= ?
|
|
"""
|
|
|
|
if account_id:
|
|
transactions_query += " AND accountId = ?"
|
|
params = [cutoff_date, account_id]
|
|
else:
|
|
params = [cutoff_date]
|
|
|
|
transactions_query += " ORDER BY transactionDate DESC"
|
|
|
|
cursor.execute(transactions_query, params)
|
|
transactions = cursor.fetchall()
|
|
|
|
# Calculate historical balances by working backwards from current balance
|
|
historical_balances = []
|
|
account_running_balances = {}
|
|
|
|
# Initialize running balances with current balances
|
|
for (acc_id, balance_type), balance_info in current_balances.items():
|
|
if acc_id not in account_running_balances:
|
|
account_running_balances[acc_id] = {}
|
|
account_running_balances[acc_id][balance_type] = balance_info['amount']
|
|
|
|
# Group transactions by date
|
|
from collections import defaultdict
|
|
transactions_by_date = defaultdict(list)
|
|
|
|
for txn in transactions:
|
|
date_str = txn['transactionDate'][:10] # Extract just the date part
|
|
transactions_by_date[date_str].append(txn)
|
|
|
|
# Generate historical balance points
|
|
# Start from today and work backwards
|
|
current_date = datetime.now().date()
|
|
|
|
for day_offset in range(0, days, 7): # Sample every 7 days for performance
|
|
target_date = current_date - timedelta(days=day_offset)
|
|
target_date_str = target_date.isoformat()
|
|
|
|
# For each account, create balance entries
|
|
for acc_id in account_running_balances:
|
|
for balance_type in ['closingBooked']: # Focus on closingBooked for the chart
|
|
if balance_type in account_running_balances[acc_id]:
|
|
balance_amount = account_running_balances[acc_id][balance_type]
|
|
currency = current_balances.get((acc_id, balance_type), {}).get('currency', 'EUR')
|
|
|
|
historical_balances.append({
|
|
'id': f"{acc_id}_{balance_type}_{target_date_str}",
|
|
'account_id': acc_id,
|
|
'balance_amount': balance_amount,
|
|
'balance_type': balance_type,
|
|
'currency': currency,
|
|
'reference_date': target_date_str,
|
|
'created_at': None,
|
|
'updated_at': None
|
|
})
|
|
|
|
# Subtract transactions that occurred on this date and later dates
|
|
# to simulate going back in time
|
|
for date_str in list(transactions_by_date.keys()):
|
|
if date_str >= target_date_str:
|
|
for txn in transactions_by_date[date_str]:
|
|
acc_id = txn['accountId']
|
|
amount = txn['transactionValue']
|
|
|
|
if acc_id in account_running_balances:
|
|
for balance_type in account_running_balances[acc_id]:
|
|
account_running_balances[acc_id][balance_type] -= amount
|
|
|
|
# Remove processed transactions to avoid double-processing
|
|
del transactions_by_date[date_str]
|
|
|
|
conn.close()
|
|
|
|
# Sort by date for proper chronological order
|
|
historical_balances.sort(key=lambda x: x['reference_date'])
|
|
|
|
return historical_balances
|
|
|
|
except Exception as e:
|
|
conn.close()
|
|
raise e
|