From 332d4d51d00286ecec71703aaa39e590f506d2cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Elisi=C3=A1rio=20Couto?= Date: Fri, 7 Jun 2024 19:48:06 +0100 Subject: [PATCH] feat(sync): Save account balances in new table. --- leggen/commands/sync.py | 32 ++++++++++++++++++++- leggen/database/mongo.py | 19 +++++++++++++ leggen/database/sqlite.py | 58 +++++++++++++++++++++++++++++++++++++-- leggen/utils/database.py | 15 ++++++++++ 4 files changed, 121 insertions(+), 3 deletions(-) diff --git a/leggen/commands/sync.py b/leggen/commands/sync.py index 1f9b3c0..c02b71a 100644 --- a/leggen/commands/sync.py +++ b/leggen/commands/sync.py @@ -1,7 +1,9 @@ +from datetime import datetime + import click from leggen.main import cli -from leggen.utils.database import save_transactions +from leggen.utils.database import persist_balance, save_transactions from leggen.utils.network import get from leggen.utils.notifications import send_notification from leggen.utils.text import error, info @@ -19,6 +21,34 @@ def sync(ctx: click.Context): for r in res.get("results", []): accounts.update(r.get("accounts", [])) + info(f"Syncing balances for {len(accounts)} accounts") + + for account in accounts: + account_details = get(ctx, f"/accounts/{account}") + account_balances = get(ctx, f"/accounts/{account}/balances/").get( + "balances", [] + ) + for balance in account_balances: + balance_amount = balance["balanceAmount"] + amount = round(float(balance_amount["amount"]), 2) + balance_document = { + "account_id": account, + "bank": account_details["institution_id"], + "status": account_details["status"], + "iban": account_details.get("iban", "N/A"), + "amount": amount, + "currency": balance_amount["currency"], + "type": balance["balanceType"], + "timestamp": datetime.now().timestamp(), + } + try: + persist_balance(ctx, account, balance_document) + except Exception as e: + error( + f"[{account}] Error: Sync failed, skipping account, exception: {e}" + ) + continue + info(f"Syncing transactions for {len(accounts)} accounts") for account in accounts: diff --git a/leggen/database/mongo.py b/leggen/database/mongo.py index f0cf0b5..4c84d3a 100644 --- a/leggen/database/mongo.py +++ b/leggen/database/mongo.py @@ -5,6 +5,25 @@ from pymongo.errors import DuplicateKeyError from leggen.utils.text import success, warning +def persist_balances(ctx: click.Context, balance: dict) -> None: + # Connect to MongoDB + mongo_uri = ctx.obj.get("database", {}).get("mongodb", {}).get("uri") + client = MongoClient(mongo_uri) + db = client["leggen"] + balances_collection = db["balances"] + + # Insert balance into MongoDB + try: + balances_collection.insert_one(balance) + success( + f"[{balance['account_id']}] Inserted new balance if type {balance['type']}" + ) + except DuplicateKeyError: + warning(f"[{balance['account_id']}] Skipped duplicate balance") + + client.close() + + def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list: # Connect to MongoDB mongo_uri = ctx.obj.get("database", {}).get("mongodb", {}).get("uri") diff --git a/leggen/database/sqlite.py b/leggen/database/sqlite.py index 2c2f5cf..3c0a190 100644 --- a/leggen/database/sqlite.py +++ b/leggen/database/sqlite.py @@ -7,9 +7,63 @@ import click from leggen.utils.text import success, warning -def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list: - # Path to your SQLite database file +def persist_balances(ctx: click.Context, balance: dict): + # Connect to SQLite database + conn = sqlite3.connect("./leggen.db") + cursor = conn.cursor() + # Create the balances table if it doesn't exist + cursor.execute( + """CREATE TABLE IF NOT EXISTS balances ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + account_id TEXT, + bank TEXT, + status TEXT, + iban TEXT, + amount REAL, + currency TEXT, + type TEXT, + timestamp DATETIME + )""" + ) + + # Insert balance into SQLite database + try: + cursor.execute( + """INSERT INTO balances ( + account_id, + bank, + status, + iban, + amount, + currency, + type, + timestamp + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", + ( + balance["account_id"], + balance["bank"], + balance["status"], + balance["iban"], + balance["amount"], + balance["currency"], + balance["type"], + balance["timestamp"], + ), + ) + except IntegrityError: + warning(f"[{balance['account_id']}] Skipped duplicate balance") + + # Commit changes and close the connection + conn.commit() + conn.close() + + success(f"[{balance['account_id']}] Inserted balance of type {balance['type']}") + + return balance + + +def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list: # Connect to SQLite database conn = sqlite3.connect("./leggen.db") cursor = conn.cursor() diff --git a/leggen/utils/database.py b/leggen/utils/database.py index 56803f7..1945acf 100644 --- a/leggen/utils/database.py +++ b/leggen/utils/database.py @@ -8,6 +8,21 @@ from leggen.utils.network import get from leggen.utils.text import info, warning +def persist_balance(ctx: click.Context, account: str, balance: dict) -> None: + sqlite = ctx.obj.get("database", {}).get("sqlite", False) + mongodb = ctx.obj.get("database", {}).get("mongodb", False) + + if not sqlite and not mongodb: + warning("No database engine is enabled, skipping balance saving") + + if sqlite: + info(f"[{account}] Fetched balances, saving to SQLite") + sqlite_engine.persist_balances(ctx, balance) + else: + info(f"[{account}] Fetched balances, saving to MongoDB") + mongodb_engine.persist_balances(ctx, balance) + + def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list: sqlite = ctx.obj.get("database", {}).get("sqlite", False) mongodb = ctx.obj.get("database", {}).get("mongodb", False)