Cleanup types

This commit is contained in:
2025-08-17 07:30:34 +01:00
parent 70282e3596
commit 8ba995bc5f
3 changed files with 57 additions and 82 deletions

View File

@@ -12,9 +12,7 @@ from telegram.ext import (
from smsbot.utils import get_smsbot_version from smsbot.utils import get_smsbot_version
REQUEST_TIME = Summary( REQUEST_TIME = Summary("telegram_request_processing_seconds", "Time spent processing request")
"telegram_request_processing_seconds", "Time spent processing request"
)
COMMAND_COUNT = Counter("telegram_command_count", "Total number of commands processed") COMMAND_COUNT = Counter("telegram_command_count", "Total number of commands processed")
@@ -35,21 +33,18 @@ class TelegramSmsBot:
async def callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE): async def callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Handle the update""" """Handle the update"""
if update.effective_user.id in self.owners: if update.effective_user and update.message:
self.logger.info( if update.effective_user.id in self.owners:
f"{update.effective_user.username} sent {update.message.text}" self.logger.info(f"{update.effective_user.username} sent {update.message.text}")
) COMMAND_COUNT.inc()
COMMAND_COUNT.inc() else:
else: self.logger.debug(f"Ignoring message from user {update.effective_user.username}")
self.logger.debug(f"Ignoring message from user {update.effective_user.id}") raise ApplicationHandlerStop
raise ApplicationHandlerStop
async def send_message(self, chat_id: int, text: str): async def send_message(self, chat_id: int, text: str):
"""Send a message to a specific chat""" """Send a message to a specific chat"""
self.logger.info(f"Sending message to chat {chat_id}: {text}") self.logger.info(f"Sending message to chat {chat_id}: {text}")
await self.app.bot.send_message( await self.app.bot.send_message(chat_id=chat_id, text=text, parse_mode="MarkdownV2")
chat_id=chat_id, text=text, parse_mode="MarkdownV2"
)
async def send_subscribers(self, text: str): async def send_subscribers(self, text: str):
"""Send a message to all subscribers""" """Send a message to all subscribers"""
@@ -64,48 +59,41 @@ class TelegramSmsBot:
await self.send_message(owner, text) await self.send_message(owner, text)
@REQUEST_TIME.time() @REQUEST_TIME.time()
async def handler_help(self, update, context): async def handler_help(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Send a message when the command /help is issued.""" """Send a message when the command /help is issued."""
self.logger.info("/help command received in chat: %s", update.message.chat) if update.message:
self.logger.info("/help command received in chat: %s", update.message.chat)
commands = [] commands = []
for command in self.app.handlers[0]: for command in self.app.handlers[0]:
if isinstance(command, CommandHandler): if isinstance(command, CommandHandler):
commands.extend(["/{0}".format(cmd) for cmd in command.commands]) commands.extend(["/{0}".format(cmd) for cmd in command.commands])
await update.message.reply_markdown( await update.message.reply_markdown("Smsbot v{0}\n\n{1}".format(get_smsbot_version(), "\n".join(commands)))
"Smsbot v{0}\n\n{1}".format(get_smsbot_version(), "\n".join(commands)) COMMAND_COUNT.inc()
)
COMMAND_COUNT.inc()
@REQUEST_TIME.time() @REQUEST_TIME.time()
async def handler_subscribe( async def handler_subscribe(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
self, update: Update, context: ContextTypes.DEFAULT_TYPE
):
"""Handle subscription requests""" """Handle subscription requests"""
user_id = update.effective_user.id if update.effective_user and update.message:
if user_id not in self.subscribers: user_id = update.effective_user.id
self.subscribers.append(user_id) if user_id not in self.subscribers:
self.logger.info(f"User {user_id} subscribed.") self.subscribers.append(user_id)
self.logger.info(f"Current subscribers: {self.subscribers}") self.logger.info(f"User {user_id} subscribed.")
await update.message.reply_markdown( self.logger.info(f"Current subscribers: {self.subscribers}")
"You have successfully subscribed to updates." await update.message.reply_markdown("You have successfully subscribed to updates.")
) else:
else: self.logger.info(f"User {user_id} is already subscribed.")
self.logger.info(f"User {user_id} is already subscribed.")
@REQUEST_TIME.time() @REQUEST_TIME.time()
async def handler_unsubscribe( async def handler_unsubscribe(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
self, update: Update, context: ContextTypes.DEFAULT_TYPE
):
"""Handle unsubscription requests""" """Handle unsubscription requests"""
user_id = update.effective_user.id if update.effective_user and update.message:
if user_id in self.subscribers: user_id = update.effective_user.id
self.subscribers.remove(user_id) if user_id in self.subscribers:
self.logger.info(f"User {user_id} unsubscribed.") self.subscribers.remove(user_id)
self.logger.info(f"Current subscribers: {self.subscribers}") self.logger.info(f"User {user_id} unsubscribed.")
await update.message.reply_markdown( self.logger.info(f"Current subscribers: {self.subscribers}")
"You have successfully unsubscribed from updates." await update.message.reply_markdown("You have successfully unsubscribed from updates.")
) else:
else: self.logger.info(f"User {user_id} is not subscribed.")
self.logger.info(f"User {user_id} is not subscribed.")

View File

@@ -1,13 +1,13 @@
from importlib.metadata import version from importlib.metadata import version
def get_smsbot_version(): def get_smsbot_version() -> str:
return version("smsbot") return version("smsbot")
class TwilioWebhookPayload: class TwilioWebhookPayload:
@staticmethod @staticmethod
def parse(data: dict): def parse(data: dict[str, str]) -> "TwilioCall | TwilioMessage | None":
"""Return the correct class for the incoming Twilio webhook payload""" """Return the correct class for the incoming Twilio webhook payload"""
if "SmsMessageSid" in data: if "SmsMessageSid" in data:
return TwilioMessage(data) return TwilioMessage(data)
@@ -62,11 +62,7 @@ class TwilioMessage(TwilioWebhookPayload):
return msg return msg
def to_markdownv2(self): def to_markdownv2(self):
media_str = ( media_str = "\n".join([f"{self._escape(url)}" for url in self.media]) if self.media else ""
"\n".join([f"{self._escape(url)}" for url in self.media])
if self.media
else ""
)
msg = f"**From**: {self._escape(self.from_number)}\n**To**: {self._escape(self.to_number)}\n\n{self._escape(self.body)}\n\n{media_str}" msg = f"**From**: {self._escape(self.from_number)}\n**To**: {self._escape(self.to_number)}\n\n{self._escape(self.body)}\n\n{media_str}"
return msg return msg
@@ -85,6 +81,6 @@ class TwilioCall(TwilioWebhookPayload):
msg = f"Call from {self.from_number}, rejected." msg = f"Call from {self.from_number}, rejected."
return msg return msg
def to_markdownv2(self): def to_markdownv2(self) -> str:
msg = f"Call from {self._escape(self.from_number)}, rejected\\." msg = f"Call from {self._escape(self.from_number)}, rejected\\."
return msg return msg

View File

@@ -8,18 +8,16 @@ from werkzeug.middleware.dispatcher import DispatcherMiddleware
from smsbot.utils import TwilioWebhookPayload, get_smsbot_version from smsbot.utils import TwilioWebhookPayload, get_smsbot_version
REQUEST_TIME = Summary( REQUEST_TIME = Summary("webhook_request_processing_seconds", "Time spent processing request")
"webhook_request_processing_seconds", "Time spent processing request"
)
MESSAGE_COUNT = Counter("webhook_message_count", "Total number of messages processed") MESSAGE_COUNT = Counter("webhook_message_count", "Total number of messages processed")
CALL_COUNT = Counter("webhook_call_count", "Total number of calls processed") CALL_COUNT = Counter("webhook_call_count", "Total number of calls processed")
class TwilioWebhookHandler(object): class TwilioWebhookHandler(object):
""" """
A wrapped Flask app handling webhooks received from Twilio A wrapped Flask app handling webhooks received from Twilio
""" """
def __init__(self, account_sid: str | None = None, auth_token: str | None = None): def __init__(self, account_sid: str | None = None, auth_token: str | None = None):
self.app = Flask(self.__class__.__name__) self.app = Flask(self.__class__.__name__)
self.app.add_url_rule("/", "index", self.index, methods=["GET"]) self.app.add_url_rule("/", "index", self.index, methods=["GET"])
@@ -50,9 +48,7 @@ class TwilioWebhookHandler(object):
async def decorated_function(*args, **kwargs): async def decorated_function(*args, **kwargs):
# Create an instance of the RequestValidator class # Create an instance of the RequestValidator class
if not self.auth_token: if not self.auth_token:
current_app.logger.warning( current_app.logger.warning("Twilio request validation skipped due to Twilio Auth Token missing")
"Twilio request validation skipped due to Twilio Auth Token missing"
)
return await func(*args, **kwargs) return await func(*args, **kwargs)
validator = RequestValidator(self.auth_token) validator = RequestValidator(self.auth_token)
@@ -75,10 +71,10 @@ class TwilioWebhookHandler(object):
def set_bot(self, bot): def set_bot(self, bot):
self.bot = bot self.bot = bot
async def index(self): async def index(self) -> str:
return f'smsbot v{get_smsbot_version()} - <a href="https://github.com/nikdoof/smsbot">GitHub</a>' return f'smsbot v{get_smsbot_version()} - <a href="https://github.com/nikdoof/smsbot">GitHub</a>'
async def health(self): async def health(self) -> dict[str, str | int]:
"""Return basic health information""" """Return basic health information"""
return { return {
"version": get_smsbot_version(), "version": get_smsbot_version(),
@@ -87,29 +83,24 @@ class TwilioWebhookHandler(object):
} }
@time(REQUEST_TIME) @time(REQUEST_TIME)
async def message(self): async def message(self) -> str:
"""Handle incoming SMS messages from Twilio""" """Handle incoming SMS messages from Twilio"""
current_app.logger.info( current_app.logger.info("Received SMS from {From}: {Body}".format(**request.values.to_dict()))
"Received SMS from {From}: {Body}".format(**request.values.to_dict()) hook_data = TwilioWebhookPayload.parse(request.values.to_dict())
) if hook_data:
await self.bot.send_subscribers(hook_data.to_markdownv2())
await self.bot.send_subscribers(
TwilioWebhookPayload.parse(request.values.to_dict()).to_markdownv2()
)
# Return a blank response # Return a blank response
MESSAGE_COUNT.inc() MESSAGE_COUNT.inc()
return '<?xml version="1.0" encoding="UTF-8"?><Response></Response>' return '<?xml version="1.0" encoding="UTF-8"?><Response></Response>'
@time(REQUEST_TIME) @time(REQUEST_TIME)
async def call(self): async def call(self) -> str:
"""Handle incoming calls from Twilio""" """Handle incoming calls from Twilio"""
current_app.logger.info( current_app.logger.info("Received Call from {From}".format(**request.values.to_dict()))
"Received Call from {From}".format(**request.values.to_dict()) hook_data = TwilioWebhookPayload.parse(request.values.to_dict())
) if hook_data:
await self.bot.send_subscribers( await self.bot.send_subscribers(hook_data.to_markdownv2())
TwilioWebhookPayload.parse(request.values.to_dict()).to_markdownv2()
)
# Always reject calls # Always reject calls
CALL_COUNT.inc() CALL_COUNT.inc()