Compare commits

...

3 Commits
0.4.0 ... 0.5.0

Author SHA1 Message Date
Elisiário Couto
798a8f1880 chore(ci): Bump version to 0.5.0 2024-03-29 16:57:33 +00:00
Elisiário Couto
7401ca62d2 feat(notifications): Add support for Telegram notifications. 2024-03-29 16:56:45 +00:00
Elisiário Couto
e46634cf27 chore: Rename docker-compose.yml to compose.yml and remove obsolete 'version' key. 2024-03-28 16:09:54 +00:00
13 changed files with 281 additions and 191 deletions

1
.gitignore vendored
View File

@@ -162,3 +162,4 @@ data/
docker-compose.dev.yml docker-compose.dev.yml
nocodb/ nocodb/
sql/ sql/
leggen.db

View File

@@ -1,3 +1,15 @@
## 0.5.0 (2024/03/29)
### Features
- **notifications:** Add support for Telegram notifications. ([7401ca62](https://github.com/elisiariocouto/leggen/commit/7401ca62d2ff23c4100ed9d1c8b7450289337553))
### Miscellaneous Tasks
- Rename docker-compose.yml to compose.yml and remove obsolete 'version' key. ([e46634cf](https://github.com/elisiariocouto/leggen/commit/e46634cf27046bfc8d638a0cd4910a4a8a42648a))
## 0.4.0 (2024/03/28) ## 0.4.0 (2024/03/28)
### Features ### Features

View File

@@ -26,16 +26,16 @@ Having your bank data in a database, gives you the power to backup, analyze and
- List all connected banks and their statuses - List all connected banks and their statuses
- List balances of all connected accounts - List balances of all connected accounts
- List transactions for all connected accounts - List transactions for all connected accounts
- Sync all transactions with a SQLite or MongoDB database - Sync all transactions with a SQLite and/or MongoDB database
- Visualize and query transactions using NocoDB - Visualize and query transactions using NocoDB
- Schedule regular syncs with the database using Ofelia - Schedule regular syncs with the database using Ofelia
- Send notifications to Discrod when transactions match certain filters - Send notifications to Discord and/or Telegram when transactions match certain filters
## 🚀 Installation and Configuration ## 🚀 Installation and Configuration
In order to use `leggen`, you need to create a GoCardless account. GoCardless is a service that provides access to Open Banking APIs. You can create an account at https://gocardless.com/bank-account-data/. In order to use `leggen`, you need to create a GoCardless account. GoCardless is a service that provides access to Open Banking APIs. You can create an account at https://gocardless.com/bank-account-data/.
After creating an account and getting your API keys, the best way is to use the [compose file](docker-compose.yml). Open the file and adapt it to your needs. After creating an account and getting your API keys, the best way is to use the [compose file](compose.yml). Open the file and adapt it to your needs.
### Example Configuration ### Example Configuration
@@ -49,12 +49,19 @@ url = "https://bankaccountdata.gocardless.com/api/v2"
[database] [database]
sqlite = true sqlite = true
mongodb = true
[database.mongodb]
uri = "mongodb://localhost:27017"
[notifications.discord] [notifications.discord]
webhook = "https://discord.com/api/webhooks/..." webhook = "https://discord.com/api/webhooks/..."
[filters] [notifications.telegram]
enabled = true # See gist for telegram instructions
# https://gist.github.com/nafiesl/4ad622f344cd1dc3bb1ecbe468ff9f8a
token = "12345:abcdefghijklmnopqrstuvxwyz"
chat-id = 12345
[filters.case-insensitive] [filters.case-insensitive]
filter1 = "company-name" filter1 = "company-name"

View File

@@ -1,5 +1,3 @@
version: '3.1'
services: services:
# Defaults to `sync` command. # Defaults to `sync` command.
leggen: leggen:

View File

@@ -1,107 +1,12 @@
from datetime import datetime
import click import click
from leggen.main import cli from leggen.main import cli
from leggen.utils.mongo import save_transactions as save_transactions_mongo from leggen.utils.database import save_transactions
from leggen.utils.network import get from leggen.utils.network import get
from leggen.utils.sqlite import save_transactions as save_transactions_sqlite from leggen.utils.notifications import send_notification
from leggen.utils.text import error, info from leggen.utils.text import error, info
def save_transactions(ctx: click.Context, account: str):
info(f"[{account}] Getting account details")
account_info = get(ctx, f"/accounts/{account}")
info(f"[{account}] Getting transactions")
transactions = []
account_transactions = get(ctx, f"/accounts/{account}/transactions/").get(
"transactions", []
)
for transaction in account_transactions.get("booked", []):
booked_date = transaction.get("bookingDateTime") or transaction.get(
"bookingDate"
)
value_date = transaction.get("valueDateTime") or transaction.get("valueDate")
if booked_date and value_date:
min_date = min(
datetime.fromisoformat(booked_date), datetime.fromisoformat(value_date)
)
else:
min_date = datetime.fromisoformat(booked_date or value_date)
transactionValue = float(
transaction.get("transactionAmount", {}).get("amount", 0)
)
currency = transaction.get("transactionAmount", {}).get("currency", "")
description = transaction.get(
"remittanceInformationUnstructured",
",".join(transaction.get("remittanceInformationUnstructuredArray", [])),
)
t = {
"internalTransactionId": transaction.get("internalTransactionId"),
"institutionId": account_info["institution_id"],
"iban": account_info.get("iban", "N/A"),
"transactionDate": min_date,
"description": description,
"transactionValue": transactionValue,
"transactionCurrency": currency,
"transactionStatus": "booked",
"accountId": account,
"rawTransaction": transaction,
}
transactions.append(t)
for transaction in account_transactions.get("pending", []):
booked_date = transaction.get("bookingDateTime") or transaction.get(
"bookingDate"
)
value_date = transaction.get("valueDateTime") or transaction.get("valueDate")
if booked_date and value_date:
min_date = min(
datetime.fromisoformat(booked_date), datetime.fromisoformat(value_date)
)
else:
min_date = datetime.fromisoformat(booked_date or value_date)
transactionValue = float(
transaction.get("transactionAmount", {}).get("amount", 0)
)
currency = transaction.get("transactionAmount", {}).get("currency", "")
description = transaction.get(
"remittanceInformationUnstructured",
",".join(transaction.get("remittanceInformationUnstructuredArray", [])),
)
t = {
"internalTransactionId": transaction.get("internalTransactionId"),
"institutionId": account_info["institution_id"],
"iban": account_info.get("iban", "N/A"),
"transactionDate": min_date,
"description": description,
"transactionValue": transactionValue,
"transactionCurrency": currency,
"transactionStatus": "pending",
"accountId": account,
"rawTransaction": transaction,
}
transactions.append(t)
sqlite = ctx.obj.get("database", {}).get("sqlite", True)
info(
f"[{account}] Fetched {len(transactions)} transactions, saving to {'SQLite' if sqlite else 'MongoDB'}"
)
if sqlite:
save_transactions_sqlite(ctx, account, transactions)
else:
save_transactions_mongo(ctx, account, transactions)
@cli.command() @cli.command()
@click.pass_context @click.pass_context
def sync(ctx: click.Context): def sync(ctx: click.Context):
@@ -118,6 +23,12 @@ def sync(ctx: click.Context):
for account in accounts: for account in accounts:
try: try:
save_transactions(ctx, account) new_transactions = save_transactions(ctx, account)
except Exception as e: except Exception as e:
error(f"[{account}] Error: Sync failed, skipping account, exception: {e}") error(f"[{account}] Error: Sync failed, skipping account, exception: {e}")
continue
try:
send_notification(ctx, new_transactions)
except Exception as e:
error(f"[{account}] Error: Notification failed, exception: {e}")
continue

35
leggen/database/mongo.py Normal file
View File

@@ -0,0 +1,35 @@
import click
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
from leggen.utils.text import success, warning
def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list:
# Connect to MongoDB
mongo_uri = ctx.obj.get("database", {}).get("mongodb", {}).get("uri")
client = MongoClient(mongo_uri)
db = client["leggen"]
transactions_collection = db["transactions"]
# Create a unique index on internalTransactionId
transactions_collection.create_index("internalTransactionId", unique=True)
# Insert transactions into MongoDB
duplicates_count = 0
new_transactions = []
for transaction in transactions:
try:
transactions_collection.insert_one(transaction)
new_transactions.append(transaction)
except DuplicateKeyError:
# A transaction with the same ID already exists, skip insertion
duplicates_count += 1
success(f"[{account}] Inserted {len(new_transactions)} new transactions")
if duplicates_count:
warning(f"[{account}] Skipped {duplicates_count} duplicate transactions")
return new_transactions

View File

@@ -4,11 +4,10 @@ from sqlite3 import IntegrityError
import click import click
from leggen.notifications.discord import send_message
from leggen.utils.text import success, warning from leggen.utils.text import success, warning
def save_transactions(ctx: click.Context, account: str, transactions: list): def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list:
# Path to your SQLite database file # Path to your SQLite database file
# Connect to SQLite database # Connect to SQLite database
@@ -32,7 +31,6 @@ def save_transactions(ctx: click.Context, account: str, transactions: list):
) )
# Insert transactions into SQLite database # Insert transactions into SQLite database
new_transactions_count = 0
duplicates_count = 0 duplicates_count = 0
# Prepare an SQL statement for inserting data # Prepare an SQL statement for inserting data
@@ -49,12 +47,7 @@ def save_transactions(ctx: click.Context, account: str, transactions: list):
rawTransaction rawTransaction
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""" ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""
notification_transactions = [] new_transactions = []
filters_case_insensitive = {}
if ctx.obj.get("filters", {}).get("enabled", False):
filters_case_insensitive = ctx.obj.get("filters", {}).get(
"case-insensitive", {}
)
for transaction in transactions: for transaction in transactions:
try: try:
@@ -73,19 +66,7 @@ def save_transactions(ctx: click.Context, account: str, transactions: list):
json.dumps(transaction["rawTransaction"]), json.dumps(transaction["rawTransaction"]),
), ),
) )
new_transactions_count += 1 new_transactions.append(transaction)
# Add transaction to the list of transactions to be sent as a notification
for _, v in filters_case_insensitive.items():
if v.lower() in transaction["description"].lower():
notification_transactions.append(
{
"name": transaction["description"],
"value": transaction["transactionValue"],
"currency": transaction["transactionCurrency"],
"date": transaction["transactionDate"],
}
)
except IntegrityError: except IntegrityError:
# A transaction with the same ID already exists, indicating a duplicate # A transaction with the same ID already exists, indicating a duplicate
duplicates_count += 1 duplicates_count += 1
@@ -94,10 +75,8 @@ def save_transactions(ctx: click.Context, account: str, transactions: list):
conn.commit() conn.commit()
conn.close() conn.close()
# Send a notification with the transactions that match the filters success(f"[{account}] Inserted {len(new_transactions)} new transactions")
if notification_transactions:
send_message(ctx, notification_transactions)
success(f"[{account}] Inserted {new_transactions_count} new transactions")
if duplicates_count: if duplicates_count:
warning(f"[{account}] Skipped {duplicates_count} duplicate transactions") warning(f"[{account}] Skipped {duplicates_count} duplicate transactions")
return new_transactions

View File

@@ -27,4 +27,7 @@ def send_message(ctx: click.Context, transactions: list):
webhook.add_embed(embed) webhook.add_embed(embed)
response = webhook.execute() response = webhook.execute()
response.raise_for_status() try:
response.raise_for_status()
except Exception as e:
raise Exception(f"Discord notification failed: {e}\n{response.text}") from e

View File

@@ -0,0 +1,43 @@
import click
import requests
from leggen.utils.text import info
def escape_markdown(text: str) -> str:
return (
str(text)
.replace("-", "\\-")
.replace("#", "\\#")
.replace(".", "\\.")
.replace("$", "\\$")
.replace("+", "\\+")
)
def send_message(ctx: click.Context, transactions: list):
token = ctx.obj["notifications"]["telegram"]["api-key"]
chat_id = ctx.obj["notifications"]["telegram"]["chat-id"]
bot_url = f"https://api.telegram.org/bot{token}/sendMessage"
info(f"Got {len(transactions)} new transactions, sending message to Telegram")
message = "*💲 [Leggen](https://github.com/elisiariocouto/leggen)*\n"
message += f"{len(transactions)} new transaction matches\n\n"
for transaction in transactions:
message += f"*Name*: {transaction['name']}\n"
message += f"*Value*: {transaction['value']}{transaction['currency']}\n"
message += f"*Date*: {transaction['date']}\n\n"
res = requests.post(
bot_url,
json={
"chat_id": chat_id,
"text": escape_markdown(message),
"parse_mode": "MarkdownV2",
},
)
try:
res.raise_for_status()
except Exception as e:
raise Exception(f"Telegram notification failed: {e}\n{res.text}") from e

112
leggen/utils/database.py Normal file
View File

@@ -0,0 +1,112 @@
from datetime import datetime
import click
import leggen.database.mongo as mongodb_engine
import leggen.database.sqlite as sqlite_engine
from leggen.utils.network import get
from leggen.utils.text import info, warning
def persist_transactions(ctx: click.Context, account: str, transactions: list) -> list:
sqlite = ctx.obj.get("database", {}).get("sqlite", False)
mongodb = ctx.obj.get("database", {}).get("mongodb", False)
if not sqlite and not mongodb:
warning("No database engine is enabled, skipping transaction saving")
# WARNING: This will return the transactions list as is, without saving it to any database
# Possible duplicate notifications will be sent if the filters are enabled
return transactions
if sqlite:
info(f"[{account}] Fetched {len(transactions)} transactions, saving to SQLite")
return sqlite_engine.persist_transactions(ctx, account, transactions)
else:
info(f"[{account}] Fetched {len(transactions)} transactions, saving to MongoDB")
return mongodb_engine.persist_transactions(ctx, account, transactions)
def save_transactions(ctx: click.Context, account: str) -> list:
info(f"[{account}] Getting account details")
account_info = get(ctx, f"/accounts/{account}")
info(f"[{account}] Getting transactions")
transactions = []
account_transactions = get(ctx, f"/accounts/{account}/transactions/").get(
"transactions", []
)
for transaction in account_transactions.get("booked", []):
booked_date = transaction.get("bookingDateTime") or transaction.get(
"bookingDate"
)
value_date = transaction.get("valueDateTime") or transaction.get("valueDate")
if booked_date and value_date:
min_date = min(
datetime.fromisoformat(booked_date), datetime.fromisoformat(value_date)
)
else:
min_date = datetime.fromisoformat(booked_date or value_date)
transactionValue = float(
transaction.get("transactionAmount", {}).get("amount", 0)
)
currency = transaction.get("transactionAmount", {}).get("currency", "")
description = transaction.get(
"remittanceInformationUnstructured",
",".join(transaction.get("remittanceInformationUnstructuredArray", [])),
)
t = {
"internalTransactionId": transaction.get("internalTransactionId"),
"institutionId": account_info["institution_id"],
"iban": account_info.get("iban", "N/A"),
"transactionDate": min_date,
"description": description,
"transactionValue": transactionValue,
"transactionCurrency": currency,
"transactionStatus": "booked",
"accountId": account,
"rawTransaction": transaction,
}
transactions.append(t)
for transaction in account_transactions.get("pending", []):
booked_date = transaction.get("bookingDateTime") or transaction.get(
"bookingDate"
)
value_date = transaction.get("valueDateTime") or transaction.get("valueDate")
if booked_date and value_date:
min_date = min(
datetime.fromisoformat(booked_date), datetime.fromisoformat(value_date)
)
else:
min_date = datetime.fromisoformat(booked_date or value_date)
transactionValue = float(
transaction.get("transactionAmount", {}).get("amount", 0)
)
currency = transaction.get("transactionAmount", {}).get("currency", "")
description = transaction.get(
"remittanceInformationUnstructured",
",".join(transaction.get("remittanceInformationUnstructuredArray", [])),
)
t = {
"internalTransactionId": transaction.get("internalTransactionId"),
"institutionId": account_info["institution_id"],
"iban": account_info.get("iban", "N/A"),
"transactionDate": min_date,
"description": description,
"transactionValue": transactionValue,
"transactionCurrency": currency,
"transactionStatus": "pending",
"accountId": account,
"rawTransaction": transaction,
}
transactions.append(t)
return persist_transactions(ctx, account, transactions)

View File

@@ -1,57 +0,0 @@
import click
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
from leggen.notifications.discord import send_message
from leggen.utils.text import success, warning
def save_transactions(ctx: click.Context, account: str, transactions: list):
# Connect to MongoDB
mongo_uri = ctx.obj.get("database", {}).get("mongodb", {}).get("uri")
client = MongoClient(mongo_uri)
db = client["leggen"]
transactions_collection = db["transactions"]
# Create a unique index on internalTransactionId
transactions_collection.create_index("internalTransactionId", unique=True)
# Insert transactions into MongoDB
new_transactions_count = 0
duplicates_count = 0
notification_transactions = []
filters_case_insensitive = {}
if ctx.obj.get("filters", {}).get("enabled", False):
filters_case_insensitive = ctx.obj.get("filters", {}).get(
"case-insensitive", {}
)
for transaction in transactions:
try:
transactions_collection.insert_one(transaction)
new_transactions_count += 1
# Add transaction to the list of transactions to be sent as a notification
for _, v in filters_case_insensitive.items():
if v.lower() in transaction["description"].lower():
notification_transactions.append(
{
"name": transaction["description"],
"value": transaction["transactionValue"],
"currency": transaction["transactionCurrency"],
"date": transaction["transactionDate"],
}
)
except DuplicateKeyError:
# A transaction with the same ID already exists, skip insertion
duplicates_count += 1
# Send a notification with the transactions that match the filters
if notification_transactions:
send_message(ctx, notification_transactions)
success(f"[{account}] Inserted {new_transactions_count} new transactions")
if duplicates_count:
warning(f"[{account}] Skipped {duplicates_count} duplicate transactions")

View File

@@ -0,0 +1,46 @@
import click
import leggen.notifications.discord as discord
import leggen.notifications.telegram as telegram
from leggen.utils.text import info, warning
def send_notification(ctx: click.Context, transactions: list):
if ctx.obj.get("filters") is None:
warning("No filters are enabled, skipping notifications")
return
filters_case_insensitive = ctx.obj.get("filters", {}).get("case-insensitive", {})
# Add transaction to the list of transactions to be sent as a notification
notification_transactions = []
for transaction in transactions:
for _, v in filters_case_insensitive.items():
if v.lower() in transaction["description"].lower():
notification_transactions.append(
{
"name": transaction["description"],
"value": transaction["transactionValue"],
"currency": transaction["transactionCurrency"],
"date": transaction["transactionDate"],
}
)
if len(notification_transactions) == 0:
warning("No transactions matched the filters, skipping notifications")
return
discord_enabled = ctx.obj.get("notifications", {}).get("discord", False)
telegram_enabled = ctx.obj.get("notifications", {}).get("telegram", False)
if not discord_enabled and not telegram_enabled:
warning("No notification engine is enabled, skipping notifications")
return
if discord_enabled:
info(f"Sending {len(notification_transactions)} transactions to Discord")
discord.send_message(ctx, notification_transactions)
if telegram_enabled:
info(f"Sending {len(notification_transactions)} transactions to Telegram")
telegram.send_message(ctx, notification_transactions)

View File

@@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "leggen" name = "leggen"
version = "0.4.0" version = "0.5.0"
description = "An Open Banking CLI" description = "An Open Banking CLI"
authors = ["Elisiário Couto <elisiario@couto.io>"] authors = ["Elisiário Couto <elisiario@couto.io>"]
readme = "README.md" readme = "README.md"