mirror of
https://github.com/elisiariocouto/leggen.git
synced 2025-12-24 22:59:28 +00:00
chore: Initial version.
This commit is contained in:
47
leggen/commands/balances.py
Normal file
47
leggen/commands/balances.py
Normal file
@@ -0,0 +1,47 @@
|
||||
import click
|
||||
|
||||
from leggen.main import cli
|
||||
from leggen.utils.network import get
|
||||
from leggen.utils.text import datefmt, print_table
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.pass_context
|
||||
def balances(ctx: click.Context):
|
||||
"""
|
||||
List balances of all connected accounts
|
||||
"""
|
||||
|
||||
res = get(ctx, "/requisitions/")
|
||||
accounts = []
|
||||
for r in res.get("results", []):
|
||||
accounts += r.get("accounts", [])
|
||||
|
||||
all_balances = []
|
||||
for account in accounts:
|
||||
account_ballances = get(ctx, f"/accounts/{account}/balances/").get(
|
||||
"balances", []
|
||||
)
|
||||
for balance in account_ballances:
|
||||
balance_amount = balance["balanceAmount"]
|
||||
amount = round(float(balance_amount["amount"]), 2)
|
||||
symbol = (
|
||||
"€"
|
||||
if balance_amount["currency"] == "EUR"
|
||||
else f" {balance_amount['currency']}"
|
||||
)
|
||||
amount_str = f"{amount}{symbol}"
|
||||
date = (
|
||||
datefmt(balance.get("lastChangeDateTime"))
|
||||
if balance.get("lastChangeDateTime")
|
||||
else ""
|
||||
)
|
||||
all_balances.append(
|
||||
{
|
||||
"Account": account,
|
||||
"Amount": amount_str,
|
||||
"Type": balance["balanceType"],
|
||||
"Last change at": date,
|
||||
}
|
||||
)
|
||||
print_table(all_balances)
|
||||
36
leggen/commands/bank/__init__.py
Normal file
36
leggen/commands/bank/__init__.py
Normal file
@@ -0,0 +1,36 @@
|
||||
import os
|
||||
|
||||
import click
|
||||
|
||||
from leggen.main import cli
|
||||
|
||||
cmd_folder = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
|
||||
class BankGroup(click.Group):
|
||||
def list_commands(self, ctx):
|
||||
rv = []
|
||||
for filename in os.listdir(cmd_folder):
|
||||
if filename.endswith(".py") and not filename.startswith("__init__"):
|
||||
if filename == "list_banks.py":
|
||||
rv.append("list")
|
||||
else:
|
||||
rv.append(filename[:-3])
|
||||
rv.sort()
|
||||
return rv
|
||||
|
||||
def get_command(self, ctx, name):
|
||||
try:
|
||||
if name == "list":
|
||||
name = "list_banks"
|
||||
mod = __import__(f"leggen.commands.bank.{name}", None, None, [name])
|
||||
except ImportError:
|
||||
return
|
||||
return getattr(mod, name)
|
||||
|
||||
|
||||
@cli.group(cls=BankGroup)
|
||||
@click.pass_context
|
||||
def bank(ctx):
|
||||
"""Manage banks connections"""
|
||||
return
|
||||
45
leggen/commands/bank/add.py
Normal file
45
leggen/commands/bank/add.py
Normal file
@@ -0,0 +1,45 @@
|
||||
import click
|
||||
|
||||
from leggen.main import cli
|
||||
from leggen.utils.disk import save_file
|
||||
from leggen.utils.network import get, post
|
||||
from leggen.utils.text import info, print_table, warning
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.pass_context
|
||||
def add(ctx):
|
||||
"""
|
||||
Connect to a bank
|
||||
"""
|
||||
country = click.prompt(
|
||||
"Bank Country",
|
||||
type=click.Choice(["PT", "GB"], case_sensitive=True),
|
||||
default="PT",
|
||||
)
|
||||
info(f"Getting bank list for country: {country}")
|
||||
banks = get(ctx, "/institutions/", {"country": country})
|
||||
filtered_banks = [
|
||||
{
|
||||
"id": bank["id"],
|
||||
"name": bank["name"],
|
||||
"max_transaction_days": bank["transaction_total_days"],
|
||||
}
|
||||
for bank in banks
|
||||
]
|
||||
print_table(filtered_banks)
|
||||
allowed_ids = [str(bank["id"]) for bank in banks]
|
||||
bank_id = click.prompt("Bank ID", type=click.Choice(allowed_ids))
|
||||
click.confirm("Do you agree to connect to this bank?", abort=True)
|
||||
|
||||
info(f"Connecting to bank with ID: {bank_id}")
|
||||
|
||||
res = post(
|
||||
ctx,
|
||||
"/requisitions/",
|
||||
{"institution_id": bank_id, "redirect": "http://localhost:8000/"},
|
||||
)
|
||||
|
||||
save_file(f"req_{res['id']}.json", res)
|
||||
|
||||
warning(f"Please open the following URL in your browser to accept: {res['link']}")
|
||||
44
leggen/commands/init.py
Normal file
44
leggen/commands/init.py
Normal file
@@ -0,0 +1,44 @@
|
||||
import click
|
||||
|
||||
from leggen.main import cli
|
||||
from leggen.utils.auth import get_token
|
||||
from leggen.utils.config import save_config
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.option(
|
||||
"--api-key", prompt=True, help="GoCardless API Key", envvar="LEGGEN_GC_API_KEY"
|
||||
)
|
||||
@click.option(
|
||||
"--api-secret",
|
||||
prompt=True,
|
||||
help="GoCardless API Secret",
|
||||
hide_input=True,
|
||||
envvar="LEGGEN_GC_API_SECRET",
|
||||
)
|
||||
@click.option(
|
||||
"--api-url",
|
||||
default="https://bankaccountdata.gocardless.com/api/v2",
|
||||
help="GoCardless API URL",
|
||||
show_default=True,
|
||||
envvar="LEGGEN_GC_API_URL",
|
||||
)
|
||||
@click.option("--mongo-uri", prompt=True, help="MongoDB URI", envvar="LEGGEN_MONGO_URI")
|
||||
@click.pass_context
|
||||
def init(ctx: click.Context, api_key, api_secret, api_url, mongo_uri):
|
||||
"""
|
||||
Create configuration file
|
||||
"""
|
||||
config = {
|
||||
"api_key": api_key,
|
||||
"api_secret": api_secret,
|
||||
"api_url": api_url,
|
||||
"mongo_uri": mongo_uri,
|
||||
}
|
||||
|
||||
# Just make sure this API credentials are valid
|
||||
# if so, it will save the token in the auth file
|
||||
_ = get_token(config)
|
||||
|
||||
# Save the configuration
|
||||
save_config(config)
|
||||
46
leggen/commands/status.py
Normal file
46
leggen/commands/status.py
Normal file
@@ -0,0 +1,46 @@
|
||||
import click
|
||||
|
||||
from leggen.main import cli
|
||||
from leggen.utils.gocardless import REQUISITION_STATUS
|
||||
from leggen.utils.network import get
|
||||
from leggen.utils.text import datefmt, echo, info, print_table
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.pass_context
|
||||
def status(ctx: click.Context):
|
||||
"""
|
||||
List all connected banks and their status
|
||||
"""
|
||||
|
||||
res = get(ctx, "/requisitions/")
|
||||
requisitions = []
|
||||
accounts = []
|
||||
for r in res["results"]:
|
||||
requisitions.append(
|
||||
{
|
||||
"Bank": r["institution_id"],
|
||||
"Status": REQUISITION_STATUS.get(r["status"], "UNKNOWN"),
|
||||
"Created at": datefmt(r["created"]),
|
||||
}
|
||||
)
|
||||
accounts += r.get("accounts", [])
|
||||
info("Banks")
|
||||
print_table(requisitions)
|
||||
|
||||
account_details = []
|
||||
for account in accounts:
|
||||
details = get(ctx, f"/accounts/{account}")
|
||||
account_details.append(
|
||||
{
|
||||
"ID": details["id"],
|
||||
"Bank": details["institution_id"],
|
||||
"Status": details["status"],
|
||||
"IBAN": details.get("iban", "N/A"),
|
||||
"Created at": datefmt(details["created"]),
|
||||
"Last accessed at": datefmt(details["last_accessed"]),
|
||||
}
|
||||
)
|
||||
echo()
|
||||
info("Accounts")
|
||||
print_table(account_details)
|
||||
74
leggen/commands/sync.py
Normal file
74
leggen/commands/sync.py
Normal file
@@ -0,0 +1,74 @@
|
||||
import click
|
||||
from pymongo import MongoClient
|
||||
from pymongo.errors import DuplicateKeyError
|
||||
|
||||
from leggen.main import cli
|
||||
from leggen.utils.network import get
|
||||
from leggen.utils.text import error, info, success, warning
|
||||
|
||||
|
||||
def save_transactions(ctx: click.Context, account: str):
|
||||
info(f"[{account}] Getting transactions")
|
||||
all_transactions = []
|
||||
account_transactions = get(ctx, f"/accounts/{account}/transactions/").get(
|
||||
"transactions", []
|
||||
)
|
||||
|
||||
for transaction in account_transactions.get("booked", []):
|
||||
transaction["accountId"] = account
|
||||
transaction["transactionStatus"] = "booked"
|
||||
all_transactions.append(transaction)
|
||||
|
||||
for transaction in account_transactions.get("pending", []):
|
||||
transaction["accountId"] = account
|
||||
transaction["transactionStatus"] = "pending"
|
||||
all_transactions.append(transaction)
|
||||
|
||||
info(f"[{account}] Fetched {len(all_transactions)} transactions, saving to MongoDB")
|
||||
|
||||
# Connect to MongoDB
|
||||
mongo_uri = ctx.obj["mongo_uri"]
|
||||
client = MongoClient(mongo_uri)
|
||||
db = client["leggen"]
|
||||
transactions_collection = db["transactions"]
|
||||
|
||||
# Create a unique index on transactionId
|
||||
transactions_collection.create_index("transactionId", unique=True)
|
||||
|
||||
# Insert transactions into MongoDB
|
||||
new_transactions_count = 0
|
||||
duplicates_count = 0
|
||||
|
||||
for transaction in all_transactions:
|
||||
try:
|
||||
transactions_collection.insert_one(transaction)
|
||||
new_transactions_count += 1
|
||||
except DuplicateKeyError:
|
||||
# A transaction with the same ID already exists, skip insertion
|
||||
duplicates_count += 1
|
||||
|
||||
success(f"[{account}] Inserted {new_transactions_count} new transactions")
|
||||
if duplicates_count:
|
||||
warning(f"[{account}] Skipped {duplicates_count} duplicate transactions")
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.pass_context
|
||||
def sync(ctx: click.Context):
|
||||
"""
|
||||
Sync all transactions with database
|
||||
"""
|
||||
info("Getting accounts details")
|
||||
res = get(ctx, "/requisitions/")
|
||||
accounts = []
|
||||
for r in res.get("results", []):
|
||||
accounts += r.get("accounts", [])
|
||||
accounts = list(set(accounts))
|
||||
|
||||
info(f"Syncing transactions for {len(accounts)} accounts")
|
||||
|
||||
for account in accounts:
|
||||
try:
|
||||
save_transactions(ctx, account)
|
||||
except Exception as e:
|
||||
error(f"[{account}] Error: Sync failed, skipping account. Exception: {e}")
|
||||
30
leggen/commands/transactions.py
Normal file
30
leggen/commands/transactions.py
Normal file
@@ -0,0 +1,30 @@
|
||||
import click
|
||||
|
||||
from leggen.main import cli
|
||||
from leggen.utils.network import get
|
||||
from leggen.utils.text import print_table
|
||||
|
||||
|
||||
@cli.command()
|
||||
@click.argument("account", type=str)
|
||||
@click.pass_context
|
||||
def transactions(ctx: click.Context, account: str):
|
||||
"""
|
||||
List transactions for an account
|
||||
|
||||
ACCOUNT is the account id, see 'leggen status' for the account ids
|
||||
"""
|
||||
all_transactions = []
|
||||
account_transactions = get(ctx, f"/accounts/{account}/transactions/").get(
|
||||
"transactions", []
|
||||
)
|
||||
|
||||
for transaction in account_transactions.get("booked", []):
|
||||
transaction["TYPE"] = "booked"
|
||||
all_transactions.append(transaction)
|
||||
|
||||
for transaction in account_transactions.get("pending", []):
|
||||
transaction["TYPE"] = "pending"
|
||||
all_transactions.append(transaction)
|
||||
|
||||
print_table(all_transactions)
|
||||
102
leggen/main.py
Normal file
102
leggen/main.py
Normal file
@@ -0,0 +1,102 @@
|
||||
import os
|
||||
import sys
|
||||
from gettext import gettext as _
|
||||
from pathlib import Path
|
||||
|
||||
import click
|
||||
|
||||
from leggen.utils.auth import get_token
|
||||
from leggen.utils.config import load_config
|
||||
from leggen.utils.text import error
|
||||
|
||||
cmd_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "commands"))
|
||||
|
||||
|
||||
class Group(click.Group):
|
||||
# Overriden version to support sections
|
||||
def format_commands(self, ctx, formatter):
|
||||
commands = []
|
||||
for subcommand in self.list_commands(ctx):
|
||||
if subcommand.startswith("GROUP_"):
|
||||
cmd = self.get_command(ctx, subcommand[6:])
|
||||
else:
|
||||
cmd = self.get_command(ctx, subcommand)
|
||||
# What is this, the tool lied about a command. Ignore it
|
||||
if cmd is None:
|
||||
continue
|
||||
if cmd.hidden:
|
||||
continue
|
||||
|
||||
commands.append((subcommand, cmd))
|
||||
|
||||
# allow for 3 times the default spacing
|
||||
if len(commands):
|
||||
limit = formatter.width - 6 - max(len(cmd[0]) for cmd in commands)
|
||||
|
||||
rows = []
|
||||
groups = []
|
||||
for subcommand, cmd in commands:
|
||||
help = cmd.get_short_help_str(limit)
|
||||
if subcommand.startswith("GROUP_"):
|
||||
groups.append((subcommand[6:], help))
|
||||
else:
|
||||
rows.append((subcommand, help))
|
||||
|
||||
if groups:
|
||||
with formatter.section(_("Command Groups")):
|
||||
formatter.write_dl(groups)
|
||||
|
||||
if rows:
|
||||
with formatter.section(_("Commands")):
|
||||
formatter.write_dl(rows)
|
||||
|
||||
def list_commands(self, ctx):
|
||||
commands = []
|
||||
groups = []
|
||||
for filename in os.listdir(cmd_folder):
|
||||
if filename.endswith(".py") and not filename.startswith("__init__"):
|
||||
commands.append(filename[:-3])
|
||||
if not filename.endswith(".py"):
|
||||
for group_filename in os.listdir(os.path.join(cmd_folder, filename)):
|
||||
if group_filename == "__init__.py":
|
||||
groups.append(f"GROUP_{filename}")
|
||||
|
||||
commands.sort()
|
||||
groups.sort()
|
||||
return groups + commands
|
||||
|
||||
def get_command(self, ctx, name):
|
||||
try:
|
||||
mod = __import__(f"leggen.commands.{name}", None, None, [name])
|
||||
except ImportError as e:
|
||||
error(f"Can't import command {name}. Exception: {e}")
|
||||
return
|
||||
return getattr(mod, name)
|
||||
|
||||
|
||||
@click.group(cls=Group, context_settings={"help_option_names": ["-h", "--help"]})
|
||||
@click.version_option(package_name="leggen")
|
||||
@click.pass_context
|
||||
def cli(ctx: click.Context):
|
||||
"""
|
||||
Leggen: An Open Banking CLI
|
||||
"""
|
||||
ctx.ensure_object(dict)
|
||||
|
||||
# Do not require authentication when printing help messages
|
||||
if "--help" in sys.argv[1:] or "-h" in sys.argv[1:]:
|
||||
return
|
||||
|
||||
# or when running the init command
|
||||
if ctx.invoked_subcommand == "init":
|
||||
if (click.get_app_dir("leggen") / Path("config.json")).is_file():
|
||||
click.confirm(
|
||||
"Configuration file already exists. Do you want to overwrite it?",
|
||||
abort=True,
|
||||
)
|
||||
return
|
||||
config = load_config()
|
||||
token = get_token(config)
|
||||
ctx.obj["api_url"] = config["api_url"]
|
||||
ctx.obj["mongo_uri"] = config["mongo_uri"]
|
||||
ctx.obj["headers"] = {"Authorization": f"Bearer {token}"}
|
||||
57
leggen/utils/auth.py
Normal file
57
leggen/utils/auth.py
Normal file
@@ -0,0 +1,57 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
import click
|
||||
import requests
|
||||
|
||||
from leggen.utils.text import warning
|
||||
|
||||
|
||||
def create_token(config: dict) -> str:
|
||||
"""
|
||||
Create a new token
|
||||
"""
|
||||
res = requests.post(
|
||||
f"{config['api_url']}/token/new/",
|
||||
json={"secret_id": config["api_key"], "secret_key": config["api_secret"]},
|
||||
)
|
||||
res.raise_for_status()
|
||||
auth = res.json()
|
||||
save_auth(auth)
|
||||
return auth["access"]
|
||||
|
||||
|
||||
def get_token(config: dict) -> str:
|
||||
"""
|
||||
Get the token from the auth file or request a new one
|
||||
"""
|
||||
auth_file = click.get_app_dir("leggen") / Path("auth.json")
|
||||
if auth_file.exists():
|
||||
with click.open_file(str(auth_file), "r") as f:
|
||||
auth = json.load(f)
|
||||
if not auth.get("access"):
|
||||
return create_token(config)
|
||||
|
||||
res = requests.post(
|
||||
f"{config['api_url']}/token/refresh/", json={"refresh": auth["refresh"]}
|
||||
)
|
||||
try:
|
||||
res.raise_for_status()
|
||||
auth.update(res.json())
|
||||
save_auth(auth)
|
||||
return auth["access"]
|
||||
except requests.exceptions.HTTPError:
|
||||
warning(
|
||||
f"Token probably expired, requesting a new one.\nResponse: {res.status_code}\n{res.text}"
|
||||
)
|
||||
return create_token(config)
|
||||
else:
|
||||
return create_token(config)
|
||||
|
||||
|
||||
def save_auth(d: dict):
|
||||
Path.mkdir(Path(click.get_app_dir("leggen")), exist_ok=True)
|
||||
auth_file = click.get_app_dir("leggen") / Path("auth.json")
|
||||
|
||||
with click.open_file(str(auth_file), "w") as f:
|
||||
json.dump(d, f)
|
||||
29
leggen/utils/config.py
Normal file
29
leggen/utils/config.py
Normal file
@@ -0,0 +1,29 @@
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import click
|
||||
|
||||
from leggen.utils.text import error, info
|
||||
|
||||
|
||||
def save_config(d: dict):
|
||||
Path.mkdir(Path(click.get_app_dir("leggen")), exist_ok=True)
|
||||
config_file = click.get_app_dir("leggen") / Path("config.json")
|
||||
|
||||
with click.open_file(str(config_file), "w") as f:
|
||||
json.dump(d, f)
|
||||
info(f"Wrote configuration file at '{config_file}'")
|
||||
|
||||
|
||||
def load_config() -> dict:
|
||||
config_file = click.get_app_dir("leggen") / Path("config.json")
|
||||
try:
|
||||
with click.open_file(str(config_file), "r") as f:
|
||||
config = json.load(f)
|
||||
return config
|
||||
except FileNotFoundError:
|
||||
error(
|
||||
"Configuration file not found. Run `leggen init` to configure your account."
|
||||
)
|
||||
sys.exit(1)
|
||||
35
leggen/utils/disk.py
Normal file
35
leggen/utils/disk.py
Normal file
@@ -0,0 +1,35 @@
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import click
|
||||
|
||||
from leggen.utils.text import error, info
|
||||
|
||||
|
||||
def save_file(name: str, d: dict):
|
||||
Path.mkdir(Path(click.get_app_dir("leggen")), exist_ok=True)
|
||||
config_file = click.get_app_dir("leggen") / Path(name)
|
||||
|
||||
with click.open_file(str(config_file), "w") as f:
|
||||
json.dump(d, f)
|
||||
info(f"Wrote configuration file at '{config_file}'")
|
||||
|
||||
|
||||
def load_file(name: str) -> dict:
|
||||
config_file = click.get_app_dir("leggen") / Path(name)
|
||||
try:
|
||||
with click.open_file(str(config_file), "r") as f:
|
||||
config = json.load(f)
|
||||
return config
|
||||
except FileNotFoundError:
|
||||
error(f"Configuration file '{config_file}' not found")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def get_prefixed_files(prefix: str) -> list:
|
||||
return [
|
||||
f.name
|
||||
for f in Path(click.get_app_dir("leggen")).iterdir()
|
||||
if f.name.startswith(prefix)
|
||||
]
|
||||
10
leggen/utils/gocardless.py
Normal file
10
leggen/utils/gocardless.py
Normal file
@@ -0,0 +1,10 @@
|
||||
REQUISITION_STATUS = {
|
||||
"CR": "CREATED",
|
||||
"GC": "GIVING_CONSENT",
|
||||
"UA": "UNDERGOING_AUTHENTICATION",
|
||||
"RJ": "REJECTED",
|
||||
"SA": "SELECTING_ACCOUNTS",
|
||||
"GA": "GRANTING_ACCESS",
|
||||
"LN": "LINKED",
|
||||
"EX": "EXPIRED",
|
||||
}
|
||||
50
leggen/utils/network.py
Normal file
50
leggen/utils/network.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import click
|
||||
import requests
|
||||
|
||||
from leggen.utils.text import error
|
||||
|
||||
|
||||
def get(ctx: click.Context, path: str, params: dict = {}):
|
||||
"""
|
||||
GET request to the GoCardless API
|
||||
"""
|
||||
|
||||
url = f"{ctx.obj['api_url']}{path}"
|
||||
res = requests.get(url, headers=ctx.obj["headers"], params=params)
|
||||
try:
|
||||
res.raise_for_status()
|
||||
except Exception as e:
|
||||
error(f"Error: {e}")
|
||||
ctx.abort()
|
||||
return res.json()
|
||||
|
||||
|
||||
def post(ctx: click.Context, path: str, data: dict = {}):
|
||||
"""
|
||||
POST request to the GoCardless API
|
||||
"""
|
||||
|
||||
url = f"{ctx.obj['api_url']}{path}"
|
||||
res = requests.post(url, headers=ctx.obj["headers"], json=data)
|
||||
try:
|
||||
res.raise_for_status()
|
||||
except Exception as e:
|
||||
error(f"Error: {e}")
|
||||
ctx.abort()
|
||||
return res.json()
|
||||
|
||||
|
||||
def put(ctx: click.Context, path: str, data: dict = {}):
|
||||
"""
|
||||
PUT request to the GoCardless API
|
||||
"""
|
||||
|
||||
url = f"{ctx.obj['api_url']}{path}"
|
||||
res = requests.put(url, headers=ctx.obj["headers"], json=data)
|
||||
try:
|
||||
res.raise_for_status()
|
||||
except Exception as e:
|
||||
error(f"Error: {e}")
|
||||
error(res.text)
|
||||
ctx.abort()
|
||||
return res.json()
|
||||
45
leggen/utils/text.py
Normal file
45
leggen/utils/text.py
Normal file
@@ -0,0 +1,45 @@
|
||||
import sys
|
||||
from datetime import datetime
|
||||
|
||||
import click
|
||||
import tabulate
|
||||
|
||||
|
||||
def print_table(data):
|
||||
if isinstance(data, list):
|
||||
echo(tabulate.tabulate(data, headers="keys"))
|
||||
elif isinstance(data, dict):
|
||||
echo(tabulate.tabulate([data], headers="keys"))
|
||||
else:
|
||||
error("Could not create table")
|
||||
|
||||
|
||||
def datefmt(date: str):
|
||||
return datetime.fromisoformat(date).strftime("%Y/%m/%d %H:%M")
|
||||
|
||||
|
||||
def echo(msg=""):
|
||||
click.echo(msg)
|
||||
sys.stdout.flush()
|
||||
|
||||
|
||||
def echo_error(msg, color: str, prefix="> ", bold=True, nl=True):
|
||||
padded_msg = "\n".join(f"{prefix}{line}" for line in msg.splitlines())
|
||||
click.secho(f"{padded_msg}", fg=color, err=True, color=True, bold=True)
|
||||
sys.stderr.flush()
|
||||
|
||||
|
||||
def success(msg):
|
||||
echo_error(msg, "green")
|
||||
|
||||
|
||||
def info(msg):
|
||||
echo_error(msg, "blue")
|
||||
|
||||
|
||||
def warning(msg):
|
||||
echo_error(msg, "yellow")
|
||||
|
||||
|
||||
def error(msg):
|
||||
echo_error(msg, "red")
|
||||
Reference in New Issue
Block a user