Files
leggen/leggen/notifications/discord.py
2025-12-07 12:21:46 +00:00

95 lines
3.1 KiB
Python

import click
from discord_webhook import DiscordEmbed, DiscordWebhook
from leggen.utils.text import info
def send_expire_notification(ctx: click.Context, notification: dict):
info("Sending expiration notification to Discord")
webhook = DiscordWebhook(url=ctx.obj["notifications"]["discord"]["webhook"])
embed = DiscordEmbed(
title="",
description=f"Your account {notification['bank']} ({notification['requisition_id']}) is in {notification['status']} status. Days left: {notification['days_left']}",
color="03b2f8",
)
embed.set_author(
name="Leggen",
url="https://github.com/elisiariocouto/leggen",
)
embed.set_footer(text="Expiration notice")
embed.set_timestamp()
webhook.add_embed(embed)
response = webhook.execute()
try:
response.raise_for_status()
except Exception as e:
raise Exception(f"Discord notification failed: {e}\n{response.text}") from e
def send_transactions_message(ctx: click.Context, transactions: list):
info(f"Got {len(transactions)} new transactions, sending message to Discord")
webhook = DiscordWebhook(url=ctx.obj["notifications"]["discord"]["webhook"])
embed = DiscordEmbed(
title="",
description=f"{len(transactions)} new transaction matches",
color="03b2f8",
)
embed.set_author(
name="Leggen",
url="https://github.com/elisiariocouto/leggen",
)
embed.set_footer(text="Case-insensitive filters")
embed.set_timestamp()
for transaction in transactions:
embed.add_embed_field(
name=transaction["name"],
value=f"{transaction['value']}{transaction['currency']} ({transaction['date']})",
)
webhook.add_embed(embed)
response = webhook.execute()
try:
response.raise_for_status()
except Exception as e:
raise Exception(f"Discord notification failed: {e}\n{response.text}") from e
def send_sync_failure_notification(ctx: click.Context, notification: dict):
info("Sending sync failure notification to Discord")
webhook = DiscordWebhook(url=ctx.obj["notifications"]["discord"]["webhook"])
color = "ffaa00" # Orange for sync failure
title = "⚠️ Sync Failure"
# Build description with account info if available
description = "Account sync failed"
if notification.get("account_id"):
description = f"Account {notification['account_id']} sync failed"
embed = DiscordEmbed(
title=title,
description=description,
color=color,
)
embed.set_author(
name="Leggen",
url="https://github.com/elisiariocouto/leggen",
)
embed.add_embed_field(
name="Error",
value=notification["error"][:1024], # Discord has field value limits
inline=False,
)
embed.set_footer(text="Sync failure notification")
embed.set_timestamp()
webhook.add_embed(embed)
response = webhook.execute()
try:
response.raise_for_status()
except Exception as e:
raise Exception(f"Discord notification failed: {e}\n{response.text}") from e