feat: Change default database engine to SQLite, change schema.

This commit is contained in:
Elisiário Couto
2024-02-27 00:29:19 +00:00
parent 433d17371e
commit f9ab3ae0a8
9 changed files with 288 additions and 67 deletions

View File

@@ -7,7 +7,11 @@ from leggen.utils.config import save_config
@cli.command()
@click.option(
"--api-key", prompt=True, help="GoCardless API Key", envvar="LEGGEN_GC_API_KEY"
"--api-key",
prompt=True,
help="GoCardless API Key",
envvar="LEGGEN_GC_API_KEY",
show_envvar=True,
)
@click.option(
"--api-secret",
@@ -15,6 +19,7 @@ from leggen.utils.config import save_config
help="GoCardless API Secret",
hide_input=True,
envvar="LEGGEN_GC_API_SECRET",
show_envvar=True,
)
@click.option(
"--api-url",
@@ -22,10 +27,32 @@ from leggen.utils.config import save_config
help="GoCardless API URL",
show_default=True,
envvar="LEGGEN_GC_API_URL",
show_envvar=True,
)
@click.option(
"--sqlite/--mongo",
prompt=True,
default=True,
help="Use SQLite or MongoDB",
show_default=True,
)
@click.option(
"--mongo-uri",
prompt=True,
help="MongoDB URI",
envvar="LEGGEN_MONGO_URI",
show_envvar=True,
default="mongodb://localhost:27017",
)
@click.option("--mongo-uri", prompt=True, help="MongoDB URI", envvar="LEGGEN_MONGO_URI")
@click.pass_context
def init(ctx: click.Context, api_key, api_secret, api_url, mongo_uri):
def init(
ctx: click.Context,
api_key: str,
api_secret: str,
api_url: str,
sqlite: bool,
mongo_uri: str,
):
"""
Create configuration file
"""
@@ -33,6 +60,7 @@ def init(ctx: click.Context, api_key, api_secret, api_url, mongo_uri):
"api_key": api_key,
"api_secret": api_secret,
"api_url": api_url,
"sqlite": sqlite,
"mongo_uri": mongo_uri,
}

View File

@@ -1,55 +1,99 @@
from datetime import datetime
import click
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
from leggen.main import cli
from leggen.utils.mongo import save_transactions as save_transactions_mongo
from leggen.utils.network import get
from leggen.utils.text import error, info, success, warning
from leggen.utils.sqlite import save_transactions as save_transactions_sqlite
from leggen.utils.text import error, info
def save_transactions(ctx: click.Context, account: str):
info(f"[{account}] Getting account details")
account_info = get(ctx, f"/accounts/{account}")
info(f"[{account}] Getting transactions")
all_transactions = []
transactions = []
account_transactions = get(ctx, f"/accounts/{account}/transactions/").get(
"transactions", []
)
for transaction in account_transactions.get("booked", []):
transaction["accountId"] = account
transaction["transactionStatus"] = "booked"
all_transactions.append(transaction)
booked_date = datetime.fromisoformat(
transaction.get("bookingDateTime", transaction.get("bookingDate"))
)
value_date = datetime.fromisoformat(
transaction.get("valueDateTime", transaction.get("valueDate"))
)
min_date = min(booked_date, value_date)
transactionValue = float(
transaction.get("transactionAmount", {}).get("amount", 0)
)
currency = transaction.get("transactionAmount", {}).get("currency", "")
description = transaction.get(
"remittanceInformationUnstructured",
",".join(transaction.get("remittanceInformationUnstructuredArray", [])),
)
t = {
"internalTransactionId": transaction.get("internalTransactionId"),
"institutionId": account_info["institution_id"],
"iban": account_info.get("iban", "N/A"),
"transactionDate": min_date,
"description": description,
"transactionValue": transactionValue,
"transactionCurrency": currency,
"transactionStatus": "booked",
"accountId": account,
"rawTransaction": transaction,
}
transactions.append(t)
for transaction in account_transactions.get("pending", []):
transaction["accountId"] = account
transaction["transactionStatus"] = "pending"
all_transactions.append(transaction)
booked_date = datetime.fromisoformat(
transaction.get("bookingDateTime", transaction.get("bookingDate"))
)
value_date = datetime.fromisoformat(
transaction.get("valueDateTime", transaction.get("valueDate"))
)
min_date = min(booked_date, value_date)
info(f"[{account}] Fetched {len(all_transactions)} transactions, saving to MongoDB")
transactionValue = float(
transaction.get("transactionAmount", {}).get("amount", 0)
)
currency = transaction.get("transactionAmount", {}).get("currency", "")
# Connect to MongoDB
mongo_uri = ctx.obj["mongo_uri"]
client = MongoClient(mongo_uri)
db = client["leggen"]
transactions_collection = db["transactions"]
description = transaction.get(
"remittanceInformationUnstructured",
",".join(transaction.get("remittanceInformationUnstructuredArray", [])),
)
# Create a unique index on transactionId
transactions_collection.create_index("transactionId", unique=True)
t = {
"internalTransactionId": transaction.get("internalTransactionId"),
"institutionId": account_info["institution_id"],
"iban": account_info.get("iban", "N/A"),
"transactionDate": min_date,
"description": description,
"transactionValue": transactionValue,
"transactionCurrency": currency,
"transactionStatus": "pending",
"accountId": account,
"rawTransaction": transaction,
}
transactions.append(t)
# Insert transactions into MongoDB
new_transactions_count = 0
duplicates_count = 0
for transaction in all_transactions:
try:
transactions_collection.insert_one(transaction)
new_transactions_count += 1
except DuplicateKeyError:
# A transaction with the same ID already exists, skip insertion
duplicates_count += 1
success(f"[{account}] Inserted {new_transactions_count} new transactions")
if duplicates_count:
warning(f"[{account}] Skipped {duplicates_count} duplicate transactions")
sqlite = ctx.obj["sqlite"]
info(
f"[{account}] Fetched {len(transactions)} transactions, saving to {'SQLite' if sqlite else 'MongoDB'}"
)
if sqlite:
save_transactions_sqlite(ctx, account, transactions)
else:
save_transactions_mongo(ctx, account, transactions)
@cli.command()

