From 7401ca62d2ff23c4100ed9d1c8b7450289337553 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Elisi=C3=A1rio=20Couto?= Date: Fri, 29 Mar 2024 02:36:22 +0000 Subject: [PATCH] feat(notifications): Add support for Telegram notifications. --- .gitignore | 1 + README.md | 15 +++- leggen/commands/sync.py | 107 +++---------------------- leggen/database/mongo.py | 35 +++++++++ leggen/{utils => database}/sqlite.py | 33 ++------ leggen/notifications/discord.py | 5 +- leggen/notifications/telegram.py | 43 ++++++++++ leggen/utils/database.py | 112 +++++++++++++++++++++++++++ leggen/utils/mongo.py | 57 -------------- leggen/utils/notifications.py | 46 +++++++++++ 10 files changed, 267 insertions(+), 187 deletions(-) create mode 100644 leggen/database/mongo.py rename leggen/{utils => database}/sqlite.py (62%) create mode 100644 leggen/notifications/telegram.py create mode 100644 leggen/utils/database.py delete mode 100644 leggen/utils/mongo.py create mode 100644 leggen/utils/notifications.py diff --git a/.gitignore b/.gitignore index 227657f..c9a966a 100644 --- a/.gitignore +++ b/.gitignore @@ -162,3 +162,4 @@ data/ docker-compose.dev.yml nocodb/ sql/ +leggen.db diff --git a/README.md b/README.md index 95bfa01..b47ca7d 100644 --- a/README.md +++ b/README.md @@ -26,10 +26,10 @@ Having your bank data in a database, gives you the power to backup, analyze and - List all connected banks and their statuses - List balances of all connected accounts - List transactions for all connected accounts - - Sync all transactions with a SQLite or MongoDB database + - Sync all transactions with a SQLite and/or MongoDB database - Visualize and query transactions using NocoDB - Schedule regular syncs with the database using Ofelia - - Send notifications to Discrod when transactions match certain filters + - Send notifications to Discord and/or Telegram when transactions match certain filters ## 🚀 Installation and Configuration @@ -49,12 +49,19 @@ url = "https://bankaccountdata.gocardless.com/api/v2" [database] sqlite = true +mongodb = true + +[database.mongodb] +uri = "mongodb://localhost:27017" [notifications.discord] webhook = "https://discord.com/api/webhooks/..." -[filters] -enabled = true +[notifications.telegram] +# See gist for telegram instructions +# https://gist.github.com/nafiesl/4ad622f344cd1dc3bb1ecbe468ff9f8a +token = "12345:abcdefghijklmnopqrstuvxwyz" +chat-id = 12345 [filters.case-insensitive] filter1 = "company-name" diff --git a/leggen/commands/sync.py b/leggen/commands/sync.py index 398b15a..1f9b3c0 100644 --- a/leggen/commands/sync.py +++ b/leggen/commands/sync.py @@ -1,107 +1,12 @@ -from datetime import datetime - import click from leggen.main import cli -from leggen.utils.mongo import save_transactions as save_transactions_mongo +from leggen.utils.database import save_transactions from leggen.utils.network import get -from leggen.utils.sqlite import save_transactions as save_transactions_sqlite +from leggen.utils.notifications import send_notification from leggen.utils.text import error, info -def save_transactions(ctx: click.Context, account: str): - info(f"[{account}] Getting account details") - account_info = get(ctx, f"/accounts/{account}") - - info(f"[{account}] Getting transactions") - transactions = [] - - account_transactions = get(ctx, f"/accounts/{account}/transactions/").get( - "transactions", [] - ) - - for transaction in account_transactions.get("booked", []): - booked_date = transaction.get("bookingDateTime") or transaction.get( - "bookingDate" - ) - value_date = transaction.get("valueDateTime") or transaction.get("valueDate") - if booked_date and value_date: - min_date = min( - datetime.fromisoformat(booked_date), datetime.fromisoformat(value_date) - ) - else: - min_date = datetime.fromisoformat(booked_date or value_date) - - transactionValue = float( - transaction.get("transactionAmount", {}).get("amount", 0) - ) - currency = transaction.get("transactionAmount", {}).get("currency", "") - - description = transaction.get( - "remittanceInformationUnstructured", - ",".join(transaction.get("remittanceInformationUnstructuredArray", [])), - ) - - t = { - "internalTransactionId": transaction.get("internalTransactionId"), - "institutionId": account_info["institution_id"], - "iban": account_info.get("iban", "N/A"), - "transactionDate": min_date, - "description": description, - "transactionValue": transactionValue, - "transactionCurrency": currency, - "transactionStatus": "booked", - "accountId": account, - "rawTransaction": transaction, - } - transactions.append(t) - - for transaction in account_transactions.get("pending", []): - booked_date = transaction.get("bookingDateTime") or transaction.get( - "bookingDate" - ) - value_date = transaction.get("valueDateTime") or transaction.get("valueDate") - if booked_date and value_date: - min_date = min( - datetime.fromisoformat(booked_date), datetime.fromisoformat(value_date) - ) - else: - min_date = datetime.fromisoformat(booked_date or value_date) - - transactionValue = float( - transaction.get("transactionAmount", {}).get("amount", 0) - ) - currency = transaction.get("transactionAmount", {}).get("currency", "") - - description = transaction.get( - "remittanceInformationUnstructured", - ",".join(transaction.get("remittanceInformationUnstructuredArray", [])), - ) - - t = { - "internalTransactionId": transaction.get("internalTransactionId"), - "institutionId": account_info["institution_id"], - "iban": account_info.get("iban", "N/A"), - "transactionDate": min_date, - "description": description, - "transactionValue": transactionValue, - "transactionCurrency": currency, - "transactionStatus": "pending", - "accountId": account, - "rawTransaction": transaction, - } - transactions.append(t) - - sqlite = ctx.obj.get("database", {}).get("sqlite", True) - info( - f"[{account}] Fetched {len(transactions)} transactions, saving to {'SQLite' if sqlite else 'MongoDB'}" - ) - if sqlite: - save_transactions_sqlite(ctx, account, transactions) - else: - save_transactions_mongo(ctx, account, transactions) - - @cli.command() @click.pass_context def sync(ctx: click.Context): @@ -118,6 +23,12 @@ def sync(ctx: click.Context): for account in accounts: try: - save_transactions(ctx, account) + new_transactions = save_transactions(ctx, account) except Exception as e: error(f"[{account}] Error: Sync failed, skipping account, exception: {e}") + continue + try: + send_notification(ctx, new_transactions) + except Exception as e: + error(f"[{account}] Error: Notification failed, exception: {e}") + continue diff --git a/leggen/database/mongo.py b/leggen/database/mongo.py new file mode 100644 index 0000000..f0cf0b5 --- /dev/null +++ b/leggen/database/mongo.py @@ -0,0 +1,35 @@ +import click +from pymongo import MongoClient +from pymongo.errors import DuplicateKeyError + +from leggen.utils.text import success, warning + + +def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list: + # Connect to MongoDB + mongo_uri = ctx.obj.get("database", {}).get("mongodb", {}).get("uri") + client = MongoClient(mongo_uri) + db = client["leggen"] + transactions_collection = db["transactions"] + + # Create a unique index on internalTransactionId + transactions_collection.create_index("internalTransactionId", unique=True) + + # Insert transactions into MongoDB + duplicates_count = 0 + + new_transactions = [] + + for transaction in transactions: + try: + transactions_collection.insert_one(transaction) + new_transactions.append(transaction) + except DuplicateKeyError: + # A transaction with the same ID already exists, skip insertion + duplicates_count += 1 + + success(f"[{account}] Inserted {len(new_transactions)} new transactions") + if duplicates_count: + warning(f"[{account}] Skipped {duplicates_count} duplicate transactions") + + return new_transactions diff --git a/leggen/utils/sqlite.py b/leggen/database/sqlite.py similarity index 62% rename from leggen/utils/sqlite.py rename to leggen/database/sqlite.py index d930169..2c2f5cf 100644 --- a/leggen/utils/sqlite.py +++ b/leggen/database/sqlite.py @@ -4,11 +4,10 @@ from sqlite3 import IntegrityError import click -from leggen.notifications.discord import send_message from leggen.utils.text import success, warning -def save_transactions(ctx: click.Context, account: str, transactions: list): +def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list: # Path to your SQLite database file # Connect to SQLite database @@ -32,7 +31,6 @@ def save_transactions(ctx: click.Context, account: str, transactions: list): ) # Insert transactions into SQLite database - new_transactions_count = 0 duplicates_count = 0 # Prepare an SQL statement for inserting data @@ -49,12 +47,7 @@ def save_transactions(ctx: click.Context, account: str, transactions: list): rawTransaction ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""" - notification_transactions = [] - filters_case_insensitive = {} - if ctx.obj.get("filters", {}).get("enabled", False): - filters_case_insensitive = ctx.obj.get("filters", {}).get( - "case-insensitive", {} - ) + new_transactions = [] for transaction in transactions: try: @@ -73,19 +66,7 @@ def save_transactions(ctx: click.Context, account: str, transactions: list): json.dumps(transaction["rawTransaction"]), ), ) - new_transactions_count += 1 - - # Add transaction to the list of transactions to be sent as a notification - for _, v in filters_case_insensitive.items(): - if v.lower() in transaction["description"].lower(): - notification_transactions.append( - { - "name": transaction["description"], - "value": transaction["transactionValue"], - "currency": transaction["transactionCurrency"], - "date": transaction["transactionDate"], - } - ) + new_transactions.append(transaction) except IntegrityError: # A transaction with the same ID already exists, indicating a duplicate duplicates_count += 1 @@ -94,10 +75,8 @@ def save_transactions(ctx: click.Context, account: str, transactions: list): conn.commit() conn.close() - # Send a notification with the transactions that match the filters - if notification_transactions: - send_message(ctx, notification_transactions) - - success(f"[{account}] Inserted {new_transactions_count} new transactions") + success(f"[{account}] Inserted {len(new_transactions)} new transactions") if duplicates_count: warning(f"[{account}] Skipped {duplicates_count} duplicate transactions") + + return new_transactions diff --git a/leggen/notifications/discord.py b/leggen/notifications/discord.py index 1107b6f..3799726 100644 --- a/leggen/notifications/discord.py +++ b/leggen/notifications/discord.py @@ -27,4 +27,7 @@ def send_message(ctx: click.Context, transactions: list): webhook.add_embed(embed) response = webhook.execute() - response.raise_for_status() + try: + response.raise_for_status() + except Exception as e: + raise Exception(f"Discord notification failed: {e}\n{response.text}") from e diff --git a/leggen/notifications/telegram.py b/leggen/notifications/telegram.py new file mode 100644 index 0000000..11ea3ee --- /dev/null +++ b/leggen/notifications/telegram.py @@ -0,0 +1,43 @@ +import click +import requests + +from leggen.utils.text import info + + +def escape_markdown(text: str) -> str: + return ( + str(text) + .replace("-", "\\-") + .replace("#", "\\#") + .replace(".", "\\.") + .replace("$", "\\$") + .replace("+", "\\+") + ) + + +def send_message(ctx: click.Context, transactions: list): + token = ctx.obj["notifications"]["telegram"]["api-key"] + chat_id = ctx.obj["notifications"]["telegram"]["chat-id"] + bot_url = f"https://api.telegram.org/bot{token}/sendMessage" + info(f"Got {len(transactions)} new transactions, sending message to Telegram") + message = "*💲 [Leggen](https://github.com/elisiariocouto/leggen)*\n" + message += f"{len(transactions)} new transaction matches\n\n" + + for transaction in transactions: + message += f"*Name*: {transaction['name']}\n" + message += f"*Value*: {transaction['value']}{transaction['currency']}\n" + message += f"*Date*: {transaction['date']}\n\n" + + res = requests.post( + bot_url, + json={ + "chat_id": chat_id, + "text": escape_markdown(message), + "parse_mode": "MarkdownV2", + }, + ) + + try: + res.raise_for_status() + except Exception as e: + raise Exception(f"Telegram notification failed: {e}\n{res.text}") from e diff --git a/leggen/utils/database.py b/leggen/utils/database.py new file mode 100644 index 0000000..56803f7 --- /dev/null +++ b/leggen/utils/database.py @@ -0,0 +1,112 @@ +from datetime import datetime + +import click + +import leggen.database.mongo as mongodb_engine +import leggen.database.sqlite as sqlite_engine +from leggen.utils.network import get +from leggen.utils.text import info, warning + + +def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list: + sqlite = ctx.obj.get("database", {}).get("sqlite", False) + mongodb = ctx.obj.get("database", {}).get("mongodb", False) + + if not sqlite and not mongodb: + warning("No database engine is enabled, skipping transaction saving") + # WARNING: This will return the transactions list as is, without saving it to any database + # Possible duplicate notifications will be sent if the filters are enabled + return transactions + + if sqlite: + info(f"[{account}] Fetched {len(transactions)} transactions, saving to SQLite") + return sqlite_engine.persist_transactions(ctx, account, transactions) + else: + info(f"[{account}] Fetched {len(transactions)} transactions, saving to MongoDB") + return mongodb_engine.persist_transactions(ctx, account, transactions) + + +def save_transactions(ctx: click.Context, account: str) -> list: + info(f"[{account}] Getting account details") + account_info = get(ctx, f"/accounts/{account}") + + info(f"[{account}] Getting transactions") + transactions = [] + + account_transactions = get(ctx, f"/accounts/{account}/transactions/").get( + "transactions", [] + ) + + for transaction in account_transactions.get("booked", []): + booked_date = transaction.get("bookingDateTime") or transaction.get( + "bookingDate" + ) + value_date = transaction.get("valueDateTime") or transaction.get("valueDate") + if booked_date and value_date: + min_date = min( + datetime.fromisoformat(booked_date), datetime.fromisoformat(value_date) + ) + else: + min_date = datetime.fromisoformat(booked_date or value_date) + + transactionValue = float( + transaction.get("transactionAmount", {}).get("amount", 0) + ) + currency = transaction.get("transactionAmount", {}).get("currency", "") + + description = transaction.get( + "remittanceInformationUnstructured", + ",".join(transaction.get("remittanceInformationUnstructuredArray", [])), + ) + + t = { + "internalTransactionId": transaction.get("internalTransactionId"), + "institutionId": account_info["institution_id"], + "iban": account_info.get("iban", "N/A"), + "transactionDate": min_date, + "description": description, + "transactionValue": transactionValue, + "transactionCurrency": currency, + "transactionStatus": "booked", + "accountId": account, + "rawTransaction": transaction, + } + transactions.append(t) + + for transaction in account_transactions.get("pending", []): + booked_date = transaction.get("bookingDateTime") or transaction.get( + "bookingDate" + ) + value_date = transaction.get("valueDateTime") or transaction.get("valueDate") + if booked_date and value_date: + min_date = min( + datetime.fromisoformat(booked_date), datetime.fromisoformat(value_date) + ) + else: + min_date = datetime.fromisoformat(booked_date or value_date) + + transactionValue = float( + transaction.get("transactionAmount", {}).get("amount", 0) + ) + currency = transaction.get("transactionAmount", {}).get("currency", "") + + description = transaction.get( + "remittanceInformationUnstructured", + ",".join(transaction.get("remittanceInformationUnstructuredArray", [])), + ) + + t = { + "internalTransactionId": transaction.get("internalTransactionId"), + "institutionId": account_info["institution_id"], + "iban": account_info.get("iban", "N/A"), + "transactionDate": min_date, + "description": description, + "transactionValue": transactionValue, + "transactionCurrency": currency, + "transactionStatus": "pending", + "accountId": account, + "rawTransaction": transaction, + } + transactions.append(t) + + return persist_transactions(ctx, account, transactions) diff --git a/leggen/utils/mongo.py b/leggen/utils/mongo.py deleted file mode 100644 index 556aec4..0000000 --- a/leggen/utils/mongo.py +++ /dev/null @@ -1,57 +0,0 @@ -import click -from pymongo import MongoClient -from pymongo.errors import DuplicateKeyError - -from leggen.notifications.discord import send_message -from leggen.utils.text import success, warning - - -def save_transactions(ctx: click.Context, account: str, transactions: list): - # Connect to MongoDB - mongo_uri = ctx.obj.get("database", {}).get("mongodb", {}).get("uri") - client = MongoClient(mongo_uri) - db = client["leggen"] - transactions_collection = db["transactions"] - - # Create a unique index on internalTransactionId - transactions_collection.create_index("internalTransactionId", unique=True) - - # Insert transactions into MongoDB - new_transactions_count = 0 - duplicates_count = 0 - - notification_transactions = [] - filters_case_insensitive = {} - if ctx.obj.get("filters", {}).get("enabled", False): - filters_case_insensitive = ctx.obj.get("filters", {}).get( - "case-insensitive", {} - ) - - for transaction in transactions: - try: - transactions_collection.insert_one(transaction) - new_transactions_count += 1 - - # Add transaction to the list of transactions to be sent as a notification - for _, v in filters_case_insensitive.items(): - if v.lower() in transaction["description"].lower(): - notification_transactions.append( - { - "name": transaction["description"], - "value": transaction["transactionValue"], - "currency": transaction["transactionCurrency"], - "date": transaction["transactionDate"], - } - ) - - except DuplicateKeyError: - # A transaction with the same ID already exists, skip insertion - duplicates_count += 1 - - # Send a notification with the transactions that match the filters - if notification_transactions: - send_message(ctx, notification_transactions) - - success(f"[{account}] Inserted {new_transactions_count} new transactions") - if duplicates_count: - warning(f"[{account}] Skipped {duplicates_count} duplicate transactions") diff --git a/leggen/utils/notifications.py b/leggen/utils/notifications.py new file mode 100644 index 0000000..cf79a64 --- /dev/null +++ b/leggen/utils/notifications.py @@ -0,0 +1,46 @@ +import click + +import leggen.notifications.discord as discord +import leggen.notifications.telegram as telegram +from leggen.utils.text import info, warning + + +def send_notification(ctx: click.Context, transactions: list): + if ctx.obj.get("filters") is None: + warning("No filters are enabled, skipping notifications") + return + + filters_case_insensitive = ctx.obj.get("filters", {}).get("case-insensitive", {}) + + # Add transaction to the list of transactions to be sent as a notification + notification_transactions = [] + for transaction in transactions: + for _, v in filters_case_insensitive.items(): + if v.lower() in transaction["description"].lower(): + notification_transactions.append( + { + "name": transaction["description"], + "value": transaction["transactionValue"], + "currency": transaction["transactionCurrency"], + "date": transaction["transactionDate"], + } + ) + + if len(notification_transactions) == 0: + warning("No transactions matched the filters, skipping notifications") + return + + discord_enabled = ctx.obj.get("notifications", {}).get("discord", False) + telegram_enabled = ctx.obj.get("notifications", {}).get("telegram", False) + + if not discord_enabled and not telegram_enabled: + warning("No notification engine is enabled, skipping notifications") + return + + if discord_enabled: + info(f"Sending {len(notification_transactions)} transactions to Discord") + discord.send_message(ctx, notification_transactions) + + if telegram_enabled: + info(f"Sending {len(notification_transactions)} transactions to Telegram") + telegram.send_message(ctx, notification_transactions)