2025-03-29 18:18:14 +03:00

120 lines
3.7 KiB
Python

from typing import Optional
from pathlib import Path
import datetime
import sys
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import exc as sqlalchemy_exc
from sqlalchemy.types import TIMESTAMP
from sqlalchemy.orm import declarative_base
from sqlalchemy.future import select
from sqlalchemy import (
func,
Column,
DateTime,
Integer,
String,
Boolean,
)
# Append the parent of this file's directory to sys.path before the
# project-local imports below run (presumably that is where the `core` and
# `loader` packages live — TODO confirm).
sys.path.append(str(Path(__file__).resolve().parent.parent))
from core.sessions import get_session
from loader import custom_logger
# Declarative base class from which the ORM models in this module derive.
Base = declarative_base()
class BaseDBModel(Base):
    """Abstract declarative base that gives every model an integer ``id``."""

    __abstract__ = True

    # Auto-incrementing integer primary key inherited by concrete models.
    id = Column(
        Integer,
        primary_key=True,
        autoincrement=True,
        unique=True,
        nullable=False,
    )

    def __repr__(self):
        """Return ``<ClassName(id=...)>`` for debugging output."""
        template = "<{0.__class__.__name__}(id={0.id!r})>"
        return template.format(self)
class BaseCreatedUpdatedAtModel:
    """Mixin that adds ``created_at`` and ``updated_at`` timestamp columns."""

    # The default must be a callable so the timestamp is computed for each
    # INSERT.  A plain ``datetime.now(...)`` value here would be evaluated
    # once, when this class body runs, and then reused for every row.
    created_at = Column(
        type_=TIMESTAMP(timezone=True),
        default=lambda: datetime.datetime.now(datetime.timezone.utc),
    )

    # SQL-expression defaults: ``now()`` is rendered into the INSERT and
    # ``current_timestamp`` into every UPDATE.
    # NOTE(review): this column is a plain ``DateTime`` while ``created_at``
    # is timezone-aware; left as-is because aligning them changes the schema.
    updated_at = Column(
        DateTime,
        default=func.now(),
        onupdate=func.current_timestamp(),
    )
class User(BaseDBModel, BaseCreatedUpdatedAtModel):
    """ORM model mapped to the ``users`` table."""

    __tablename__: str = "users"

    # Required text columns; the Telegram and chat identifiers are stored
    # as strings.
    username: str = Column(String, nullable=False)
    telegram_id: str = Column(String, nullable=False)
    chat_id: str = Column(String, nullable=False)

    # Required boolean flags with Python-side defaults.
    is_active: bool = Column(Boolean, default=True, nullable=False)
    is_admin: bool = Column(Boolean, default=False, nullable=False)
async def add_user(
    username: str,
    telegram_id: str | int,
    chat_id: str | int,
    is_active: bool = True,
    is_admin: bool = False,
) -> Optional["User"]:
    """Insert a new user row and return the persisted ``User``.

    Args:
        username: Value for the ``username`` column.
        telegram_id: Telegram identifier; converted to ``str`` before storing.
        chat_id: Chat identifier; converted to ``str`` before storing.
        is_active: Initial value of the ``is_active`` flag.
        is_admin: Initial value of the ``is_admin`` flag.

    Returns:
        The stored ``User`` with its column values loaded, or ``None`` when
        SQLAlchemy raises an error (the error is logged and the transaction
        is rolled back).
    """
    db_session: AsyncSession = await get_session()
    # ``async with`` closes the session on exit, so no explicit close is needed.
    async with db_session as session:
        try:
            new_user = User(
                username=username,
                telegram_id=str(telegram_id),
                chat_id=str(chat_id),
                is_active=is_active,
                is_admin=is_admin,
            )
            session.add(new_user)
            await session.commit()
            # commit() expires the instance's attributes by default, and
            # SQL-expression defaults such as ``updated_at`` are not loaded
            # by the INSERT.  Reload while the session is still open so the
            # caller can read the object after the session has been closed.
            await session.refresh(new_user)
        except sqlalchemy_exc.SQLAlchemyError as err:
            custom_logger.warning(err)
            await session.rollback()
            return None
        else:
            return new_user
async def get_user_using_user_id(user_id: str | int) -> Optional["User"]:
    """Fetch the user whose primary key equals ``user_id``.

    Args:
        user_id: Primary-key value; a string is converted with ``int()``.

    Returns:
        The matching ``User``, or ``None`` when no row matches or SQLAlchemy
        raises an error (the error is logged).

    Raises:
        ValueError: If ``user_id`` is a string that ``int()`` cannot parse.
    """
    # Convert before acquiring the session: if ``int()`` raises, no session
    # has been obtained yet, so nothing is left unclosed.
    primary_key = int(user_id) if isinstance(user_id, str) else user_id
    db_session: AsyncSession = await get_session()
    # ``async with`` closes the session on exit, so no explicit close is needed.
    async with db_session as session:
        try:
            stmt = select(User).where(User.id == primary_key)
            result = await session.execute(stmt)
            return result.unique().scalar_one_or_none()
        except sqlalchemy_exc.SQLAlchemyError as err:
            custom_logger.warning(err)
            return None
async def get_user_using_telegram_id(telegram_id: str | int) -> Optional["User"]:
    """Fetch the user whose ``telegram_id`` column equals ``telegram_id``.

    Args:
        telegram_id: Telegram identifier; an ``int`` is converted to ``str``
            because the column stores strings.

    Returns:
        The matching ``User``, or ``None`` when no row matches or SQLAlchemy
        raises an error (the error is logged).  More than one matching row
        makes ``scalar_one_or_none()`` raise, which is handled the same way.
    """
    # Use a separate local instead of re-annotating the parameter.
    lookup_id = str(telegram_id) if isinstance(telegram_id, int) else telegram_id
    db_session: AsyncSession = await get_session()
    # ``async with`` closes the session on exit, so no explicit close is needed.
    async with db_session as session:
        try:
            # ``where`` matches the primary-key lookup in this module.
            stmt = select(User).where(User.telegram_id == lookup_id)
            result = await session.execute(stmt)
            return result.unique().scalar_one_or_none()
        except sqlalchemy_exc.SQLAlchemyError as err:
            custom_logger.warning(err)
            return None