mirror of
https://github.com/elisiariocouto/leggen.git
synced 2025-12-14 07:22:25 +00:00
Authentication Fixes:
- Implement proper async GoCardless token management in leggend service
- Add automatic token refresh and creation for expired/missing tokens
- Unify auth.json storage path between CLI and API (~/.config/leggen/)
- Fix 401 Unauthorized errors when accessing GoCardless API

Development Enhancements:
- Add --reload flag to leggend for automatic file watching and restart
- Add --host and --port options for flexible service binding
- Include both leggend/ and leggen/ directories in reload watching
- Improve development workflow with hot reloading

Configuration Consistency:
- Standardize config path to ~/.config/leggen/config.toml for both CLI and API
- Ensure auth.json is stored in same location as main config
- Add httpx dependency for async HTTP requests in leggend service

Verified working: leggen status command successfully authenticates and retrieves bank/account data via leggend API service.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
160 lines
6.0 KiB
Python
160 lines
6.0 KiB
Python
import asyncio
|
|
import json
|
|
import httpx
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List, Optional
|
|
|
|
from loguru import logger
|
|
|
|
from leggend.config import config
|
|
|
|
|
|
class GoCardlessService:
    """Async client for the GoCardless bank account data API.

    The service authenticates with a bearer access token.  The token is kept
    in memory after the first lookup and the full credential payload is
    persisted to ``~/.config/leggen/auth.json``.  When the API rejects the
    cached token with ``401`` the token is discarded, re-acquired (refresh
    first, then a brand new token) and the request is retried once.
    """

    def __init__(self):
        self.config = config.gocardless_config
        self.base_url = self.config.get(
            "url", "https://bankaccountdata.gocardless.com/api/v2"
        )
        # In-memory copy of the access token; reset to None whenever the API
        # answers 401 so the next lookup re-acquires it.
        self._token: Optional[str] = None

    @staticmethod
    def _auth_file() -> Path:
        """Return the location of the persisted credentials file."""
        # Use ~/.config/leggen for consistency with main config.  Resolved on
        # every call rather than cached so the current home directory is used.
        return Path.home() / ".config" / "leggen" / "auth.json"

    async def _get_auth_headers(self) -> Dict[str, str]:
        """Get authentication headers for GoCardless API"""
        token = await self._get_token()
        return {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }

    def _load_auth(self) -> Dict[str, Any]:
        """Read the persisted credentials.

        Returns:
            The decoded JSON object, or an empty dict when the file is
            missing, unreadable, or does not contain a JSON object.
        """
        auth_file = self._auth_file()
        try:
            with open(auth_file, "r", encoding="utf-8") as f:
                auth = json.load(f)
        except FileNotFoundError:
            # No credentials stored yet: not an error, a token gets created.
            return {}
        except (OSError, ValueError) as e:
            # ValueError covers both invalid JSON and undecodable bytes.
            logger.error(f"Error reading auth file: {e}")
            return {}

        if not isinstance(auth, dict):
            logger.error(
                f"Error reading auth file: expected a JSON object in {auth_file}"
            )
            return {}
        return auth

    async def _get_token(self) -> str:
        """Get access token for GoCardless API

        Order of preference: the in-memory token, a token obtained by
        refreshing the stored credentials, and finally a newly created token.

        Returns:
            The access token string.

        Raises:
            Exception: whatever ``_create_token`` raises when neither a
                refresh nor a new token can be obtained.
        """
        if self._token:
            return self._token

        auth = self._load_auth()
        # Gate on the value that is actually sent to the refresh endpoint.
        refresh_token = auth.get("refresh")
        if refresh_token:
            try:
                return await self._refresh_token(auth, refresh_token)
            except (httpx.HTTPError, KeyError, TypeError, ValueError):
                # HTTP status errors, transport errors and malformed refresh
                # responses all fall back to creating a new token.
                logger.warning("Token refresh failed, creating new token")

        return await self._create_token()

    async def _refresh_token(self, auth: Dict[str, Any], refresh_token: str) -> str:
        """Exchange ``refresh_token`` for a new access token and persist it.

        Args:
            auth: Previously stored credentials; updated in place with the
                fields returned by the refresh endpoint.
            refresh_token: Refresh token taken from ``auth``.

        Returns:
            The new access token, which is also cached on the instance.
        """
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/token/refresh/",
                json={"refresh": refresh_token},
            )
            response.raise_for_status()
            # Merge instead of replace so fields absent from the refresh
            # response (such as the refresh token itself) are kept.
            auth.update(response.json())

        self._save_auth(auth)
        self._token = auth["access"]
        return self._token

    async def _create_token(self) -> str:
        """Create a new GoCardless access token

        Uses the ``key`` and ``secret`` entries of the GoCardless config.

        Returns:
            The new access token, which is also cached on the instance.

        Raises:
            Exception: any failure is logged and re-raised unchanged.
        """
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{self.base_url}/token/new/",
                    json={
                        "secret_id": self.config["key"],
                        "secret_key": self.config["secret"],
                    },
                )
                response.raise_for_status()
                auth = response.json()
                self._save_auth(auth)
                self._token = auth["access"]
                return self._token
        except Exception as e:
            logger.error(f"Failed to create GoCardless token: {e}")
            raise

    def _save_auth(self, auth_data: dict):
        """Save authentication data to file

        The file holds bearer credentials, so its permissions are restricted
        to the owner on a best-effort basis before the data is written.

        Args:
            auth_data: JSON-serialisable credential payload.
        """
        auth_file = self._auth_file()
        auth_file.parent.mkdir(parents=True, exist_ok=True)

        try:
            # touch() creates the file with a tight mode; chmod() also
            # tightens a file that already existed with looser permissions.
            auth_file.touch(mode=0o600, exist_ok=True)
            auth_file.chmod(0o600)
        except OSError as e:
            # Some filesystems or ownership setups refuse chmod; persisting
            # the token matters more than the permission bits.
            logger.warning(f"Could not restrict permissions on {auth_file}: {e}")

        with open(auth_file, "w", encoding="utf-8") as f:
            json.dump(auth_data, f)

    async def _request(self, method: str, path: str, **kwargs: Any) -> Any:
        """Send an authenticated request and return the decoded JSON body.

        Args:
            method: HTTP method name, e.g. ``"GET"`` or ``"POST"``.
            path: Path appended to ``base_url`` (leading slash included).
            **kwargs: Extra arguments forwarded to ``AsyncClient.request``
                (``params``, ``json``, ...).

        Returns:
            The JSON-decoded response body.

        Raises:
            httpx.HTTPStatusError: if the final response has an error status.
        """
        url = f"{self.base_url}{path}"
        async with httpx.AsyncClient() as client:
            headers = await self._get_auth_headers()
            response = await client.request(method, url, headers=headers, **kwargs)

            if response.status_code == 401:
                # The cached token was rejected (e.g. it expired while this
                # instance stayed alive): drop it and retry exactly once.
                self._token = None
                headers = await self._get_auth_headers()
                response = await client.request(
                    method, url, headers=headers, **kwargs
                )

            response.raise_for_status()
            return response.json()

    async def get_institutions(self, country: str = "PT") -> List[Dict[str, Any]]:
        """Get available bank institutions for a country"""
        return await self._request(
            "GET", "/institutions/", params={"country": country}
        )

    async def create_requisition(
        self, institution_id: str, redirect_url: str
    ) -> Dict[str, Any]:
        """Create a bank connection requisition"""
        return await self._request(
            "POST",
            "/requisitions/",
            json={
                "institution_id": institution_id,
                "redirect": redirect_url,
            },
        )

    async def get_requisitions(self) -> Dict[str, Any]:
        """Get all requisitions"""
        return await self._request("GET", "/requisitions/")

    async def get_account_details(self, account_id: str) -> Dict[str, Any]:
        """Get account details"""
        return await self._request("GET", f"/accounts/{account_id}/")

    async def get_account_balances(self, account_id: str) -> Dict[str, Any]:
        """Get account balances"""
        return await self._request("GET", f"/accounts/{account_id}/balances/")

    async def get_account_transactions(self, account_id: str) -> Dict[str, Any]:
        """Get account transactions"""
        return await self._request("GET", f"/accounts/{account_id}/transactions/")