mirror of
https://github.com/elisiariocouto/leggen.git
synced 2025-12-25 03:49:20 +00:00
chore: Sort imports, fix deprecated pydantic option.
This commit is contained in:
@@ -1,14 +1,14 @@
|
||||
from datetime import datetime, timedelta
|
||||
from typing import List, Dict, Any, Optional
|
||||
import sqlite3
|
||||
import json
|
||||
import sqlite3
|
||||
from collections import defaultdict
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from loguru import logger
|
||||
|
||||
from leggen.services.transaction_processor import TransactionProcessor
|
||||
from leggen.utils.config import config
|
||||
from leggen.utils.paths import path_manager
|
||||
from leggen.services.transaction_processor import TransactionProcessor
|
||||
|
||||
|
||||
class DatabaseService:
|
||||
@@ -721,8 +721,8 @@ class DatabaseService:
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Persist transactions to SQLite"""
|
||||
try:
|
||||
import sqlite3
|
||||
import json
|
||||
import sqlite3
|
||||
|
||||
db_path = path_manager.get_database_path()
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import json
|
||||
import httpx
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, List
|
||||
from typing import Any, Dict, List
|
||||
|
||||
import httpx
|
||||
from loguru import logger
|
||||
|
||||
from leggen.utils.config import config
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
from typing import List, Dict, Any
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from loguru import logger
|
||||
|
||||
@@ -119,9 +119,10 @@ class NotificationService:
|
||||
) -> None:
|
||||
"""Send Discord notifications for transactions"""
|
||||
try:
|
||||
from leggen.notifications.discord import send_transactions_message
|
||||
import click
|
||||
|
||||
from leggen.notifications.discord import send_transactions_message
|
||||
|
||||
# Create a mock context with the webhook
|
||||
ctx = click.Context(click.Command("notifications"))
|
||||
ctx.obj = {
|
||||
@@ -148,9 +149,10 @@ class NotificationService:
|
||||
) -> None:
|
||||
"""Send Telegram notifications for transactions"""
|
||||
try:
|
||||
from leggen.notifications.telegram import send_transaction_message
|
||||
import click
|
||||
|
||||
from leggen.notifications.telegram import send_transaction_message
|
||||
|
||||
# Create a mock context with the telegram config
|
||||
ctx = click.Context(click.Command("notifications"))
|
||||
telegram_config = self.notifications_config.get("telegram", {})
|
||||
@@ -175,9 +177,10 @@ class NotificationService:
|
||||
async def _send_discord_test(self, message: str) -> None:
|
||||
"""Send Discord test notification"""
|
||||
try:
|
||||
from leggen.notifications.discord import send_expire_notification
|
||||
import click
|
||||
|
||||
from leggen.notifications.discord import send_expire_notification
|
||||
|
||||
# Create a mock context with the webhook
|
||||
ctx = click.Context(click.Command("test"))
|
||||
ctx.obj = {
|
||||
@@ -206,9 +209,10 @@ class NotificationService:
|
||||
async def _send_telegram_test(self, message: str) -> None:
|
||||
"""Send Telegram test notification"""
|
||||
try:
|
||||
from leggen.notifications.telegram import send_expire_notification
|
||||
import click
|
||||
|
||||
from leggen.notifications.telegram import send_expire_notification
|
||||
|
||||
# Create a mock context with the telegram config
|
||||
ctx = click.Context(click.Command("test"))
|
||||
telegram_config = self.notifications_config.get("telegram", {})
|
||||
@@ -237,9 +241,10 @@ class NotificationService:
|
||||
async def _send_discord_expiry(self, notification_data: Dict[str, Any]) -> None:
|
||||
"""Send Discord expiry notification"""
|
||||
try:
|
||||
from leggen.notifications.discord import send_expire_notification
|
||||
import click
|
||||
|
||||
from leggen.notifications.discord import send_expire_notification
|
||||
|
||||
# Create a mock context with the webhook
|
||||
ctx = click.Context(click.Command("expiry"))
|
||||
ctx.obj = {
|
||||
@@ -262,9 +267,10 @@ class NotificationService:
|
||||
async def _send_telegram_expiry(self, notification_data: Dict[str, Any]) -> None:
|
||||
"""Send Telegram expiry notification"""
|
||||
try:
|
||||
from leggen.notifications.telegram import send_expire_notification
|
||||
import click
|
||||
|
||||
from leggen.notifications.telegram import send_expire_notification
|
||||
|
||||
# Create a mock context with the telegram config
|
||||
ctx = click.Context(click.Command("expiry"))
|
||||
telegram_config = self.notifications_config.get("telegram", {})
|
||||
|
||||
@@ -4,8 +4,8 @@ from typing import List
|
||||
from loguru import logger
|
||||
|
||||
from leggen.api.models.sync import SyncResult, SyncStatus
|
||||
from leggen.services.gocardless_service import GoCardlessService
|
||||
from leggen.services.database_service import DatabaseService
|
||||
from leggen.services.gocardless_service import GoCardlessService
|
||||
from leggen.services.notification_service import NotificationService
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from datetime import datetime
|
||||
from typing import List, Dict, Any
|
||||
from typing import Any, Dict, List
|
||||
|
||||
|
||||
class TransactionProcessor:
|
||||
|
||||
Reference in New Issue
Block a user