mirror of
https://github.com/elisiariocouto/leggen.git
synced 2025-12-13 11:22:21 +00:00
feat(notifications): Add support for Telegram notifications.
This commit is contained in:
committed by
Elisiário Couto
parent
e46634cf27
commit
7401ca62d2
1
.gitignore
vendored
1
.gitignore
vendored
@@ -162,3 +162,4 @@ data/
|
||||
docker-compose.dev.yml
|
||||
nocodb/
|
||||
sql/
|
||||
leggen.db
|
||||
|
||||
15
README.md
15
README.md
@@ -26,10 +26,10 @@ Having your bank data in a database, gives you the power to backup, analyze and
|
||||
- List all connected banks and their statuses
|
||||
- List balances of all connected accounts
|
||||
- List transactions for all connected accounts
|
||||
- Sync all transactions with a SQLite or MongoDB database
|
||||
- Sync all transactions with a SQLite and/or MongoDB database
|
||||
- Visualize and query transactions using NocoDB
|
||||
- Schedule regular syncs with the database using Ofelia
|
||||
- Send notifications to Discrod when transactions match certain filters
|
||||
- Send notifications to Discord and/or Telegram when transactions match certain filters
|
||||
|
||||
## 🚀 Installation and Configuration
|
||||
|
||||
@@ -49,12 +49,19 @@ url = "https://bankaccountdata.gocardless.com/api/v2"
|
||||
|
||||
[database]
|
||||
sqlite = true
|
||||
mongodb = true
|
||||
|
||||
[database.mongodb]
|
||||
uri = "mongodb://localhost:27017"
|
||||
|
||||
[notifications.discord]
|
||||
webhook = "https://discord.com/api/webhooks/..."
|
||||
|
||||
[filters]
|
||||
enabled = true
|
||||
[notifications.telegram]
|
||||
# See gist for telegram instructions
|
||||
# https://gist.github.com/nafiesl/4ad622f344cd1dc3bb1ecbe468ff9f8a
|
||||
token = "12345:abcdefghijklmnopqrstuvxwyz"
|
||||
chat-id = 12345
|
||||
|
||||
[filters.case-insensitive]
|
||||
filter1 = "company-name"
|
||||
|
||||
@@ -1,107 +1,12 @@
|
||||
from datetime import datetime
|
||||
|
||||
import click
|
||||
|
||||
from leggen.main import cli
|
||||
from leggen.utils.mongo import save_transactions as save_transactions_mongo
|
||||
from leggen.utils.database import save_transactions
|
||||
from leggen.utils.network import get
|
||||
from leggen.utils.sqlite import save_transactions as save_transactions_sqlite
|
||||
from leggen.utils.notifications import send_notification
|
||||
from leggen.utils.text import error, info
|
||||
|
||||
|
||||
def save_transactions(ctx: click.Context, account: str):
|
||||
info(f"[{account}] Getting account details")
|
||||
account_info = get(ctx, f"/accounts/{account}")
|
||||
|
||||
info(f"[{account}] Getting transactions")
|
||||
transactions = []
|
||||
|
||||
account_transactions = get(ctx, f"/accounts/{account}/transactions/").get(
|
||||
"transactions", []
|
||||
)
|
||||
|
||||
for transaction in account_transactions.get("booked", []):
|
||||
booked_date = transaction.get("bookingDateTime") or transaction.get(
|
||||
"bookingDate"
|
||||
)
|
||||
value_date = transaction.get("valueDateTime") or transaction.get("valueDate")
|
||||
if booked_date and value_date:
|
||||
min_date = min(
|
||||
datetime.fromisoformat(booked_date), datetime.fromisoformat(value_date)
|
||||
)
|
||||
else:
|
||||
min_date = datetime.fromisoformat(booked_date or value_date)
|
||||
|
||||
transactionValue = float(
|
||||
transaction.get("transactionAmount", {}).get("amount", 0)
|
||||
)
|
||||
currency = transaction.get("transactionAmount", {}).get("currency", "")
|
||||
|
||||
description = transaction.get(
|
||||
"remittanceInformationUnstructured",
|
||||
",".join(transaction.get("remittanceInformationUnstructuredArray", [])),
|
||||
)
|
||||
|
||||
t = {
|
||||
"internalTransactionId": transaction.get("internalTransactionId"),
|
||||
"institutionId": account_info["institution_id"],
|
||||
"iban": account_info.get("iban", "N/A"),
|
||||
"transactionDate": min_date,
|
||||
"description": description,
|
||||
"transactionValue": transactionValue,
|
||||
"transactionCurrency": currency,
|
||||
"transactionStatus": "booked",
|
||||
"accountId": account,
|
||||
"rawTransaction": transaction,
|
||||
}
|
||||
transactions.append(t)
|
||||
|
||||
for transaction in account_transactions.get("pending", []):
|
||||
booked_date = transaction.get("bookingDateTime") or transaction.get(
|
||||
"bookingDate"
|
||||
)
|
||||
value_date = transaction.get("valueDateTime") or transaction.get("valueDate")
|
||||
if booked_date and value_date:
|
||||
min_date = min(
|
||||
datetime.fromisoformat(booked_date), datetime.fromisoformat(value_date)
|
||||
)
|
||||
else:
|
||||
min_date = datetime.fromisoformat(booked_date or value_date)
|
||||
|
||||
transactionValue = float(
|
||||
transaction.get("transactionAmount", {}).get("amount", 0)
|
||||
)
|
||||
currency = transaction.get("transactionAmount", {}).get("currency", "")
|
||||
|
||||
description = transaction.get(
|
||||
"remittanceInformationUnstructured",
|
||||
",".join(transaction.get("remittanceInformationUnstructuredArray", [])),
|
||||
)
|
||||
|
||||
t = {
|
||||
"internalTransactionId": transaction.get("internalTransactionId"),
|
||||
"institutionId": account_info["institution_id"],
|
||||
"iban": account_info.get("iban", "N/A"),
|
||||
"transactionDate": min_date,
|
||||
"description": description,
|
||||
"transactionValue": transactionValue,
|
||||
"transactionCurrency": currency,
|
||||
"transactionStatus": "pending",
|
||||
"accountId": account,
|
||||
"rawTransaction": transaction,
|
||||
}
|
||||
transactions.append(t)
|
||||
|
||||
sqlite = ctx.obj.get("database", {}).get("sqlite", True)
|
||||
info(
|
||||
f"[{account}] Fetched {len(transactions)} transactions, saving to {'SQLite' if sqlite else 'MongoDB'}"
|
||||
)
|
||||
if sqlite:
|
||||
save_transactions_sqlite(ctx, account, transactions)
|
||||
else:
|
||||
save_transactions_mongo(ctx, account, transactions)
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.pass_context
|
||||
def sync(ctx: click.Context):
|
||||
@@ -118,6 +23,12 @@ def sync(ctx: click.Context):
|
||||
|
||||
for account in accounts:
|
||||
try:
|
||||
save_transactions(ctx, account)
|
||||
new_transactions = save_transactions(ctx, account)
|
||||
except Exception as e:
|
||||
error(f"[{account}] Error: Sync failed, skipping account, exception: {e}")
|
||||
continue
|
||||
try:
|
||||
send_notification(ctx, new_transactions)
|
||||
except Exception as e:
|
||||
error(f"[{account}] Error: Notification failed, exception: {e}")
|
||||
continue
|
||||
|
||||
35
leggen/database/mongo.py
Normal file
35
leggen/database/mongo.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import click
|
||||
from pymongo import MongoClient
|
||||
from pymongo.errors import DuplicateKeyError
|
||||
|
||||
from leggen.utils.text import success, warning
|
||||
|
||||
|
||||
def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list:
|
||||
# Connect to MongoDB
|
||||
mongo_uri = ctx.obj.get("database", {}).get("mongodb", {}).get("uri")
|
||||
client = MongoClient(mongo_uri)
|
||||
db = client["leggen"]
|
||||
transactions_collection = db["transactions"]
|
||||
|
||||
# Create a unique index on internalTransactionId
|
||||
transactions_collection.create_index("internalTransactionId", unique=True)
|
||||
|
||||
# Insert transactions into MongoDB
|
||||
duplicates_count = 0
|
||||
|
||||
new_transactions = []
|
||||
|
||||
for transaction in transactions:
|
||||
try:
|
||||
transactions_collection.insert_one(transaction)
|
||||
new_transactions.append(transaction)
|
||||
except DuplicateKeyError:
|
||||
# A transaction with the same ID already exists, skip insertion
|
||||
duplicates_count += 1
|
||||
|
||||
success(f"[{account}] Inserted {len(new_transactions)} new transactions")
|
||||
if duplicates_count:
|
||||
warning(f"[{account}] Skipped {duplicates_count} duplicate transactions")
|
||||
|
||||
return new_transactions
|
||||
@@ -4,11 +4,10 @@ from sqlite3 import IntegrityError
|
||||
|
||||
import click
|
||||
|
||||
from leggen.notifications.discord import send_message
|
||||
from leggen.utils.text import success, warning
|
||||
|
||||
|
||||
def save_transactions(ctx: click.Context, account: str, transactions: list):
|
||||
def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list:
|
||||
# Path to your SQLite database file
|
||||
|
||||
# Connect to SQLite database
|
||||
@@ -32,7 +31,6 @@ def save_transactions(ctx: click.Context, account: str, transactions: list):
|
||||
)
|
||||
|
||||
# Insert transactions into SQLite database
|
||||
new_transactions_count = 0
|
||||
duplicates_count = 0
|
||||
|
||||
# Prepare an SQL statement for inserting data
|
||||
@@ -49,12 +47,7 @@ def save_transactions(ctx: click.Context, account: str, transactions: list):
|
||||
rawTransaction
|
||||
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""
|
||||
|
||||
notification_transactions = []
|
||||
filters_case_insensitive = {}
|
||||
if ctx.obj.get("filters", {}).get("enabled", False):
|
||||
filters_case_insensitive = ctx.obj.get("filters", {}).get(
|
||||
"case-insensitive", {}
|
||||
)
|
||||
new_transactions = []
|
||||
|
||||
for transaction in transactions:
|
||||
try:
|
||||
@@ -73,19 +66,7 @@ def save_transactions(ctx: click.Context, account: str, transactions: list):
|
||||
json.dumps(transaction["rawTransaction"]),
|
||||
),
|
||||
)
|
||||
new_transactions_count += 1
|
||||
|
||||
# Add transaction to the list of transactions to be sent as a notification
|
||||
for _, v in filters_case_insensitive.items():
|
||||
if v.lower() in transaction["description"].lower():
|
||||
notification_transactions.append(
|
||||
{
|
||||
"name": transaction["description"],
|
||||
"value": transaction["transactionValue"],
|
||||
"currency": transaction["transactionCurrency"],
|
||||
"date": transaction["transactionDate"],
|
||||
}
|
||||
)
|
||||
new_transactions.append(transaction)
|
||||
except IntegrityError:
|
||||
# A transaction with the same ID already exists, indicating a duplicate
|
||||
duplicates_count += 1
|
||||
@@ -94,10 +75,8 @@ def save_transactions(ctx: click.Context, account: str, transactions: list):
|
||||
conn.commit()
|
||||
conn.close()
|
||||
|
||||
# Send a notification with the transactions that match the filters
|
||||
if notification_transactions:
|
||||
send_message(ctx, notification_transactions)
|
||||
|
||||
success(f"[{account}] Inserted {new_transactions_count} new transactions")
|
||||
success(f"[{account}] Inserted {len(new_transactions)} new transactions")
|
||||
if duplicates_count:
|
||||
warning(f"[{account}] Skipped {duplicates_count} duplicate transactions")
|
||||
|
||||
return new_transactions
|
||||
@@ -27,4 +27,7 @@ def send_message(ctx: click.Context, transactions: list):
|
||||
|
||||
webhook.add_embed(embed)
|
||||
response = webhook.execute()
|
||||
try:
|
||||
response.raise_for_status()
|
||||
except Exception as e:
|
||||
raise Exception(f"Discord notification failed: {e}\n{response.text}") from e
|
||||
|
||||
43
leggen/notifications/telegram.py
Normal file
43
leggen/notifications/telegram.py
Normal file
@@ -0,0 +1,43 @@
|
||||
import click
|
||||
import requests
|
||||
|
||||
from leggen.utils.text import info
|
||||
|
||||
|
||||
def escape_markdown(text: str) -> str:
|
||||
return (
|
||||
str(text)
|
||||
.replace("-", "\\-")
|
||||
.replace("#", "\\#")
|
||||
.replace(".", "\\.")
|
||||
.replace("$", "\\$")
|
||||
.replace("+", "\\+")
|
||||
)
|
||||
|
||||
|
||||
def send_message(ctx: click.Context, transactions: list):
|
||||
token = ctx.obj["notifications"]["telegram"]["api-key"]
|
||||
chat_id = ctx.obj["notifications"]["telegram"]["chat-id"]
|
||||
bot_url = f"https://api.telegram.org/bot{token}/sendMessage"
|
||||
info(f"Got {len(transactions)} new transactions, sending message to Telegram")
|
||||
message = "*💲 [Leggen](https://github.com/elisiariocouto/leggen)*\n"
|
||||
message += f"{len(transactions)} new transaction matches\n\n"
|
||||
|
||||
for transaction in transactions:
|
||||
message += f"*Name*: {transaction['name']}\n"
|
||||
message += f"*Value*: {transaction['value']}{transaction['currency']}\n"
|
||||
message += f"*Date*: {transaction['date']}\n\n"
|
||||
|
||||
res = requests.post(
|
||||
bot_url,
|
||||
json={
|
||||
"chat_id": chat_id,
|
||||
"text": escape_markdown(message),
|
||||
"parse_mode": "MarkdownV2",
|
||||
},
|
||||
)
|
||||
|
||||
try:
|
||||
res.raise_for_status()
|
||||
except Exception as e:
|
||||
raise Exception(f"Telegram notification failed: {e}\n{res.text}") from e
|
||||
112
leggen/utils/database.py
Normal file
112
leggen/utils/database.py
Normal file
@@ -0,0 +1,112 @@
|
||||
from datetime import datetime
|
||||
|
||||
import click
|
||||
|
||||
import leggen.database.mongo as mongodb_engine
|
||||
import leggen.database.sqlite as sqlite_engine
|
||||
from leggen.utils.network import get
|
||||
from leggen.utils.text import info, warning
|
||||
|
||||
|
||||
def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list:
|
||||
sqlite = ctx.obj.get("database", {}).get("sqlite", False)
|
||||
mongodb = ctx.obj.get("database", {}).get("mongodb", False)
|
||||
|
||||
if not sqlite and not mongodb:
|
||||
warning("No database engine is enabled, skipping transaction saving")
|
||||
# WARNING: This will return the transactions list as is, without saving it to any database
|
||||
# Possible duplicate notifications will be sent if the filters are enabled
|
||||
return transactions
|
||||
|
||||
if sqlite:
|
||||
info(f"[{account}] Fetched {len(transactions)} transactions, saving to SQLite")
|
||||
return sqlite_engine.persist_transactions(ctx, account, transactions)
|
||||
else:
|
||||
info(f"[{account}] Fetched {len(transactions)} transactions, saving to MongoDB")
|
||||
return mongodb_engine.persist_transactions(ctx, account, transactions)
|
||||
|
||||
|
||||
def save_transactions(ctx: click.Context, account: str) -> list:
|
||||
info(f"[{account}] Getting account details")
|
||||
account_info = get(ctx, f"/accounts/{account}")
|
||||
|
||||
info(f"[{account}] Getting transactions")
|
||||
transactions = []
|
||||
|
||||
account_transactions = get(ctx, f"/accounts/{account}/transactions/").get(
|
||||
"transactions", []
|
||||
)
|
||||
|
||||
for transaction in account_transactions.get("booked", []):
|
||||
booked_date = transaction.get("bookingDateTime") or transaction.get(
|
||||
"bookingDate"
|
||||
)
|
||||
value_date = transaction.get("valueDateTime") or transaction.get("valueDate")
|
||||
if booked_date and value_date:
|
||||
min_date = min(
|
||||
datetime.fromisoformat(booked_date), datetime.fromisoformat(value_date)
|
||||
)
|
||||
else:
|
||||
min_date = datetime.fromisoformat(booked_date or value_date)
|
||||
|
||||
transactionValue = float(
|
||||
transaction.get("transactionAmount", {}).get("amount", 0)
|
||||
)
|
||||
currency = transaction.get("transactionAmount", {}).get("currency", "")
|
||||
|
||||
description = transaction.get(
|
||||
"remittanceInformationUnstructured",
|
||||
",".join(transaction.get("remittanceInformationUnstructuredArray", [])),
|
||||
)
|
||||
|
||||
t = {
|
||||
"internalTransactionId": transaction.get("internalTransactionId"),
|
||||
"institutionId": account_info["institution_id"],
|
||||
"iban": account_info.get("iban", "N/A"),
|
||||
"transactionDate": min_date,
|
||||
"description": description,
|
||||
"transactionValue": transactionValue,
|
||||
"transactionCurrency": currency,
|
||||
"transactionStatus": "booked",
|
||||
"accountId": account,
|
||||
"rawTransaction": transaction,
|
||||
}
|
||||
transactions.append(t)
|
||||
|
||||
for transaction in account_transactions.get("pending", []):
|
||||
booked_date = transaction.get("bookingDateTime") or transaction.get(
|
||||
"bookingDate"
|
||||
)
|
||||
value_date = transaction.get("valueDateTime") or transaction.get("valueDate")
|
||||
if booked_date and value_date:
|
||||
min_date = min(
|
||||
datetime.fromisoformat(booked_date), datetime.fromisoformat(value_date)
|
||||
)
|
||||
else:
|
||||
min_date = datetime.fromisoformat(booked_date or value_date)
|
||||
|
||||
transactionValue = float(
|
||||
transaction.get("transactionAmount", {}).get("amount", 0)
|
||||
)
|
||||
currency = transaction.get("transactionAmount", {}).get("currency", "")
|
||||
|
||||
description = transaction.get(
|
||||
"remittanceInformationUnstructured",
|
||||
",".join(transaction.get("remittanceInformationUnstructuredArray", [])),
|
||||
)
|
||||
|
||||
t = {
|
||||
"internalTransactionId": transaction.get("internalTransactionId"),
|
||||
"institutionId": account_info["institution_id"],
|
||||
"iban": account_info.get("iban", "N/A"),
|
||||
"transactionDate": min_date,
|
||||
"description": description,
|
||||
"transactionValue": transactionValue,
|
||||
"transactionCurrency": currency,
|
||||
"transactionStatus": "pending",
|
||||
"accountId": account,
|
||||
"rawTransaction": transaction,
|
||||
}
|
||||
transactions.append(t)
|
||||
|
||||
return persist_transactions(ctx, account, transactions)
|
||||
@@ -1,57 +0,0 @@
|
||||
import click
|
||||
from pymongo import MongoClient
|
||||
from pymongo.errors import DuplicateKeyError
|
||||
|
||||
from leggen.notifications.discord import send_message
|
||||
from leggen.utils.text import success, warning
|
||||
|
||||
|
||||
def save_transactions(ctx: click.Context, account: str, transactions: list):
|
||||
# Connect to MongoDB
|
||||
mongo_uri = ctx.obj.get("database", {}).get("mongodb", {}).get("uri")
|
||||
client = MongoClient(mongo_uri)
|
||||
db = client["leggen"]
|
||||
transactions_collection = db["transactions"]
|
||||
|
||||
# Create a unique index on internalTransactionId
|
||||
transactions_collection.create_index("internalTransactionId", unique=True)
|
||||
|
||||
# Insert transactions into MongoDB
|
||||
new_transactions_count = 0
|
||||
duplicates_count = 0
|
||||
|
||||
notification_transactions = []
|
||||
filters_case_insensitive = {}
|
||||
if ctx.obj.get("filters", {}).get("enabled", False):
|
||||
filters_case_insensitive = ctx.obj.get("filters", {}).get(
|
||||
"case-insensitive", {}
|
||||
)
|
||||
|
||||
for transaction in transactions:
|
||||
try:
|
||||
transactions_collection.insert_one(transaction)
|
||||
new_transactions_count += 1
|
||||
|
||||
# Add transaction to the list of transactions to be sent as a notification
|
||||
for _, v in filters_case_insensitive.items():
|
||||
if v.lower() in transaction["description"].lower():
|
||||
notification_transactions.append(
|
||||
{
|
||||
"name": transaction["description"],
|
||||
"value": transaction["transactionValue"],
|
||||
"currency": transaction["transactionCurrency"],
|
||||
"date": transaction["transactionDate"],
|
||||
}
|
||||
)
|
||||
|
||||
except DuplicateKeyError:
|
||||
# A transaction with the same ID already exists, skip insertion
|
||||
duplicates_count += 1
|
||||
|
||||
# Send a notification with the transactions that match the filters
|
||||
if notification_transactions:
|
||||
send_message(ctx, notification_transactions)
|
||||
|
||||
success(f"[{account}] Inserted {new_transactions_count} new transactions")
|
||||
if duplicates_count:
|
||||
warning(f"[{account}] Skipped {duplicates_count} duplicate transactions")
|
||||
46
leggen/utils/notifications.py
Normal file
46
leggen/utils/notifications.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import click
|
||||
|
||||
import leggen.notifications.discord as discord
|
||||
import leggen.notifications.telegram as telegram
|
||||
from leggen.utils.text import info, warning
|
||||
|
||||
|
||||
def send_notification(ctx: click.Context, transactions: list):
|
||||
if ctx.obj.get("filters") is None:
|
||||
warning("No filters are enabled, skipping notifications")
|
||||
return
|
||||
|
||||
filters_case_insensitive = ctx.obj.get("filters", {}).get("case-insensitive", {})
|
||||
|
||||
# Add transaction to the list of transactions to be sent as a notification
|
||||
notification_transactions = []
|
||||
for transaction in transactions:
|
||||
for _, v in filters_case_insensitive.items():
|
||||
if v.lower() in transaction["description"].lower():
|
||||
notification_transactions.append(
|
||||
{
|
||||
"name": transaction["description"],
|
||||
"value": transaction["transactionValue"],
|
||||
"currency": transaction["transactionCurrency"],
|
||||
"date": transaction["transactionDate"],
|
||||
}
|
||||
)
|
||||
|
||||
if len(notification_transactions) == 0:
|
||||
warning("No transactions matched the filters, skipping notifications")
|
||||
return
|
||||
|
||||
discord_enabled = ctx.obj.get("notifications", {}).get("discord", False)
|
||||
telegram_enabled = ctx.obj.get("notifications", {}).get("telegram", False)
|
||||
|
||||
if not discord_enabled and not telegram_enabled:
|
||||
warning("No notification engine is enabled, skipping notifications")
|
||||
return
|
||||
|
||||
if discord_enabled:
|
||||
info(f"Sending {len(notification_transactions)} transactions to Discord")
|
||||
discord.send_message(ctx, notification_transactions)
|
||||
|
||||
if telegram_enabled:
|
||||
info(f"Sending {len(notification_transactions)} transactions to Telegram")
|
||||
telegram.send_message(ctx, notification_transactions)
|
||||
Reference in New Issue
Block a user