import datetime from typing import Optional from sqlalchemy.future import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import declarative_base from sqlalchemy import ( func, Column, DateTime, Integer, String, Boolean, ) from sqlalchemy import exc as sqlalchemy_exc from app.core.sessions import get_session from loguru import logger Base = declarative_base() class BaseDBModel(Base): __abstract__ = True id = Column( Integer, nullable=False, unique=True, primary_key=True, autoincrement=True, ) def __repr__(self): return "<{0.__class__.__name__}(id={0.id!r})>".format(self) class BaseCreatedUpdatedAtModel: created_at = Column(DateTime, default=datetime.datetime.now(datetime.timezone.utc)) updated_at = Column( DateTime, default=func.now(), onupdate=func.current_timestamp(), ) class User(BaseDBModel, BaseCreatedUpdatedAtModel): __tablename__: str = "users" username: str = Column(String, nullable=False) telegram_id: str = Column(String, nullable=False) chat_id: str = Column(String, nullable=False) is_active: bool = Column(Boolean, nullable=False, default=True) is_admin: bool = Column(Boolean, nullable=False, default=False) async def add_user( username: str, telegram_id: str | int, chat_id: str | int, is_active: bool = True, is_admin: bool = False ) -> Optional["User"]: db_session: AsyncSession = await get_session() async with db_session as session: try: new_user = User( username=username, telegram_id=str(telegram_id), chat_id=str(chat_id), is_active=is_active, is_admin=is_admin, ) session.add(new_user) await session.commit() return new_user except sqlalchemy_exc.SQLAlchemyError as err: logger.warning(err) await session.rollback() finally: await session.close() async def get_user_using_user_id(user_id: str | int) -> Optional["User"]: db_session: AsyncSession = await get_session() if isinstance(user_id, str): user_id: int = int(user_id) async with db_session as session: try: stmt = select(User).where(User.id == user_id) user: Optional[User] = ( (await session.execute(stmt)).unique().scalar_one_or_none() ) return user except sqlalchemy_exc.SQLAlchemyError as err: logger.warning(err) finally: await session.close() async def get_user_using_telegram_id(telegram_id: str | int) -> Optional["User"]: db_session: AsyncSession = await get_session() if isinstance(telegram_id, int): telegram_id: str = str(telegram_id) async with db_session as session: try: stmt = select(User).filter(User.telegram_id == telegram_id) user: Optional[User] = ( (await session.execute(stmt)).unique().scalar_one_or_none() ) return user except sqlalchemy_exc.SQLAlchemyError as err: logger.warning(err) finally: await session.close()