from typing import Union, Callable, Any, Awaitable, Optional from aiogram.types import Message, CallbackQuery, Update from aiogram import BaseMiddleware from aiogram import exceptions as aiogram_exc from core.models import User from loader import custom_logger class GetUserMiddleware(BaseMiddleware): user: User def __init__(self): super(GetUserMiddleware, self).__init__() async def __call__( self, handler: Callable[[Message, dict[str, Any]], Awaitable[Any]], event: Union[Message, CallbackQuery], data: dict[str, Any], ) -> Any: try: await self.on_process_event(event, data) except CancelHandler: return except UsernameNotFound: return except aiogram_exc.TelegramBadRequest as err: custom_logger.warning(err) if isinstance(event, CallbackQuery): return event.answer() except aiogram_exc.TelegramForbiddenError as err: custom_logger.warning(err) data["user"] = self.user try: result = await handler(event, data) return result except aiogram_exc.TelegramBadRequest as err: custom_logger.warning(err) if isinstance(event, CallbackQuery): return event.answer() @classmethod def get_update_users_id(cls, update: Update): if update.callback_query: return update.callback_query.from_user.id elif update.message: return update.message.from_user.id elif update.edited_message: return update.edited_message.from_user.id async def set_user(self, target: Union[CallbackQuery, Message]): telegram_id: str = str(target.from_user.id) username: str = target.from_user.username if not username: if isinstance(target, CallbackQuery): await target.message.answer( "❗️ Для корректной работы бота требуется установить @username" ) elif isinstance(target, Message): await target.answer( "❗️ Для корректной работы бота требуется установить @username" ) raise UsernameNotFound() self.user: User = await User.get_user_using_telegram_id( telegram_id=telegram_id ) if isinstance(self.user, None): if isinstance(target, CallbackQuery): chat_id: str = target.message.chat.id else: chat_id: str = target.chat.id self.user = await User.add_user( telegram_id=telegram_id, username=username, chat_id=chat_id, status=0, ) return elif isinstance(self.user, User): if self.user.is_active == False: custom_logger.debug(f"User ban: {self.user.is_active}") raise CancelHandler() async def on_process_event(self, message: Message, _): try: await self.set_user(message) except AttributeError: pass class CancelHandler(Exception): pass class UsernameNotFound(Exception): pass