mirror of
https://github.com/elisiariocouto/leggen.git
synced 2025-12-14 14:22:32 +00:00
feat(sync): Save account balances in new table.
This commit is contained in:
@@ -1,7 +1,9 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
|
||||||
import click
|
import click
|
||||||
|
|
||||||
from leggen.main import cli
|
from leggen.main import cli
|
||||||
from leggen.utils.database import save_transactions
|
from leggen.utils.database import persist_balance, save_transactions
|
||||||
from leggen.utils.network import get
|
from leggen.utils.network import get
|
||||||
from leggen.utils.notifications import send_notification
|
from leggen.utils.notifications import send_notification
|
||||||
from leggen.utils.text import error, info
|
from leggen.utils.text import error, info
|
||||||
@@ -19,6 +21,34 @@ def sync(ctx: click.Context):
|
|||||||
for r in res.get("results", []):
|
for r in res.get("results", []):
|
||||||
accounts.update(r.get("accounts", []))
|
accounts.update(r.get("accounts", []))
|
||||||
|
|
||||||
|
info(f"Syncing balances for {len(accounts)} accounts")
|
||||||
|
|
||||||
|
for account in accounts:
|
||||||
|
account_details = get(ctx, f"/accounts/{account}")
|
||||||
|
account_balances = get(ctx, f"/accounts/{account}/balances/").get(
|
||||||
|
"balances", []
|
||||||
|
)
|
||||||
|
for balance in account_balances:
|
||||||
|
balance_amount = balance["balanceAmount"]
|
||||||
|
amount = round(float(balance_amount["amount"]), 2)
|
||||||
|
balance_document = {
|
||||||
|
"account_id": account,
|
||||||
|
"bank": account_details["institution_id"],
|
||||||
|
"status": account_details["status"],
|
||||||
|
"iban": account_details.get("iban", "N/A"),
|
||||||
|
"amount": amount,
|
||||||
|
"currency": balance_amount["currency"],
|
||||||
|
"type": balance["balanceType"],
|
||||||
|
"timestamp": datetime.now().timestamp(),
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
persist_balance(ctx, account, balance_document)
|
||||||
|
except Exception as e:
|
||||||
|
error(
|
||||||
|
f"[{account}] Error: Sync failed, skipping account, exception: {e}"
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
info(f"Syncing transactions for {len(accounts)} accounts")
|
info(f"Syncing transactions for {len(accounts)} accounts")
|
||||||
|
|
||||||
for account in accounts:
|
for account in accounts:
|
||||||
|
|||||||
@@ -5,6 +5,25 @@ from pymongo.errors import DuplicateKeyError
|
|||||||
from leggen.utils.text import success, warning
|
from leggen.utils.text import success, warning
|
||||||
|
|
||||||
|
|
||||||
|
def persist_balances(ctx: click.Context, balance: dict) -> None:
|
||||||
|
# Connect to MongoDB
|
||||||
|
mongo_uri = ctx.obj.get("database", {}).get("mongodb", {}).get("uri")
|
||||||
|
client = MongoClient(mongo_uri)
|
||||||
|
db = client["leggen"]
|
||||||
|
balances_collection = db["balances"]
|
||||||
|
|
||||||
|
# Insert balance into MongoDB
|
||||||
|
try:
|
||||||
|
balances_collection.insert_one(balance)
|
||||||
|
success(
|
||||||
|
f"[{balance['account_id']}] Inserted new balance if type {balance['type']}"
|
||||||
|
)
|
||||||
|
except DuplicateKeyError:
|
||||||
|
warning(f"[{balance['account_id']}] Skipped duplicate balance")
|
||||||
|
|
||||||
|
client.close()
|
||||||
|
|
||||||
|
|
||||||
def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list:
|
def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list:
|
||||||
# Connect to MongoDB
|
# Connect to MongoDB
|
||||||
mongo_uri = ctx.obj.get("database", {}).get("mongodb", {}).get("uri")
|
mongo_uri = ctx.obj.get("database", {}).get("mongodb", {}).get("uri")
|
||||||
|
|||||||
@@ -7,9 +7,63 @@ import click
|
|||||||
from leggen.utils.text import success, warning
|
from leggen.utils.text import success, warning
|
||||||
|
|
||||||
|
|
||||||
def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list:
|
def persist_balances(ctx: click.Context, balance: dict):
|
||||||
# Path to your SQLite database file
|
# Connect to SQLite database
|
||||||
|
conn = sqlite3.connect("./leggen.db")
|
||||||
|
cursor = conn.cursor()
|
||||||
|
|
||||||
|
# Create the balances table if it doesn't exist
|
||||||
|
cursor.execute(
|
||||||
|
"""CREATE TABLE IF NOT EXISTS balances (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
account_id TEXT,
|
||||||
|
bank TEXT,
|
||||||
|
status TEXT,
|
||||||
|
iban TEXT,
|
||||||
|
amount REAL,
|
||||||
|
currency TEXT,
|
||||||
|
type TEXT,
|
||||||
|
timestamp DATETIME
|
||||||
|
)"""
|
||||||
|
)
|
||||||
|
|
||||||
|
# Insert balance into SQLite database
|
||||||
|
try:
|
||||||
|
cursor.execute(
|
||||||
|
"""INSERT INTO balances (
|
||||||
|
account_id,
|
||||||
|
bank,
|
||||||
|
status,
|
||||||
|
iban,
|
||||||
|
amount,
|
||||||
|
currency,
|
||||||
|
type,
|
||||||
|
timestamp
|
||||||
|
) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||||
|
(
|
||||||
|
balance["account_id"],
|
||||||
|
balance["bank"],
|
||||||
|
balance["status"],
|
||||||
|
balance["iban"],
|
||||||
|
balance["amount"],
|
||||||
|
balance["currency"],
|
||||||
|
balance["type"],
|
||||||
|
balance["timestamp"],
|
||||||
|
),
|
||||||
|
)
|
||||||
|
except IntegrityError:
|
||||||
|
warning(f"[{balance['account_id']}] Skipped duplicate balance")
|
||||||
|
|
||||||
|
# Commit changes and close the connection
|
||||||
|
conn.commit()
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
success(f"[{balance['account_id']}] Inserted balance of type {balance['type']}")
|
||||||
|
|
||||||
|
return balance
|
||||||
|
|
||||||
|
|
||||||
|
def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list:
|
||||||
# Connect to SQLite database
|
# Connect to SQLite database
|
||||||
conn = sqlite3.connect("./leggen.db")
|
conn = sqlite3.connect("./leggen.db")
|
||||||
cursor = conn.cursor()
|
cursor = conn.cursor()
|
||||||
|
|||||||
@@ -8,6 +8,21 @@ from leggen.utils.network import get
|
|||||||
from leggen.utils.text import info, warning
|
from leggen.utils.text import info, warning
|
||||||
|
|
||||||
|
|
||||||
|
def persist_balance(ctx: click.Context, account: str, balance: dict) -> None:
|
||||||
|
sqlite = ctx.obj.get("database", {}).get("sqlite", False)
|
||||||
|
mongodb = ctx.obj.get("database", {}).get("mongodb", False)
|
||||||
|
|
||||||
|
if not sqlite and not mongodb:
|
||||||
|
warning("No database engine is enabled, skipping balance saving")
|
||||||
|
|
||||||
|
if sqlite:
|
||||||
|
info(f"[{account}] Fetched balances, saving to SQLite")
|
||||||
|
sqlite_engine.persist_balances(ctx, balance)
|
||||||
|
else:
|
||||||
|
info(f"[{account}] Fetched balances, saving to MongoDB")
|
||||||
|
mongodb_engine.persist_balances(ctx, balance)
|
||||||
|
|
||||||
|
|
||||||
def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list:
|
def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list:
|
||||||
sqlite = ctx.obj.get("database", {}).get("sqlite", False)
|
sqlite = ctx.obj.get("database", {}).get("sqlite", False)
|
||||||
mongodb = ctx.obj.get("database", {}).get("mongodb", False)
|
mongodb = ctx.obj.get("database", {}).get("mongodb", False)
|
||||||
|
|||||||
Reference in New Issue
Block a user