102 lines
3.3 KiB
Python

from typing import Union, Callable, Any, Awaitable, Optional
from aiogram.types import Message, CallbackQuery, Update
from aiogram import BaseMiddleware
from aiogram import exceptions as aiogram_exc
from core.models import User
from loader import custom_logger
class GetUserMiddleware(BaseMiddleware):
    """Resolve the ``User`` record for an event's sender and pass it to the handler.

    For every ``Message`` / ``CallbackQuery`` the middleware looks the sender up
    by Telegram id, creates the record when it does not exist yet, and stores
    the result in ``data["user"]`` before calling the next handler.  Senders
    without a ``username`` and users whose ``is_active`` flag is ``False`` do
    not reach the handler.
    """

    # Text sent to senders that have no @username set (runtime string, keep as is).
    _USERNAME_REQUIRED_TEXT = (
        "<b>❗️ Для корректной работы бота требуется установить @username</b>"
    )

    # Legacy mirror of the user resolved by the most recent ``set_user`` call.
    # It is kept only so existing code that reads ``middleware.user`` keeps
    # working.  NOTE(review): a middleware instance is normally registered once
    # and shared by all events, so this value is overwritten by every event and
    # must not be used to identify the current sender -- use ``data["user"]``.
    user: User

    def __init__(self):
        super().__init__()

    async def __call__(
        self,
        handler: Callable[[Message, dict[str, Any]], Awaitable[Any]],
        event: Union[Message, CallbackQuery],
        data: dict[str, Any],
    ) -> Any:
        """Resolve the sender, inject it as ``data["user"]`` and run ``handler``.

        Returns:
            The handler's result.  ``None`` when processing was cancelled
            (``CancelHandler`` / ``UsernameNotFound``).  When a
            ``TelegramBadRequest`` is raised for a ``CallbackQuery`` the result
            of ``event.answer()`` is returned instead.
        """
        # The user is kept in a local variable so that an event which fails to
        # resolve its sender can never pick up the user of a previous event.
        user: Optional[User] = None
        try:
            user = await self.on_process_event(event, data)
        except (CancelHandler, UsernameNotFound):
            return
        except aiogram_exc.TelegramBadRequest as err:
            custom_logger.warning(err)
            if isinstance(event, CallbackQuery):
                # NOTE(review): returned without awaiting, as before; presumably
                # the dispatcher executes returned method objects -- confirm for
                # the aiogram version in use.
                return event.answer()
        except aiogram_exc.TelegramForbiddenError as err:
            custom_logger.warning(err)

        # ``None`` when the sender could not be resolved (see on_process_event).
        data["user"] = user
        try:
            return await handler(event, data)
        except aiogram_exc.TelegramBadRequest as err:
            custom_logger.warning(err)
            if isinstance(event, CallbackQuery):
                return event.answer()

    @classmethod
    def get_update_users_id(cls, update: Update) -> Optional[int]:
        """Return the sender id of ``update``.

        The callback query is checked first, then the message, then the edited
        message.  ``None`` is returned when none of them is present or the
        chosen event has no ``from_user``.
        """
        event = update.callback_query or update.message or update.edited_message
        if event is None or event.from_user is None:
            return None
        return event.from_user.id

    async def set_user(self, target: Union[CallbackQuery, Message]) -> Optional[User]:
        """Load the sender's ``User`` record, creating it when it is missing.

        Args:
            target: The incoming message or callback query.

        Returns:
            The resolved user, or ``None`` when ``target`` has no sender.

        Raises:
            UsernameNotFound: The sender has no ``username``.
            CancelHandler: The stored user has ``is_active == False``.
        """
        sender = target.from_user
        if sender is None:
            # Nothing to resolve for events without a sender.
            return None

        telegram_id: str = str(sender.id)
        username: Optional[str] = sender.username

        if not username:
            if isinstance(target, CallbackQuery):
                reply_to = target.message
            elif isinstance(target, Message):
                reply_to = target
            else:
                reply_to = None
            # A callback query may carry no message; skip the prompt then, but
            # still reject the sender instead of letting the handler run.
            if reply_to is not None:
                await reply_to.answer(self._USERNAME_REQUIRED_TEXT)
            raise UsernameNotFound()

        user: Optional[User] = await User.get_user_using_telegram_id(
            telegram_id=telegram_id
        )
        self.user = user  # legacy mirror, see the class attribute comment

        if user is None:
            if isinstance(target, CallbackQuery):
                chat_id = target.message.chat.id
            else:
                chat_id = target.chat.id
            user = await User.add_user(
                username=username,
                telegram_id=telegram_id,
                chat_id=chat_id,
            )
            self.user = user
            return user

        # ``== False`` is intentional: a ``None`` flag is not treated as a ban.
        if isinstance(user, User) and user.is_active == False:  # noqa: E712
            custom_logger.debug(f"User ban: {user.is_active}")
            raise CancelHandler()
        return user

    async def on_process_event(self, message: Message, _) -> Optional[User]:
        """Run ``set_user`` for ``message`` on a best-effort basis.

        An ``AttributeError`` raised while resolving the user (for example when
        an expected attribute of the event is ``None``) is logged and results
        in ``None`` instead of propagating.  All other exceptions propagate.
        """
        try:
            return await self.set_user(message)
        except AttributeError as err:
            custom_logger.debug(f"Could not resolve user for event: {err}")
            return None
class CancelHandler(Exception):
    """Signal that the current event must not be passed on to its handler.

    Raised by ``GetUserMiddleware.set_user`` when the stored user has
    ``is_active == False`` and caught in ``GetUserMiddleware.__call__``.
    """
class UsernameNotFound(Exception):
    """Signal that the sender of an event has no Telegram ``username``.

    Raised by ``GetUserMiddleware.set_user`` and caught in
    ``GetUserMiddleware.__call__``, which then stops processing the event.
    """