View File

@@ -2,23 +2,15 @@ import click
from leggen.main import cli
from leggen.utils.network import get
from leggen.utils.text import print_table
from leggen.utils.text import info, print_table
@cli.command()
@click.argument("account", type=str)
@click.pass_context
def transactions(ctx: click.Context, account: str):
"""
List transactions for an account
ACCOUNT is the account id, see 'leggen status' for the account ids
"""
def print_transactions(
ctx: click.Context, account_info: dict, account_transactions: dict
):
info(f"Bank: {account_info['institution_id']}")
info(f"IBAN: {account_info.get('iban', 'N/A')}")
all_transactions = []
account_transactions = get(ctx, f"/accounts/{account}/transactions/").get(
"transactions", []
)
for transaction in account_transactions.get("booked", []):
transaction["TYPE"] = "booked"
all_transactions.append(transaction)
@@ -28,3 +20,33 @@ def transactions(ctx: click.Context, account: str):
all_transactions.append(transaction)
print_table(all_transactions)
@cli.command()
@click.option("-a", "--account", type=str, help="Account ID")
@click.pass_context
def transactions(ctx: click.Context, account: str):
"""
List transactions
By default, this command lists all transactions for all accounts.
If the --account option is used, it will only list transactions for that account.
"""
if account:
account_info = get(ctx, f"/accounts/{account}")
account_transactions = get(ctx, f"/accounts/{account}/transactions/").get(
"transactions", []
)
print_transactions(ctx, account_info, account_transactions)
else:
res = get(ctx, "/requisitions/")
accounts = []
for r in res["results"]:
accounts += r.get("accounts", [])
for account in accounts:
account_details = get(ctx, f"/accounts/{account}")
account_transactions = get(ctx, f"/accounts/{account}/transactions/").get(
"transactions", []
)
print_transactions(ctx, account_details, account_transactions)

View File

@@ -98,5 +98,6 @@ def cli(ctx: click.Context):
config = load_config()
token = get_token(config)
ctx.obj["api_url"] = config["api_url"]
ctx.obj["sqlite"] = config["sqlite"]
ctx.obj["mongo_uri"] = config["mongo_uri"]
ctx.obj["headers"] = {"Authorization": f"Bearer {token}"}

32
leggen/utils/mongo.py Normal file
View File

@@ -0,0 +1,32 @@
import click
from pymongo import MongoClient
from pymongo.errors import DuplicateKeyError
from leggen.utils.text import success, warning
def save_transactions(ctx: click.Context, account: str, transactions: list):
# Connect to MongoDB
mongo_uri = ctx.obj["mongo_uri"]
client = MongoClient(mongo_uri)
db = client["leggen"]
transactions_collection = db["transactions"]
# Create a unique index on internalTransactionId
transactions_collection.create_index("internalTransactionId", unique=True)
# Insert transactions into MongoDB
new_transactions_count = 0
duplicates_count = 0
for transaction in transactions:
try:
transactions_collection.insert_one(transaction)
new_transactions_count += 1
except DuplicateKeyError:
# A transaction with the same ID already exists, skip insertion
duplicates_count += 1
success(f"[{account}] Inserted {new_transactions_count} new transactions")
if duplicates_count:
warning(f"[{account}] Skipped {duplicates_count} duplicate transactions")

80
leggen/utils/sqlite.py Normal file
View File

@@ -0,0 +1,80 @@
import json
import sqlite3
from sqlite3 import IntegrityError
import click
from leggen.utils.text import success, warning
def save_transactions(ctx: click.Context, account: str, transactions: list):
# Path to your SQLite database file
# Connect to SQLite database
conn = sqlite3.connect("./leggen.db")
cursor = conn.cursor()
# Create the transactions table if it doesn't exist
cursor.execute(
"""CREATE TABLE IF NOT EXISTS transactions (
internalTransactionId TEXT PRIMARY KEY,
institutionId TEXT,
iban TEXT,
transactionDate DATETIME,
description TEXT,
transactionValue REAL,
transactionCurrency TEXT,
transactionStatus TEXT,
accountId TEXT,
rawTransaction JSON
)"""
)
# Insert transactions into SQLite database
new_transactions_count = 0
duplicates_count = 0
# Prepare an SQL statement for inserting data
insert_sql = """INSERT INTO transactions (
internalTransactionId,
institutionId,
iban,
transactionDate,
description,
transactionValue,
transactionCurrency,
transactionStatus,
accountId,
rawTransaction
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""
for transaction in transactions:
try:
cursor.execute(
insert_sql,
(
transaction["internalTransactionId"],
transaction["institutionId"],
transaction["iban"],
transaction["transactionDate"],
transaction["description"],
transaction["transactionValue"],
transaction["transactionCurrency"],
transaction["transactionStatus"],
transaction["accountId"],
json.dumps(transaction["rawTransaction"]),
),
)
new_transactions_count += 1
except IntegrityError:
# A transaction with the same ID already exists, indicating a duplicate
duplicates_count += 1
# Commit changes and close the connection
conn.commit()
conn.close()
success(f"[{account}] Inserted {new_transactions_count} new transactions")
if duplicates_count:
warning(f"[{account}] Skipped {duplicates_count} duplicate transactions")